This commit is contained in:
JohnJim0816
2020-10-07 21:47:25 +08:00
parent 07b835663a
commit 5fe8bfc6c1
23 changed files with 378 additions and 139 deletions

View File

@@ -0,0 +1,19 @@
## CliffWalking-v0环境简介
悬崖寻路问题CliffWalking是指在一个4 x 12的网格中智能体以网格的左下角位置为起点以网格的下角位置为终点目标是移动智能体到达终点位置智能体每次可以在上、下、左、右这4个方向中移动一步每移动一步会得到-1单位的奖励。
<img src="assets/image-20201007211441036.png" alt="image-20201007211441036" style="zoom:50%;" />
如图红色部分表示悬崖数字代表智能体能够观测到的位置信息即observation总共会有0-47等48个不同的值智能体再移动中会有以下限制
* 智能体不能移出网格,如果智能体想执行某个动作移出网格,那么这一步智能体不会移动,但是这个操作依然会得到-1单位的奖励
* 如果智能体“掉入悬崖” ,会立即回到起点位置,并得到-100单位的奖励
* 当智能体移动到终点时,该回合结束,该回合总奖励为各步奖励之和
实际的仿真界面如下:
<img src="assets/image-20201007211858925.png" alt="image-20201007211858925" style="zoom:50%;" />
由于从起点到终点最少需要13步每步得到-1的reward因此最佳训练算法下每个episode下reward总和应该为-13。

View File

@@ -1,3 +1,14 @@
#!/usr/bin/env python
# coding=utf-8
'''
Author: John
Email: johnjim0816@gmail.com
Date: 2020-09-11 23:03:00
LastEditor: John
LastEditTime: 2020-10-07 20:48:29
Discription:
Environment:
'''
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. # Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,64 +23,72 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# -*- coding: utf-8 -*-
import numpy as np import numpy as np
import math
class QLearning(object):
class QLearningAgent(object):
def __init__(self, def __init__(self,
obs_n, obs_dim,
act_n, action_dim,
learning_rate=0.01, learning_rate=0.01,
gamma=0.9, gamma=0.9,
e_greed=0.1): epsilon_start=0.9,epsilon_end=0.1,epsilon_decay=200):
self.act_n = act_n # 动作维度,有几个动作可选 self.action_dim = action_dim # 动作维度,有几个动作可选
self.lr = learning_rate # 学习率 self.lr = learning_rate # 学习率
self.gamma = gamma # reward的衰减率 self.gamma = gamma # reward 的衰减率
self.epsilon = e_greed # 按一定概率随机选动作 self.epsilon = 0 # 按一定概率随机选动作,即 e-greedy 策略, 并且epsilon逐渐衰减
self.Q = np.zeros((obs_n, act_n)) self.sample_count = 0 # epsilon随训练的也就是采样次数逐渐衰减所以需要计数
self.epsilon_start = epsilon_start
self.epsilon_end = epsilon_end
self.epsilon_decay= epsilon_decay
self.Q_table = np.zeros((obs_dim, action_dim)) # Q表
# 根据输入观察值,采样输出的动作值,带探索
def sample(self, obs): def sample(self, obs):
if np.random.uniform(0, 1) < (1.0 - self.epsilon): #根据table的Q值选动作 '''根据输入观测值,采样输出的动作值,带探索,训练模型时使用
'''
self.sample_count += 1
self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \
math.exp(-1. * self.sample_count / self.epsilon_decay)
if np.random.uniform(0, 1) > self.epsilon: # 随机选取0-1之间的值如果大于epsilon就按照贪心策略选取action否则随机选取
action = self.predict(obs) action = self.predict(obs)
else: else:
action = np.random.choice(self.act_n) #有一定概率随机探索选取一个动作 action = np.random.choice(self.action_dim) #有一定概率随机探索选取一个动作
return action return action
# 根据输入观察值,预测输出的动作值
def predict(self, obs): def predict(self, obs):
Q_list = self.Q[obs, :] '''根据输入观测值,采样输出的动作值,带探索,测试模型时使用
maxQ = np.max(Q_list) '''
action_list = np.where(Q_list == maxQ)[0] # maxQ可能对应多个action Q_list = self.Q_table[obs, :]
action = np.random.choice(action_list) Q_max = np.max(Q_list)
action_list = np.where(Q_list == Q_max)[0]
action = np.random.choice(action_list) # Q_max可能对应多个 action ,可以随机抽取一个
return action return action
# 学习方法也就是更新Q-table的方法
def learn(self, obs, action, reward, next_obs, done): def learn(self, obs, action, reward, next_obs, done):
""" off-policy '''学习方法(off-policy)也就是更新Q-table的方法
obs: 交互前的obs, s_t Args:
action: 本次交互选择的action, a_t obs [type]: 交互前的obs, s_t
reward: 本次动作获得的奖励r action [type]: 本次交互选择的action, a_t
next_obs: 本次交互后的obs, s_t+1 reward [type]: 本次动作获得的奖励r
done: episode是否结束 next_obs [type]: 本次交互后的obs, s_t+1
""" done function: episode是否结束
predict_Q = self.Q[obs, action] '''
Q_predict = self.Q_table[obs, action]
if done: if done:
target_Q = reward # 没有下一个状态了 Q_target = reward # 没有下一个状态了
else: else:
target_Q = reward + self.gamma * np.max( Q_target = reward + self.gamma * np.max(
self.Q[next_obs, :]) # Q-learning self.Q_table[next_obs, :]) # Q_table-learning
self.Q[obs, action] += self.lr * (target_Q - predict_Q) # 修正q self.Q_table[obs, action] += self.lr * (Q_target - Q_predict) # 修正q
# 把 Q表格 的数据保存到文件中
def save(self): def save(self):
npy_file = './q_table.npy' '''把 Q表格 的数据保存到文件中
np.save(npy_file, self.Q) '''
npy_file = './Q_table.npy'
np.save(npy_file, self.Q_table)
print(npy_file + ' saved.') print(npy_file + ' saved.')
def load(self, npy_file='./Q_table.npy'):
# 从文件中读取数据到 Q表格 '''从文件中读取数据到 Q表格
def restore(self, npy_file='./q_table.npy'): '''
self.Q = np.load(npy_file) self.Q_table = np.load(npy_file)
print(npy_file + ' loaded.') print(npy_file + 'loaded.')

Binary file not shown.

After

Width:  |  Height:  |  Size: 233 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 58 KiB

155
codes/Q-learning/main.py Normal file
View File

@@ -0,0 +1,155 @@
#!/usr/bin/env python
# coding=utf-8
'''
Author: John
Email: johnjim0816@gmail.com
Date: 2020-09-11 23:03:00
LastEditor: John
LastEditTime: 2020-10-07 21:05:33
Discription:
Environment:
'''
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -*- coding: utf-8 -*-
import gym
from gridworld import CliffWalkingWapper, FrozenLakeWapper
from agent import QLearning
import os
import numpy as np
import argparse
import time
import matplotlib.pyplot as plt
def get_args():
'''训练的模型参数
'''
parser = argparse.ArgumentParser()
parser.add_argument("--gamma", default=0.9,
type=float, help="reward 的衰减率")
parser.add_argument("--epsilon_start", default=0.9,
type=float,help="e-greedy策略中初始epsilon")
parser.add_argument("--epsilon_end", default=0.1, type=float,help="e-greedy策略中的结束epsilon")
parser.add_argument("--epsilon_decay", default=200, type=float,help="e-greedy策略中epsilon的衰减率")
parser.add_argument("--policy_lr", default=0.1, type=float,help="学习率")
parser.add_argument("--max_episodes", default=500, type=int,help="训练的最大episode数目")
config = parser.parse_args()
return config
def train(cfg):
# env = gym.make("FrozenLake-v0", is_slippery=False) # 0 left, 1 down, 2 right, 3 up
# env = FrozenLakeWapper(env)
env = gym.make("CliffWalking-v0") # 0 up, 1 right, 2 down, 3 left
env = CliffWalkingWapper(env)
agent = QLearning(
obs_dim=env.observation_space.n,
action_dim=env.action_space.n,
learning_rate=cfg.policy_lr,
gamma=cfg.gamma,
epsilon_start=cfg.epsilon_start,epsilon_end=cfg.epsilon_end,epsilon_decay=cfg.epsilon_decay)
render = False # 是否打开GUI画面
rewards = [] # 记录所有episode的reward
MA_rewards = [] # 记录滑动平均的reward
steps = []# 记录所有episode的steps
for i_episode in range(1,cfg.max_episodes+1):
ep_reward = 0 # 记录每个episode的reward
ep_steps = 0 # 记录每个episode走了多少step
obs = env.reset() # 重置环境, 重新开一局即开始新的一个episode
while True:
action = agent.sample(obs) # 根据算法选择一个动作
next_obs, reward, done, _ = env.step(action) # 与环境进行一个交互
# 训练 Q-learning算法
agent.learn(obs, action, reward, next_obs, done) # 不需要下一步的action
obs = next_obs # 存储上一个观察值
ep_reward += reward
ep_steps += 1 # 计算step数
if render:
env.render() #渲染新的一帧图形
if done:
break
steps.append(ep_steps)
rewards.append(ep_reward)
# 计算滑动平均的reward
if i_episode == 1:
MA_rewards.append(ep_reward)
else:
MA_rewards.append(
0.9*MA_rewards[-1]+0.1*ep_reward)
print('Episode %s: steps = %s , reward = %.1f, explore = %.2f' % (i_episode, ep_steps,
ep_reward,agent.epsilon))
# 每隔20个episode渲染一下看看效果
if i_episode % 20 == 0:
render = True
else:
render = False
agent.save() # 训练结束,保存模型
output_path = os.path.dirname(__file__)+"/result/"
# 检测是否存在文件夹
if not os.path.exists(output_path):
os.mkdir(output_path)
np.save(output_path+"rewards_train.npy", rewards)
np.save(output_path+"MA_rewards_train.npy", MA_rewards)
np.save(output_path+"steps_train.npy", steps)
def test(cfg):
env = gym.make("CliffWalking-v0") # 0 up, 1 right, 2 down, 3 left
env = CliffWalkingWapper(env)
agent = QLearning(
obs_dim=env.observation_space.n,
action_dim=env.action_space.n,
learning_rate=cfg.policy_lr,
gamma=cfg.gamma,
epsilon_start=cfg.epsilon_start,epsilon_end=cfg.epsilon_end,epsilon_decay=cfg.epsilon_decay)
agent.load() # 导入保存的模型
rewards = [] # 记录所有episode的reward
MA_rewards = [] # 记录滑动平均的reward
steps = []# 记录所有episode的steps
for i_episode in range(1,10+1):
ep_reward = 0 # 记录每个episode的reward
ep_steps = 0 # 记录每个episode走了多少step
obs = env.reset() # 重置环境, 重新开一局即开始新的一个episode
while True:
action = agent.predict(obs) # 根据算法选择一个动作
next_obs, reward, done, _ = env.step(action) # 与环境进行一个交互
obs = next_obs # 存储上一个观察值
time.sleep(0.5)
env.render()
ep_reward += reward
ep_steps += 1 # 计算step数
if done:
break
steps.append(ep_steps)
rewards.append(ep_reward)
# 计算滑动平均的reward
if i_episode == 1:
MA_rewards.append(ep_reward)
else:
MA_rewards.append(
0.9*MA_rewards[-1]+0.1*ep_reward)
print('Episode %s: steps = %s , reward = %.1f' % (i_episode, ep_steps, ep_reward))
plt.plot(MA_rewards)
plt.show()
def main():
cfg = get_args()
# train(cfg)
test(cfg)
if __name__ == "__main__":
main()

35
codes/Q-learning/plot.py Normal file
View File

@@ -0,0 +1,35 @@
#!/usr/bin/env python
# coding=utf-8
'''
Author: John
Email: johnjim0816@gmail.com
Date: 2020-10-07 20:57:11
LastEditor: John
LastEditTime: 2020-10-07 21:00:29
Discription:
Environment:
'''
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import os
def plot(item,ylabel='rewards'):
sns.set()
plt.figure()
plt.plot(np.arange(len(item)), item)
plt.title(ylabel+' of Q-learning')
plt.ylabel(ylabel)
plt.xlabel('episodes')
plt.savefig(os.path.dirname(__file__)+"/result/"+ylabel+".png")
plt.show()
if __name__ == "__main__":
output_path = os.path.dirname(__file__)+"/result/"
rewards=np.load(output_path+"rewards_train.npy", )
MA_rewards=np.load(output_path+"MA_rewards_train.npy")
steps = np.load(output_path+"steps_train.npy")
plot(rewards)
plot(MA_rewards,ylabel='moving_average_rewards')
plot(steps,ylabel='steps')

Binary file not shown.

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 27 KiB

Binary file not shown.

View File

@@ -1,90 +0,0 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -*- coding: utf-8 -*-
import gym
from gridworld import CliffWalkingWapper, FrozenLakeWapper
from agent import QLearningAgent
import time
def run_episode(env, agent, render=False):
total_steps = 0 # 记录每个episode走了多少step
total_reward = 0
obs = env.reset() # 重置环境, 重新开一局即开始新的一个episode
while True:
action = agent.sample(obs) # 根据算法选择一个动作
next_obs, reward, done, _ = env.step(action) # 与环境进行一个交互
# 训练 Q-learning算法
agent.learn(obs, action, reward, next_obs, done) # 不需要下一步的action
obs = next_obs # 存储上一个观察值
total_reward += reward
total_steps += 1 # 计算step数
if render:
env.render() #渲染新的一帧图形
if done:
break
return total_reward, total_steps
def test_episode(env, agent):
total_reward = 0
obs = env.reset()
while True:
action = agent.predict(obs) # greedy
next_obs, reward, done, _ = env.step(action)
total_reward += reward
obs = next_obs
time.sleep(0.5)
env.render()
if done:
print('test reward = %.1f' % (total_reward))
break
def main():
# env = gym.make("FrozenLake-v0", is_slippery=False) # 0 left, 1 down, 2 right, 3 up
# env = FrozenLakeWapper(env)
env = gym.make("CliffWalking-v0") # 0 up, 1 right, 2 down, 3 left
env = CliffWalkingWapper(env)
agent = QLearningAgent(
obs_n=env.observation_space.n,
act_n=env.action_space.n,
learning_rate=0.1,
gamma=0.9,
e_greed=0.1)
is_render = False
for episode in range(500):
ep_reward, ep_steps = run_episode(env, agent, is_render)
print('Episode %s: steps = %s , reward = %.1f' % (episode, ep_steps,
ep_reward))
# 每隔20个episode渲染一下看看效果
if episode % 20 == 0:
is_render = True
else:
is_render = False
# 训练结束,查看算法效果
test_episode(env, agent)
if __name__ == "__main__":
main()

View File

@@ -5,7 +5,7 @@
@Email: johnjim0816@gmail.com @Email: johnjim0816@gmail.com
@Date: 2020-06-12 00:50:49 @Date: 2020-06-12 00:50:49
@LastEditor: John @LastEditor: John
LastEditTime: 2020-08-22 15:44:31 LastEditTime: 2020-10-07 17:32:18
@Discription: @Discription:
@Environment: python 3.7.7 @Environment: python 3.7.7
''' '''
@@ -30,7 +30,7 @@ class DQN:
self.n_actions = n_actions # 总的动作个数 self.n_actions = n_actions # 总的动作个数
self.device = device # 设备cpu或gpu等 self.device = device # 设备cpu或gpu等
self.gamma = gamma self.gamma = gamma
# e-greedy策略相关参数 # e-greedy 策略相关参数
self.epsilon = 0 self.epsilon = 0
self.epsilon_start = epsilon_start self.epsilon_start = epsilon_start
self.epsilon_end = epsilon_end self.epsilon_end = epsilon_end

View File

@@ -5,12 +5,11 @@
@Email: johnjim0816@gmail.com @Email: johnjim0816@gmail.com
@Date: 2020-06-11 16:30:09 @Date: 2020-06-11 16:30:09
@LastEditor: John @LastEditor: John
LastEditTime: 2020-08-20 16:34:34 LastEditTime: 2020-10-07 20:57:22
@Discription: @Discription:
@Environment: python 3.7.7 @Environment: python 3.7.7
''' '''
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns import seaborn as sns
import numpy as np import numpy as np
import os import os

View File

@@ -2,7 +2,7 @@
贪吃蛇是一个起源于1976年的街机游戏 Blockade玩家控制蛇上下左右吃到食物并将身体增长吃到食物后移动速度逐渐加快直到碰到墙体或者蛇的身体算游戏结束。 贪吃蛇是一个起源于1976年的街机游戏 Blockade玩家控制蛇上下左右吃到食物并将身体增长吃到食物后移动速度逐渐加快直到碰到墙体或者蛇的身体算游戏结束。
![image-20200901202636603](img/image-20200901202636603.png) ![image-20200901202636603](assets/image-20200901202636603.png)
如图本次任务整个游戏版面大小为560X560绿色部分就是我们的智能体贪吃蛇红色方块就是食物墙位于四周一旦食物被吃掉会在下一个随机位置刷出新的食物。蛇的每一节以及食物的大小为40X40除开墙体(厚度也为40)蛇可以活动的范围为480X480也就是12X12的栅格。环境的状态等信息如下 如图本次任务整个游戏版面大小为560X560绿色部分就是我们的智能体贪吃蛇红色方块就是食物墙位于四周一旦食物被吃掉会在下一个随机位置刷出新的食物。蛇的每一节以及食物的大小为40X40除开墙体(厚度也为40)蛇可以活动的范围为480X480也就是12X12的栅格。环境的状态等信息如下

View File

Before

Width:  |  Height:  |  Size: 32 KiB

After

Width:  |  Height:  |  Size: 32 KiB

View File

@@ -99,11 +99,13 @@ class SnakeEnv:
self.render = True self.render = True
class Snake: class Snake:
''' 定义贪吃蛇的类
'''
def __init__(self, snake_head_x, snake_head_y, food_x, food_y): def __init__(self, snake_head_x, snake_head_y, food_x, food_y):
self.init_snake_head_x = snake_head_x # 初始化蛇头的位置
self.init_snake_head_y = snake_head_y self.init_snake_head_x, self.init_snake_head_y = snake_head_x, snake_head_y
self.init_food_x = food_x # 初始化食物的位置
self.init_food_y = food_y self.init_food_x, self.init_food_y = food_x, food_y
self.reset() self.reset()
def reset(self): def reset(self):

100
docs/chapter3/README.md Normal file
View File

@@ -0,0 +1,100 @@
# 使用Q-learning解决悬崖寻路问题
## CliffWalking-v0环境简介
悬崖寻路问题CliffWalking是指在一个4 x 12的网格中智能体以网格的左下角位置为起点以网格的下角位置为终点目标是移动智能体到达终点位置智能体每次可以在上、下、左、右这4个方向中移动一步每移动一步会得到-1单位的奖励。
<img src="../../codes/Q-learning/assets/image-20201007211441036.png" alt="image-20201007211441036" style="zoom:50%;" />
如图红色部分表示悬崖数字代表智能体能够观测到的位置信息即observation总共会有0-47等48个不同的值智能体再移动中会有以下限制
* 智能体不能移出网格,如果智能体想执行某个动作移出网格,那么这一步智能体不会移动,但是这个操作依然会得到-1单位的奖励
* 如果智能体“掉入悬崖” ,会立即回到起点位置,并得到-100单位的奖励
* 当智能体移动到终点时,该回合结束,该回合总奖励为各步奖励之和
实际的仿真界面如下:
<img src="../../codes/Q-learning/assets/image-20201007211858925.png" alt="image-20201007211858925" style="zoom:50%;" />
**由于从起点到终点最少需要13步每步得到-1的reward因此最佳训练算法下每个episode下reward总和应该为-13**
## RL基本训练接口
```python
env = gym.make("CliffWalking-v0") # 0 up, 1 right, 2 down, 3 left
env = CliffWalkingWapper(env)
agent = QLearning(
obs_dim=env.observation_space.n,
action_dim=env.action_space.n,
learning_rate=cfg.policy_lr,
gamma=cfg.gamma,
epsilon_start=cfg.epsilon_start,epsilon_end=cfg.epsilon_end,epsilon_decay=cfg.epsilon_decay)
render = False # 是否打开GUI画面
rewards = [] # 记录所有episode的reward
MA_rewards = [] # 记录滑动平均的reward
steps = []# 记录所有episode的steps
for i_episode in range(1,cfg.max_episodes+1):
ep_reward = 0 # 记录每个episode的reward
ep_steps = 0 # 记录每个episode走了多少step
obs = env.reset() # 重置环境, 重新开一局即开始新的一个episode
while True:
action = agent.sample(obs) # 根据算法选择一个动作
next_obs, reward, done, _ = env.step(action) # 与环境进行一个交互
# 训练 Q-learning算法
agent.learn(obs, action, reward, next_obs, done) # 不需要下一步的action
obs = next_obs # 存储上一个观察值
ep_reward += reward
ep_steps += 1 # 计算step数
if render:
env.render() #渲染新的一帧图形
if done:
break
steps.append(ep_steps)
rewards.append(ep_reward)
# 计算滑动平均的reward
if i_episode == 1:
MA_rewards.append(ep_reward)
else:
MA_rewards.append(
0.9*MA_rewards[-1]+0.1*ep_reward)
print('Episode %s: steps = %s , reward = %.1f, explore = %.2f' % (i_episode, ep_steps,
ep_reward,agent.epsilon))
# 每隔20个episode渲染一下看看效果
if i_episode % 20 == 0:
render = True
else:
render = False
agent.save() # 训练结束,保存模型
```
## 任务要求
训练并绘制reward以及滑动平均后的reward随epiosde的变化曲线图并记录超参数写成报告图示如下
![rewards](assets/rewards.png)
![moving_average_rewards](assets/moving_average_rewards.png)
### 代码清单
**main.py**保存强化学习基本接口以及相应的超参数可使用argparse
**model.py**:保存神经网络,比如全链接网络
**agent.py**: 保存算法模型主要包含predict(预测动作)和learn两个函数
**plot.py**:保存相关绘制函数
## 备注
* 注意 e-greedy 策略的使用以及相应的参数epsilon如何衰减
* 训练模型和测试模型的时候选择动作有一些不同训练时采取e-greedy策略而测试时直接选取Q值最大对应的动作所以算法在动作选择的时候会包括sample(训练时的动作采样)和predict(测试时的动作选择)
* Q值最大对应的动作可能不止一个此时可以随机选择一个输出结果

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 28 KiB

0
main.py Normal file
View File