Acrobot-ing with Actor Critic

The Acrobot environment is based on Sutton’s work in Generalization in Reinforcement Learning: Successful Examples Using Sparse Coarse Coding. The system consists of two links connected linearly to form a chain, with one end of the chain fixed. The joint between the two links is actuated. The goal is to apply torques on the actuated joint to swing the free end of the linear chain above a given height while starting from the initial state of hanging downwards.

For details, read the Acrobot documentation.
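
Before writing an agent, it helps to inspect the state and action spaces. The snippet below is a small illustrative check (not part of the original walkthrough):

import gym

env = gym.make('Acrobot-v1')
# Box with 6 continuous values: cos/sin of both joint angles plus the two angular velocities
print(env.observation_space)
# Discrete(3): apply a torque of -1, 0, or +1 to the actuated joint
print(env.action_space)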

Acrobot with Random Policy

import gym
import random
import time

# The episode ends when the free end reaches the target height
env = gym.envs.make("Acrobot-v1")
env.seed(786)
env.reset()
time.sleep(5)
state = env.reset()
is_done = False
total_reward_episode = 0

# the agent selects an action at random at every step
while not is_done:
    env.render()
    time.sleep(0.05)
    action = random.choice([0, 1, 2])  # Acrobot-v1 has three discrete actions
    next_state, reward, is_done, _ = env.step(action)
    state = next_state
    total_reward_episode += reward

print(total_reward_episode)
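
A random policy gives a rough baseline to compare against later. The following optional snippet (an illustrative addition, not part of the original run) averages the total reward over a few random episodes without rendering:

# rough baseline: average total reward of the random policy (no rendering)
n_trials = 20
baseline_rewards = []
for _ in range(n_trials):
    state = env.reset()
    is_done = False
    episode_reward = 0
    while not is_done:
        action = env.action_space.sample()
        _, reward, is_done, _ = env.step(action)
        episode_reward += reward
    baseline_rewards.append(episode_reward)
print(sum(baseline_rewards) / n_trials)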

The Actor Critic Algorithm

The network for the actor-critic algorithm consists of the following two parts:

Actor: This takes in the input state and outputs the action probabilities.
Essentially, it learns the optimal policy by updating the model using information
provided by the critic.
Critic: This evaluates how good it is to be at the input state by computing the
value function. The value guides the actor on how it should adjust.

The two components share the input and hidden layers of the network, as learning is more efficient this way than learning them separately. Accordingly, the loss function is the sum of two parts: the negative log likelihood of the chosen action weighted by the advantage, which trains the actor, and a regression loss between the estimated state value and the computed return (smooth L1 in the code below), which trains the critic.
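
For a single step with return Gt, estimated value V(s), and log probability of the chosen action, the combined loss can be sketched with made-up numbers as follows (the actual training code appears in the update method below):

import torch
import torch.nn.functional as F

log_prob = torch.log(torch.tensor(0.3))    # log pi(a|s) of the sampled action
value = torch.tensor(0.5)                  # critic's estimate V(s)
Gt = torch.tensor(1.2)                     # computed return for this step

advantage = Gt - value.item()              # how much better the return was than expected
actor_loss = -log_prob * advantage         # policy-gradient (actor) term
critic_loss = F.smooth_l1_loss(value, Gt)  # regression (critic) term, smooth L1 here
loss = actor_loss + critic_loss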

Import all the necessary packages and create an Acrobot instance

import torch
import gym
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
env = gym.make('Acrobot-v1')

Actor Critic Neural Network Model

class ActorCriticModel(nn.Module):
    def __init__(self, n_input, n_output, n_hidden):
        super(ActorCriticModel, self).__init__()
        self.fc = nn.Linear(n_input, n_hidden)
        self.action = nn.Linear(n_hidden, n_output)
        self.value = nn.Linear(n_hidden, 1)

    def forward(self, x):
        x = torch.Tensor(x)
        x = F.relu(self.fc(x))
        action_probs = F.softmax(self.action(x), dim=-1)
        state_values = self.value(x)
        return action_probs, state_values

In the above class, the actor and the critic share the parameters of the input and hidden layers; the output of the actor is the probability of taking each individual action, and the output of the critic is the estimated value of the input state.
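
As a quick sanity check (an illustrative snippet, not part of the training code), you can pass a random 6-dimensional state through the model and inspect the outputs:

model = ActorCriticModel(n_input=6, n_output=3, n_hidden=50)
action_probs, state_value = model(np.random.rand(6))
print(action_probs)   # three action probabilities that sum to 1
print(state_value)    # a single estimated state value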

The Policy Network

class PolicyNetwork():
    def __init__(self, n_state, n_action, n_hidden=50, lr=0.001):
        self.model = ActorCriticModel(n_state, n_action, n_hidden)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr)
        self.scheduler = torch.optim.lr_scheduler.StepLR(self.optimizer, step_size=10, gamma=0.9)

    def predict(self, s):
        """
        Compute the output using the Actor Critic model
        @param s: input state
        @return: action probabilities, state_value
        """
        return self.model(torch.Tensor(s))

    def update(self, returns, log_probs, state_values):
        """
        Update the weights of the Actor Critic network given the training samples
        @param returns: return (cumulative rewards) for each step in an episode
        @param log_probs: log probability for each step
        @param state_values: state-value for each step
        """
        loss = 0
        for log_prob, value, Gt in zip(log_probs, state_values, returns):
            advantage = Gt - value.item()
            policy_loss = -log_prob * advantage

            value_loss = F.smooth_l1_loss(value, Gt)

            loss += policy_loss + value_loss

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def get_action(self, s):
        """
        Estimate the policy and sample an action, compute its log probability
        @param s: input state
        @return: the selected action, log probability, predicted state-value
        """
        action_probs, state_value = self.predict(s)
        action = torch.multinomial(action_probs, 1).item()
        log_prob = torch.log(action_probs[action])
        return action, log_prob, state_value
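
Similarly, a quick check of get_action on a random state (illustrative only) shows the three values it returns:

net = PolicyNetwork(n_state=6, n_action=3)
action, log_prob, state_value = net.get_action(np.random.rand(6))
print(action)              # one of 0, 1, 2
print(log_prob.item())     # log probability of the sampled action
print(state_value.item())  # critic's value estimate for the state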

The Function implementing Actor-Critic

def actor_critic(env, estimator, n_episode, gamma=1.0):
    """
    Actor Critic algorithm
    @param env: Gym environment
    @param estimator: policy network
    @param n_episode: number of episodes
    @param gamma: the discount factor
    """
    for episode in range(n_episode):
        log_probs = []
        rewards = []
        state_values = []
        state = env.reset()
        while True:
            action, log_prob, state_value = estimator.get_action(state)
            next_state, reward, is_done, _ = env.step(action)
            total_reward_episode[episode] += reward
            log_probs.append(log_prob)
            state_values.append(state_value)
            rewards.append(reward)
            if is_done:

                # compute the discounted return for each step by walking
                # backwards through the rewards: Gt = r_t + gamma * G_{t+1}
                returns = []
                Gt = 0
                for reward in rewards[::-1]:
                    Gt = reward + gamma * Gt
                    returns.append(Gt)

                returns = returns[::-1]
                returns = torch.tensor(returns)
                # normalize the returns to stabilize training
                returns = (returns - returns.mean()) / (returns.std() + 1e-9)

                estimator.update(returns, log_probs, state_values)
                print('Episode: {}, total reward: {}'.format(episode, total_reward_episode[episode]))

                # Acrobot rewards are -1 per step, so episode totals are negative;
                # decay the learning rate once the agent reaches a good episode
                if total_reward_episode[episode] >= -120:
                    estimator.scheduler.step()

                break

            state = next_state
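
As a small illustration with made-up rewards (not part of the training run), the backward recursion above reproduces the usual definition of the discounted return Gt = r_t + gamma * r_{t+1} + gamma^2 * r_{t+2} + ...:

rewards = [-1.0, -1.0, 0.0]
gamma = 0.9
returns = []
Gt = 0
for reward in rewards[::-1]:
    Gt = reward + gamma * Gt
    returns.append(Gt)
returns = returns[::-1]
print(returns)   # approximately [-1.9, -1.0, 0.0]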

Setting the parameters and hyperparameters and calling the actor_critic function

# number of state variables in the Acrobot observation space
n_state = env.observation_space.shape[0]
# number of actions the agent can take
n_action = env.action_space.n

# number of units in the hidden layer
n_hidden = 500

# learning rate
lr = 0.003

# instantiate the PolicyNetwork
policy_net = PolicyNetwork(n_state, n_action, n_hidden, lr)

# total number of episodes used to train the agent
n_episode = 1000

# discount factor
gamma = 0.9

# list to hold the total reward of each episode
total_reward_episode = [0] * n_episode

# call the actor_critic function with the above parameters and hyperparameters
# to train the agent
actor_critic(env, policy_net, n_episode, gamma)

Plotting the Total Reward Gained in each episode

import matplotlib.pyplot as plt

plt.plot(total_reward_episode)
plt.title('Episode reward over time')
plt.xlabel('Episode')
plt.ylabel('Total reward')
plt.show()

Output Figure
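
The raw per-episode curve is quite noisy. As an optional addition (not part of the original post), a moving average makes the trend easier to see:

import numpy as np

window = 20
smoothed = np.convolve(total_reward_episode, np.ones(window) / window, mode='valid')
plt.plot(smoothed)
plt.title('Moving average of episode reward (window = 20)')
plt.xlabel('Episode')
plt.ylabel('Average total reward')
plt.show()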

Testing the trained model for 100 episodes and calculating the average reward

episodes = 100
total_reward = []
total_positive_reward = []
count = 0
for episode in range(episodes):
    print(episode)
    state = env.reset()
    is_done = False
    total_reward_episode = 0
    positive_reward_count = 0
    while not is_done:
        action, log_prob, state_value = policy_net.get_action(state)
        next_state, reward, is_done, _ = env.step(action)
        state = next_state
        total_reward_episode += reward
        # a reward above -1 indicates the goal height was reached on this step
        if reward > -1:
            positive_reward_count += 1

    # count the episodes whose total reward is above -120
    if total_reward_episode > -120:
        count += 1
    total_reward.append(total_reward_episode)
    total_positive_reward.append(positive_reward_count)
print(count)
print(sum(total_reward) / episodes)

37

-137.6

That is, 37 of the 100 test episodes ended with a total reward above -120, and the average total reward over the 100 episodes was -137.6.

Saving the model (the PolicyNetwork object) with Pickle

import pickle
def save_object(obj, filename):
    with open(filename, 'wb') as outp:
        pickle.dump(obj, outp, pickle.HIGHEST_PROTOCOL)
save_object(policy_net, 'actor_critic_acrobat.pkl')
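
Pickling the whole PolicyNetwork object works, but it ties the saved file to the exact class definition. An alternative sketch (not used in the rest of this post; the .pth filename is made up here) saves only the network weights with torch.save:

# save only the model weights
torch.save(policy_net.model.state_dict(), 'actor_critic_acrobot_weights.pth')

# later: rebuild the network with the same sizes and load the weights back
restored_net = PolicyNetwork(n_state, n_action, n_hidden, lr)
restored_net.model.load_state_dict(torch.load('actor_critic_acrobot_weights.pth'))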

Putting all the classes in a single file so we can reuse them

PolicyNetwork_AC.py

Reloading the Trained Model and Simulating

import gym
import torch
import pickle
import time

# The PolicyNetwork and ActorCriticModel classes must be imported before
# unpickling the saved object, otherwise loading will raise an error
from PolicyNetwork_AC import PolicyNetwork
from PolicyNetwork_AC import ActorCriticModel


#Episode ends when free end reaches target height
env = gym.envs.make("Acrobot-v1")
env.seed(786)

env.reset()
time.sleep(5)


file_to_read = open("actor_critic_acrobat.pkl", "rb")

policy_net = pickle.load(file_to_read)
print(type(policy_net))
state = env.reset()
is_done = False
total_reward_episode = 0

while not is_done:
    env.render()
    time.sleep(0.05)
    action, log_prob, state_value  = policy_net.get_action(state)
    next_state, reward, is_done, _ = env.step(action)
    state = next_state
    total_reward_episode += reward

print(total_reward_episode)

The Final Output

The free end of the Acrobot touching the target line