Balancing immediate and long-term goals
This notebook covers chapter 3 of Grokking Deep Reinforcement Learning, "Balancing immediate and long-term goals."
- Policy iteration and value iteration
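As a brief reference, the dynamic programming code below implements two backups; roughly, with $p(s', r \mid s, a)$ denoting the transition model stored in `P` and $\gamma$ the discount factor `gamma`, policy evaluation applies the Bellman expectation backup and value iteration applies the Bellman optimality backup:

$$V_{k+1}^{\pi}(s) = \sum_{s',\,r} p(s', r \mid s, \pi(s))\,\big[r + \gamma V_k^{\pi}(s')\big]$$

$$V_{k+1}(s) = \max_{a} \sum_{s',\,r} p(s', r \mid s, a)\,\big[r + \gamma V_k(s')\big]$$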
Note: please install the packages below before running this notebook.
!pip install tqdm numpy scikit-learn pyglet setuptools && \
!pip install gym asciinema pandas tabulate tornado==5.* PyBullet && \
!pip install git+https://github.com/pybox2d/pybox2d#egg=Box2D && \
!pip install git+https://github.com/mimoralea/gym-bandits#egg=gym-bandits && \
!pip install git+https://github.com/mimoralea/gym-walk#egg=gym-walk && \
!pip install git+https://github.com/mimoralea/gym-aima#egg=gym-aima && \
!pip install gym[atari]
import warnings ; warnings.filterwarnings('ignore')
import gym, gym_walk, gym_aima
import numpy as np
from pprint import pprint
from tqdm import tqdm_notebook as tqdm
import itertools  # itertools.product is used by print_action_value_function
from itertools import cycle
from tabulate import tabulate  # used by print_action_value_function
import random
np.set_printoptions(suppress=True)
random.seed(123); np.random.seed(123)
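# Pretty-printing helpers: display a policy, a state-value function V,
# and an action-value function Q for grid-shaped (walk/gridworld) state spaces.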
def print_policy(pi, P, action_symbols=('<', 'v', '>', '^'), n_cols=4, title='Policy:'):
print(title)
arrs = {k:v for k,v in enumerate(action_symbols)}
for s in range(len(P)):
a = pi(s)
print("| ", end="")
if np.all([done for action in P[s].values() for _, _, _, done in action]):
print("".rjust(9), end=" ")
else:
print(str(s).zfill(2), arrs[a].rjust(6), end=" ")
if (s + 1) % n_cols == 0: print("|")
def print_state_value_function(V, P, n_cols=4, prec=3, title='State-value function:'):
print(title)
for s in range(len(P)):
v = V[s]
print("| ", end="")
if np.all([done for action in P[s].values() for _, _, _, done in action]):
print("".rjust(9), end=" ")
else:
print(str(s).zfill(2), '{}'.format(np.round(v, prec)).rjust(6), end=" ")
if (s + 1) % n_cols == 0: print("|")
def print_action_value_function(Q,
optimal_Q=None,
action_symbols=('<', '>'),
prec=3,
                                title='Action-value function:'):
vf_types=('',) if optimal_Q is None else ('', '*', 'err')
headers = ['s',] + [' '.join(i) for i in list(itertools.product(vf_types, action_symbols))]
print(title)
states = np.arange(len(Q))[..., np.newaxis]
arr = np.hstack((states, np.round(Q, prec)))
if not (optimal_Q is None):
arr = np.hstack((arr, np.round(optimal_Q, prec), np.round(optimal_Q-Q, prec)))
print(tabulate(arr, headers, tablefmt="fancy_grid"))
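# Monte Carlo probes of a policy's quality: estimate the probability of reaching
# the goal state and the mean undiscounted return over n_episodes rollouts.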
def probability_success(env, pi, goal_state, n_episodes=100, max_steps=200):
random.seed(123); np.random.seed(123) ; env.seed(123)
results = []
for _ in range(n_episodes):
state, done, steps = env.reset(), False, 0
while not done and steps < max_steps:
state, _, done, h = env.step(pi(state))
steps += 1
results.append(state == goal_state)
return np.sum(results)/len(results)
def mean_return(env, pi, n_episodes=100, max_steps=200):
random.seed(123); np.random.seed(123) ; env.seed(123)
results = []
for _ in range(n_episodes):
state, done, steps = env.reset(), False, 0
results.append(0.0)
while not done and steps < max_steps:
state, reward, done, _ = env.step(pi(state))
results[-1] += reward
steps += 1
return np.mean(results)
env = gym.make('SlipperyWalkFive-v0')
P = env.env.P
init_state = env.reset()
goal_state = 6
LEFT, RIGHT = range(2)
pi = lambda s: {
0:LEFT, 1:LEFT, 2:LEFT, 3:LEFT, 4:LEFT, 5:LEFT, 6:LEFT
}[s]
print_policy(pi, P, action_symbols=('<', '>'), n_cols=7)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, pi, goal_state=goal_state)*100,
mean_return(env, pi)))
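# Iterative policy evaluation: sweep over all states applying the Bellman
# expectation backup for the fixed policy pi until the largest change in V
# falls below theta.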
def policy_evaluation(pi, P, gamma=1.0, theta=1e-10):
prev_V = np.zeros(len(P), dtype=np.float64)
while True:
V = np.zeros(len(P), dtype=np.float64)
for s in range(len(P)):
for prob, next_state, reward, done in P[s][pi(s)]:
V[s] += prob * (reward + gamma * prev_V[next_state] * (not done))
if np.max(np.abs(prev_V - V)) < theta:
break
prev_V = V.copy()
return V
V = policy_evaluation(pi, P)
print_state_value_function(V, P, n_cols=7, prec=5)
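# Policy improvement: compute Q from V with a one-step lookahead over the
# MDP dynamics P, then return the policy that acts greedily with respect to Q.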
def policy_improvement(V, P, gamma=1.0):
Q = np.zeros((len(P), len(P[0])), dtype=np.float64)
for s in range(len(P)):
for a in range(len(P[s])):
for prob, next_state, reward, done in P[s][a]:
Q[s][a] += prob * (reward + gamma * V[next_state] * (not done))
new_pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q, axis=1))}[s]
return new_pi
improved_pi = policy_improvement(V, P)
print_policy(improved_pi, P, action_symbols=('<', '>'), n_cols=7)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, improved_pi, goal_state=goal_state)*100,
mean_return(env, improved_pi)))
improved_V = policy_evaluation(improved_pi, P)
print_state_value_function(improved_V, P, n_cols=7, prec=5)
improved_improved_pi = policy_improvement(improved_V, P)
print_policy(improved_improved_pi, P, action_symbols=('<', '>'), n_cols=7)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, improved_improved_pi, goal_state=goal_state)*100,
mean_return(env, improved_improved_pi)))
# if we evaluate and improve once more, there is nothing left to improve;
# that means we have reached the optimal policy
improved_improved_V = policy_evaluation(improved_improved_pi, P)
print_state_value_function(improved_improved_V, P, n_cols=7, prec=5)
assert np.all(improved_V == improved_improved_V)
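# Policy iteration: start from a randomly generated policy and alternate full
# policy evaluation and greedy improvement until the policy stops changing.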
def policy_iteration(P, gamma=1.0, theta=1e-10):
random_actions = np.random.choice(tuple(P[0].keys()), len(P))
pi = lambda s: {s:a for s, a in enumerate(random_actions)}[s]
while True:
old_pi = {s:pi(s) for s in range(len(P))}
V = policy_evaluation(pi, P, gamma, theta)
pi = policy_improvement(V, P, gamma)
if old_pi == {s:pi(s) for s in range(len(P))}:
break
return V, pi
optimal_V, optimal_pi = policy_iteration(P)
print('Optimal policy and state-value function (PI):')
print_policy(optimal_pi, P, action_symbols=('<', '>'), n_cols=7)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, optimal_pi, goal_state=goal_state)*100,
mean_return(env, optimal_pi)))
print()
print_state_value_function(optimal_V, P, n_cols=7, prec=5)
assert np.all(improved_V == optimal_V)
env = gym.make('FrozenLake-v0')
P = env.env.P
init_state = env.reset()
goal_state = 15
LEFT, DOWN, RIGHT, UP = range(4)
random_pi = lambda s: {
0:RIGHT, 1:LEFT, 2:DOWN, 3:UP,
4:LEFT, 5:LEFT, 6:RIGHT, 7:LEFT,
8:UP, 9:DOWN, 10:UP, 11:LEFT,
12:LEFT, 13:RIGHT, 14:DOWN, 15:LEFT
}[s]
print_policy(random_pi, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, random_pi, goal_state=goal_state)*100,
mean_return(env, random_pi)))
go_get_pi = lambda s: {
0:RIGHT, 1:RIGHT, 2:DOWN, 3:LEFT,
4:DOWN, 5:LEFT, 6:DOWN, 7:LEFT,
8:RIGHT, 9:RIGHT, 10:DOWN, 11:LEFT,
12:LEFT, 13:RIGHT, 14:RIGHT, 15:LEFT
}[s]
print_policy(go_get_pi, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, go_get_pi, goal_state=goal_state)*100,
mean_return(env, go_get_pi)))
careful_pi = lambda s: {
0:LEFT, 1:UP, 2:UP, 3:UP,
4:LEFT, 5:LEFT, 6:UP, 7:LEFT,
8:UP, 9:DOWN, 10:LEFT, 11:LEFT,
12:LEFT, 13:RIGHT, 14:RIGHT, 15:LEFT
}[s]
print_policy(careful_pi, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, careful_pi, goal_state=goal_state)*100,
mean_return(env, careful_pi)))
V = policy_evaluation(careful_pi, P, gamma=0.99)
print_state_value_function(V, P, prec=4)
careful_plus_pi = policy_improvement(V, P, gamma=0.99)
print_policy(careful_plus_pi, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, careful_plus_pi, goal_state=goal_state)*100,
mean_return(env, careful_plus_pi)))
new_V = policy_evaluation(careful_plus_pi, P, gamma=0.99)
print_state_value_function(new_V, P, prec=4)
print_state_value_function(new_V - V, P, prec=4)
adversarial_pi = lambda s: {
0:UP, 1:UP, 2:UP, 3:UP,
4:UP, 5:LEFT, 6:UP, 7:LEFT,
8:LEFT, 9:LEFT, 10:LEFT, 11:LEFT,
12:LEFT, 13:LEFT, 14:LEFT, 15:LEFT
}[s]
print_policy(adversarial_pi, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, adversarial_pi, goal_state=goal_state)*100,
mean_return(env, adversarial_pi)))
V = policy_evaluation(adversarial_pi, P, gamma=0.99)
print_state_value_function(V, P, prec=2)
i_pi = policy_improvement(V, P, gamma=0.99)
print_policy(i_pi, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, i_pi, goal_state=goal_state)*100,
mean_return(env, i_pi)))
i_V = policy_evaluation(i_pi, P, gamma=0.99)
print_state_value_function(i_V, P, prec=2)
ii_pi = policy_improvement(i_V, P, gamma=0.99)
print_policy(ii_pi, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, ii_pi, goal_state=goal_state)*100,
mean_return(env, ii_pi)))
ii_V = policy_evaluation(ii_pi, P, gamma=0.99)
print_state_value_function(ii_V, P, prec=2)
iii_pi = policy_improvement(ii_V, P, gamma=0.99)
print_policy(iii_pi, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, iii_pi, goal_state=goal_state)*100,
mean_return(env, iii_pi)))
iii_V = policy_evaluation(iii_pi, P, gamma=0.99)
print_state_value_function(iii_V, P, prec=2)
iiii_pi = policy_improvement(iii_V, P, gamma=0.99)
print_policy(iiii_pi, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, iiii_pi, goal_state=goal_state)*100,
mean_return(env, iiii_pi)))
iiii_V = policy_evaluation(iiii_pi, P, gamma=0.99)
print_state_value_function(iiii_V, P, prec=2)
iiiii_pi = policy_improvement(iiii_V, P, gamma=0.99)
print_policy(iiiii_pi, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, iiiii_pi, goal_state=goal_state)*100,
mean_return(env, iiiii_pi)))
iiiii_V = policy_evaluation(iiiii_pi, P, gamma=0.99)
print_state_value_function(iiiii_V, P, prec=2)
iiiiii_pi = policy_improvement(iiiii_V, P, gamma=0.99)
print_policy(iiiiii_pi, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, iiiiii_pi, goal_state=goal_state)*100,
mean_return(env, iiiiii_pi)))
iiiiii_V = policy_evaluation(iiiiii_pi, P, gamma=0.99)
print_state_value_function(iiiiii_V, P, prec=2)
iiiiiii_pi = policy_improvement(iiiiii_V, P, gamma=0.99)
print_policy(iiiiiii_pi, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, iiiiiii_pi, goal_state=goal_state)*100,
mean_return(env, iiiiiii_pi)))
V_best_p, pi_best_p = policy_iteration(P, gamma=0.99)
print_state_value_function(V_best_p, P, prec=4)
print()
print('Optimal policy and state-value function (PI):')
print_policy(pi_best_p, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, pi_best_p, goal_state=goal_state)*100,
mean_return(env, pi_best_p)))
env = gym.make('SlipperyWalkFive-v0')
init_state = env.reset()
goal_state = 6
P = env.env.P
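# Value iteration: repeatedly apply the Bellman optimality backup (max over
# actions of the one-step lookahead) until V converges, then extract the
# greedy policy from the final Q.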
def value_iteration(P, gamma=1.0, theta=1e-10):
V = np.zeros(len(P), dtype=np.float64)
while True:
Q = np.zeros((len(P), len(P[0])), dtype=np.float64)
for s in range(len(P)):
for a in range(len(P[s])):
for prob, next_state, reward, done in P[s][a]:
Q[s][a] += prob * (reward + gamma * V[next_state] * (not done))
if np.max(np.abs(V - np.max(Q, axis=1))) < theta:
break
V = np.max(Q, axis=1)
pi = lambda s: {s:a for s, a in enumerate(np.argmax(Q, axis=1))}[s]
return V, pi
optimal_V, optimal_pi = value_iteration(P)
print('Optimal policy and state-value function (VI):')
print_policy(optimal_pi, P, action_symbols=('<', '>'), n_cols=7)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, optimal_pi, goal_state=goal_state)*100,
mean_return(env, optimal_pi)))
print()
print_state_value_function(optimal_V, P, n_cols=7, prec=5)
# expected output: | | 01 0.668 | 02 0.890 | 03 0.964 | 04 0.989 | 05 0.997 | |
env = gym.make('FrozenLake-v0')
init_state = env.reset()
goal_state = 15
P = env.env.P
V_best_v, pi_best_v = value_iteration(P, gamma=0.99)
print('Optimal policy and state-value function (VI):')
print_policy(pi_best_v, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, pi_best_v, goal_state=goal_state)*100,
mean_return(env, pi_best_v)))
print()
print_state_value_function(V_best_v, P, prec=4)
print('For comparison, optimal policy and state-value function (PI):')
print_policy(pi_best_p, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, pi_best_p, goal_state=goal_state)*100,
mean_return(env, pi_best_p)))
print()
print_state_value_function(V_best_p, P)
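# Rebuild FrozenLake as a modified MDP: reshape the rewards (+1 at the goal,
# -1 at holes, -0.01 per step elsewhere) and the slip probabilities
# (0.8 intended action, 0.1 drift to each side), then solve it with PI and VI.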
env = gym.make('FrozenLake-v0')
P = env.env.P
# change reward function
reward_goal, reward_holes, reward_others = 1, -1, -0.01
goal, hole = 15, [5, 7, 11, 12]
for s in range(len(P)):
for a in range(len(P[s])):
for t in range(len(P[s][a])):
values = list(P[s][a][t])
if values[1] == goal:
values[2] = reward_goal
values[3] = False
elif values[1] in hole:
values[2] = reward_holes
values[3] = False
else:
values[2] = reward_others
values[3] = False
if s in hole or s == goal:
values[2] = 0
values[3] = True
P[s][a][t] = tuple(values)
# change transition function
prob_action, prob_drift_one, prob_drift_two = 0.8, 0.1, 0.1
for s in range(len(P)):
for a in range(len(P[s])):
for t in range(len(P[s][a])):
if P[s][a][t][0] == 1.0:
continue
values = list(P[s][a][t])
if t == 0:
values[0] = prob_drift_one
elif t == 1:
values[0] = prob_action
elif t == 2:
values[0] = prob_drift_two
P[s][a][t] = tuple(values)
env.env.P = P
V_best, pi_best = policy_iteration(env.env.P, gamma=0.99)
print('Optimal policy and state-value function (PI):')
print_policy(pi_best, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, pi_best, goal_state=goal_state)*100,
mean_return(env, pi_best)))
print()
print_state_value_function(V_best, P)
V_best, pi_best = value_iteration(env.env.P, gamma=0.99)
print('Optimal policy and state-value function (VI):')
print_policy(pi_best, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, pi_best, goal_state=goal_state)*100,
mean_return(env, pi_best)))
print()
print_state_value_function(V_best, P)
env = gym.make('RussellNorvigGridworld-v0')
init_state = env.reset()
goal_state = 3
P = env.env.P
V_best_p, pi_best = policy_iteration(P)
print('Optimal policy and state-value function (PI):')
print_policy(pi_best, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, pi_best, goal_state=goal_state)*100,
mean_return(env, pi_best)))
print()
print_state_value_function(V_best_p, P)
V_best_v, pi_best = value_iteration(P)
print('Optimal policy and state-value function (VI):')
print_policy(pi_best, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, pi_best, goal_state=goal_state)*100,
mean_return(env, pi_best)))
print()
print_state_value_function(V_best_v, P)
LEFT, DOWN, RIGHT, UP = range(4)
pi = lambda s: {
0:RIGHT, 1:RIGHT, 2:RIGHT, 3:LEFT,
4:UP, 5:LEFT, 6:UP, 7:LEFT,
8:UP, 9:LEFT, 10:LEFT, 11:LEFT
}[s]
print('Re-construct optimal policy:')
print_policy(pi, P)
print('Reaches goal {:.2f}%. Obtains an average undiscounted return of {:.4f}.'.format(
probability_success(env, pi, goal_state=goal_state)*100,
mean_return(env, pi)))
V = policy_evaluation(pi, P)
print('Evaluate optimal policy:')
print_state_value_function(V, P)
pi = policy_improvement(V, P)
print('Improve optimal policy (nothing to improve -- it is the same policy, because it is optimal):')
print_policy(pi, P)
print('There are no differences, nothing to improve on the optimal policy and state-value function:')
print(np.abs(V_best_p - V))
print(np.abs(V_best_v - V))