REINFORCE on CartPole-v0
In this post, we will work through a hands-on lab of the Monte Carlo Policy Gradient algorithm (also known as REINFORCE) on OpenAI Gym's CartPole-v0 environment. This is a coding exercise from the Udacity Deep Reinforcement Learning Nanodegree.
REINFORCE
In this notebook, you will implement a REINFORCE agent on OpenAI Gym's CartPole-v0 environment. In summary, the REINFORCE algorithm (Williams, 1992) is a Monte Carlo variant of the policy gradient method in RL. The agent collects the trajectory of an episode by following its current policy, which is parameterized by a set of weights denoted $\theta$. Incidentally, REINFORCE is an acronym for "REward Increment = Nonnegative Factor × Offset Reinforcement × Characteristic Eligibility".
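For reference, the single-trajectory gradient estimate that the training loop below implements (weighting every step's log-probability by the whole-episode discounted return $R$, with no baseline) is

$$\nabla_\theta J(\theta) \approx \sum_{t=0}^{T} R \,\nabla_\theta \log \pi_\theta(a_t \mid s_t), \qquad R = \sum_{k=0}^{T} \gamma^{k} r_k,$$

and the parameters are then updated by gradient ascent, $\theta \leftarrow \theta + \alpha \nabla_\theta J(\theta)$.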
Import the Necessary Packages
import gym
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (16, 10)
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
torch.manual_seed(0)
import base64, io
# For visualization
from gym.wrappers.monitoring import video_recorder
from IPython.display import HTML
from IPython import display
import glob
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device
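# CartPole-v0 has a 4-dimensional continuous observation (cart position, cart velocity,
# pole angle, pole angular velocity) and a discrete action space with 2 actions
# (push left / push right), which is why the Policy network below defaults to
# state_size=4 and action_size=2.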
env = gym.make('CartPole-v0')
env.seed(0)
print('observation space:', env.observation_space)
print('action space:', env.action_space)
class Policy(nn.Module):
    def __init__(self, state_size=4, action_size=2, hidden_size=32):
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(state_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, action_size)

    def forward(self, state):
        x = F.relu(self.fc1(state))
        x = self.fc2(x)
        # Softmax over the action dimension (dim=1, since the state carries a batch dimension)
        return F.softmax(x, dim=1)

    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        # Build a categorical distribution over the action probabilities and sample from it
        model = Categorical(probs)
        action = model.sample()
        return action.item(), model.log_prob(action)
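As a quick sanity check (a minimal sketch that is not part of the original exercise, reusing the env created above), you can instantiate the policy and sample a single action; act should return a Python int (0 or 1) together with a one-element log-probability tensor.

# Optional smoke test: verify what Policy.act returns before training
test_policy = Policy().to(device)
test_state = env.reset()
test_action, test_log_prob = test_policy.act(test_state)
print('sampled action:', test_action)
print('log probability:', test_log_prob)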
def reinforce(policy, optimizer, n_episodes=1000, max_t=1000, gamma=1.0, print_every=100):
    scores_deque = deque(maxlen=100)
    scores = []
    for e in range(1, n_episodes + 1):
        saved_log_probs = []
        rewards = []
        state = env.reset()
        # Collect a trajectory by rolling out the current policy
        for t in range(max_t):
            # Sample an action from the current policy
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)
            state, reward, done, _ = env.step(action)
            rewards.append(reward)
            if done:
                break
        # Track the undiscounted episode return for monitoring
        scores_deque.append(sum(rewards))
        scores.append(sum(rewards))
        # Compute the discounted return of the episode
        discounts = [gamma ** i for i in range(len(rewards))]
        R = sum([a * b for a, b in zip(discounts, rewards)])
        # Calculate the loss
        policy_loss = []
        for log_prob in saved_log_probs:
            # We want gradient ascent on the expected return, so we minimize the negated objective
            policy_loss.append(-log_prob * R)
        # Concatenate the per-step terms along dimension 0 and sum them
        policy_loss = torch.cat(policy_loss).sum()
        # Backpropagation
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()
        if e % print_every == 0:
            print('Episode {}\tAverage Score: {:.2f}'.format(e, np.mean(scores_deque)))
        # CartPole-v0 is considered solved when the average score over 100 consecutive episodes is at least 195
        if np.mean(scores_deque) >= 195.0:
            print('Environment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(e - 100, np.mean(scores_deque)))
            break
    return scores
policy = Policy().to(device)
optimizer = optim.Adam(policy.parameters(), lr=1e-2)
scores = reinforce(policy, optimizer, n_episodes=2000)
fig = plt.figure()
ax = fig.add_subplot(111)
plt.plot(np.arange(1, len(scores)+1), scores)
plt.ylabel('Score')
plt.xlabel('Episode #')
plt.show()
def show_video(env_name):
    mp4list = glob.glob('video/*.mp4')
    if len(mp4list) > 0:
        mp4 = 'video/{}.mp4'.format(env_name)
        video = io.open(mp4, 'r+b').read()
        encoded = base64.b64encode(video)
        display.display(HTML(data='''<video alt="test" autoplay
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
            </video>'''.format(encoded.decode('ascii'))))
    else:
        print("Could not find video")
def show_video_of_model(policy, env_name):
    env = gym.make(env_name)
    vid = video_recorder.VideoRecorder(env, path="video/{}.mp4".format(env_name))
    state = env.reset()
    done = False
    for t in range(1000):
        vid.capture_frame()
        action, _ = policy.act(state)
        next_state, reward, done, _ = env.step(action)
        state = next_state
        if done:
            break
    vid.close()
    env.close()
show_video_of_model(policy, 'CartPole-v0')
show_video('CartPole-v0')