Files
easy-rl/notebooks/DoubleDQN.ipynb
2023-01-14 15:24:43 +08:00

511 lines
132 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1、定义算法\n",
"\n",
"Double DQN除了在更新时对期望Q值的近似方式与DQN不同之外其他都是相同的\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.1、定义模型\n"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import torch.nn as nn\n",
"import torch.nn.functional as F\n",
"class MLP(nn.Module):\n",
" def __init__(self, n_states,n_actions,hidden_dim=128):\n",
" \"\"\" 初始化q网络为全连接网络\n",
" \"\"\"\n",
" super(MLP, self).__init__()\n",
" self.fc1 = nn.Linear(n_states, hidden_dim) # 输入层\n",
" self.fc2 = nn.Linear(hidden_dim,hidden_dim) # 隐藏层\n",
" self.fc3 = nn.Linear(hidden_dim, n_actions) # 输出层\n",
" \n",
" def forward(self, x):\n",
" # 各层对应的激活函数\n",
" x = F.relu(self.fc1(x)) \n",
" x = F.relu(self.fc2(x))\n",
" return self.fc3(x)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.2、定义经验回放\n",
"\n",
"经验回放首先是具有一定容量的只有存储一定的transition网络才会更新否则就退回到了之前的逐步更新了。另外写经验回放的时候一般需要包涵两个功能或方法一个是push即将一个transition样本按顺序放到经验回放中如果满了就把最开始放进去的样本挤掉因此如果大家学过数据结构的话推荐用队列来写虽然这里不是。另外一个是sample很简单就是随机采样出一个或者若干个具体多少就是batch_size了样本供DQN网络更新。功能讲清楚了大家可以按照自己的想法用代码来实现参考如下。"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"from collections import deque\n",
"import random\n",
"class ReplayBuffer(object):\n",
" def __init__(self, capacity: int) -> None:\n",
" self.capacity = capacity\n",
" self.buffer = deque(maxlen=self.capacity)\n",
" def push(self,transitions):\n",
" ''' 存储transition到经验回放中\n",
" '''\n",
" self.buffer.append(transitions)\n",
" def sample(self, batch_size: int, sequential: bool = False):\n",
" if batch_size > len(self.buffer): # 如果批量大小大于经验回放的容量,则取经验回放的容量\n",
" batch_size = len(self.buffer)\n",
" if sequential: # 顺序采样\n",
" rand = random.randint(0, len(self.buffer) - batch_size)\n",
" batch = [self.buffer[i] for i in range(rand, rand + batch_size)]\n",
" return zip(*batch)\n",
" else: # 随机采样\n",
" batch = random.sample(self.buffer, batch_size)\n",
" return zip(*batch)\n",
" def clear(self):\n",
" ''' 清空经验回放\n",
" '''\n",
" self.buffer.clear()\n",
" def __len__(self):\n",
" ''' 返回当前存储的量\n",
" '''\n",
" return len(self.buffer)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.3、真定义算法\n",
"\n",
"跟DQN算法几乎一模一样"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"import torch.optim as optim\n",
"import math\n",
"import numpy as np\n",
"class DoubleDQN(object):\n",
" def __init__(self,cfg):\n",
" self.states = cfg.n_states\n",
" self.n_actions = cfg.n_actions \n",
" self.device = torch.device(cfg.device) \n",
" self.gamma = cfg.gamma # 折扣因子\n",
" # e-greedy策略相关参数\n",
" self.sample_count = 0 # 用于epsilon的衰减计数\n",
" self.epsilon = cfg.epsilon_start\n",
" self.sample_count = 0 \n",
" self.epsilon_start = cfg.epsilon_start\n",
" self.epsilon_end = cfg.epsilon_end\n",
" self.epsilon_decay = cfg.epsilon_decay\n",
" self.batch_size = cfg.batch_size\n",
" self.target_update = cfg.target_update\n",
" self.policy_net = MLP(cfg.n_states,cfg.n_actions,hidden_dim=cfg.hidden_dim).to(self.device)\n",
" self.target_net = MLP(cfg.n_states,cfg.n_actions,hidden_dim=cfg.hidden_dim).to(self.device)\n",
" # 复制参数到目标网络\n",
" for target_param, param in zip(self.target_net.parameters(),self.policy_net.parameters()): \n",
" target_param.data.copy_(param.data)\n",
" # self.target_net.load_state_dict(self.policy_net.state_dict()) # or use this to copy parameters\n",
" self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg.lr) # 优化器\n",
" self.memory = ReplayBuffer(cfg.buffer_size) # 经验回放\n",
" self.update_flag = False \n",
"\n",
" def sample_action(self, state):\n",
" ''' 采样动作\n",
" '''\n",
" self.sample_count += 1\n",
" # epsilon指数衰减\n",
" self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * \\\n",
" math.exp(-1. * self.sample_count / self.epsilon_decay) \n",
" if random.random() > self.epsilon:\n",
" with torch.no_grad():\n",
" state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)\n",
" q_values = self.policy_net(state)\n",
" action = q_values.max(1)[1].item() # choose action corresponding to the maximum q value\n",
" else:\n",
" action = random.randrange(self.n_actions)\n",
" return action\n",
" @torch.no_grad() # 不计算梯度该装饰器效果等同于with torch.no_grad()\n",
" def predict_action(self, state):\n",
" ''' 预测动作\n",
" '''\n",
" state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)\n",
" q_values = self.policy_net(state)\n",
" action = q_values.max(1)[1].item() # choose action corresponding to the maximum q value\n",
" return action\n",
" def update(self):\n",
" if len(self.memory) < self.batch_size: # 当经验回放中不满足一个批量时,不更新策略\n",
" return\n",
" else:\n",
" if not self.update_flag:\n",
" print(\"开始更新策略!\")\n",
" self.update_flag = True\n",
" # 从经验回放中随机采样一个批量的转移(transition)\n",
" state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(\n",
" self.batch_size)\n",
" # 将数据转换为tensor\n",
" state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)\n",
" action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1) \n",
" reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float).unsqueeze(1) \n",
" next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)\n",
" done_batch = torch.tensor(np.float32(done_batch), device=self.device).unsqueeze(1)\n",
" q_value_batch = self.policy_net(state_batch).gather(dim=1, index=action_batch) # 实际的Q值\n",
" next_q_value_batch = self.policy_net(next_state_batch) # 下一个状态对应的实际策略网络Q值\n",
" next_target_value_batch = self.target_net(next_state_batch) # 下一个状态对应的目标网络Q值\n",
" # 将策略网络Q值最大的动作对应的目标网络Q值作为期望的Q值\n",
" next_target_q_value_batch = next_target_value_batch.gather(1, torch.max(next_q_value_batch, 1)[1].unsqueeze(1))\n",
" expected_q_value_batch = reward_batch + self.gamma * next_target_q_value_batch* (1-done_batch) # 期望的Q值\n",
" # 计算损失\n",
" loss = nn.MSELoss()(q_value_batch, expected_q_value_batch)\n",
" # 优化更新模型\n",
" self.optimizer.zero_grad() \n",
" loss.backward()\n",
" # clip防止梯度爆炸\n",
" for param in self.policy_net.parameters(): \n",
" param.grad.data.clamp_(-1, 1)\n",
" self.optimizer.step() \n",
" if self.sample_count % self.target_update == 0: # 每隔一段时间,将策略网络的参数复制到目标网络\n",
" self.target_net.load_state_dict(self.policy_net.state_dict()) \n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2、定义训练"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def train(cfg, env, agent):\n",
" ''' 训练\n",
" '''\n",
" print(\"开始训练!\")\n",
" rewards = [] # 记录所有回合的奖励\n",
" steps = []\n",
" for i_ep in range(cfg.train_eps):\n",
" ep_reward = 0 # 记录一回合内的奖励\n",
" ep_step = 0\n",
" state = env.reset() # 重置环境,返回初始状态\n",
" for _ in range(cfg.max_steps):\n",
" ep_step += 1\n",
" action = agent.sample_action(state) # 选择动作\n",
" next_state, reward, done, _ = env.step(action) # 更新环境返回transition\n",
" agent.memory.push((state, action, reward,next_state, done)) # 保存transition\n",
" state = next_state # 更新下一个状态\n",
" agent.update() # 更新智能体\n",
" ep_reward += reward # 累加奖励\n",
" if done:\n",
" break\n",
" steps.append(ep_step)\n",
" rewards.append(ep_reward)\n",
" if (i_ep + 1) % 10 == 0:\n",
" print(f\"回合:{i_ep+1}/{cfg.train_eps},奖励:{ep_reward:.2f}Epislon{agent.epsilon:.3f}\")\n",
" print(\"完成训练!\")\n",
" env.close()\n",
" return {'rewards':rewards}\n",
"\n",
"def test(cfg, env, agent):\n",
" print(\"开始测试!\")\n",
" rewards = [] # 记录所有回合的奖励\n",
" steps = []\n",
" for i_ep in range(cfg.test_eps):\n",
" ep_reward = 0 # 记录一回合内的奖励\n",
" state = env.reset() # 重置环境,返回初始状态\n",
" for _ in range(cfg.max_steps):\n",
" action = agent.predict_action(state) # 选择动作\n",
" next_state, reward, done, _ = env.step(action) # 更新环境返回transition\n",
" state = next_state # 更新下一个状态\n",
" ep_reward += reward # 累加奖励\n",
" if done:\n",
" break\n",
" rewards.append(ep_reward)\n",
" print(f\"回合:{i_ep+1}/{cfg.test_eps},奖励:{ep_reward:.2f}\")\n",
" print(\"完成测试\")\n",
" env.close()\n",
" return {'rewards':rewards}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3. 定义环境"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"import gym\n",
"import os\n",
"def all_seed(env,seed = 1):\n",
" ''' 万能的seed函数\n",
" '''\n",
" env.seed(seed) # env config\n",
" np.random.seed(seed)\n",
" random.seed(seed)\n",
" torch.manual_seed(seed) # config for CPU\n",
" torch.cuda.manual_seed(seed) # config for GPU\n",
" os.environ['PYTHONHASHSEED'] = str(seed) # config for python scripts\n",
" # config for cudnn\n",
" torch.backends.cudnn.deterministic = True\n",
" torch.backends.cudnn.benchmark = False\n",
" torch.backends.cudnn.enabled = False\n",
"def env_agent_config(cfg):\n",
" env = gym.make(cfg.env_name) # 创建环境\n",
" all_seed(env,seed=cfg.seed)\n",
" n_states = env.observation_space.shape[0]\n",
" n_actions = env.action_space.n\n",
" print(f\"状态空间维度:{n_states},动作空间维度:{n_actions}\")\n",
" # 更新n_states和n_actions到cfg参数中\n",
" setattr(cfg, 'n_states', n_states)\n",
" setattr(cfg, 'n_actions', n_actions) \n",
" agent = DoubleDQN(cfg)\n",
" return env,agent"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4、设置参数"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"import seaborn as sns\n",
"class Config:\n",
" def __init__(self):\n",
" self.algo_name = 'DoubleDQN' # 算法名称\n",
" self.env_name = 'CartPole-v1' # 环境名称\n",
" self.seed = 1 # 随机种子\n",
" self.train_eps = 100 # 训练回合数\n",
" self.test_eps = 10 # 测试回合数\n",
" self.max_steps = 200 # 每回合最大步数\n",
" self.gamma = 0.99 # 折扣因子\n",
" self.lr = 0.0001 # 学习率\n",
" self.epsilon_start = 0.95 # epsilon初始值\n",
" self.epsilon_end = 0.01 # epsilon最终值\n",
" self.epsilon_decay = 500 # epsilon衰减率\n",
" self.buffer_size = 10000 # ReplayBuffer容量\n",
" self.batch_size = 64 # ReplayBuffer中批次大小\n",
" self.target_update = 4 # 目标网络更新频率\n",
" self.hidden_dim = 256 # 神经网络隐藏层维度\n",
" if torch.cuda.is_available(): # 是否使用GPUs\n",
" self.device = 'cuda'\n",
" else:\n",
" self.device = 'cpu'\n",
"def smooth(data, weight=0.9): \n",
" '''用于平滑曲线类似于Tensorboard中的smooth曲线\n",
" '''\n",
" last = data[0] \n",
" smoothed = []\n",
" for point in data:\n",
" smoothed_val = last * weight + (1 - weight) * point # 计算平滑值\n",
" smoothed.append(smoothed_val) \n",
" last = smoothed_val \n",
" return smoothed\n",
"\n",
"def plot_rewards(rewards,title=\"learning curve\"):\n",
" sns.set()\n",
" plt.figure() # 创建一个图形实例,方便同时多画几个图\n",
" plt.title(f\"{title}\")\n",
" plt.xlim(0, len(rewards), 10) # 设置x轴的范围\n",
" plt.xlabel('epsiodes')\n",
" plt.plot(rewards, label='rewards')\n",
" plt.plot(smooth(rewards), label='smoothed')\n",
" plt.legend()\n",
"\n",
"def print_cfgs(cfg):\n",
" ''' 打印参数\n",
" '''\n",
" cfg_dict = vars(cfg)\n",
" print(\"Hyperparameters:\")\n",
" print(''.join(['=']*80))\n",
" tplt = \"{:^20}\\t{:^20}\\t{:^20}\"\n",
" print(tplt.format(\"Name\", \"Value\", \"Type\"))\n",
" for k,v in cfg_dict.items():\n",
" if v.__class__.__name__ == 'list':\n",
" v = str(v)\n",
" print(tplt.format(k,v,str(type(v)))) \n",
" print(''.join(['=']*80))\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5、开始训练"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Hyperparameters:\n",
"================================================================================\n",
" Name \t Value \t Type \n",
" algo_name \t DoubleDQN \t <class 'str'> \n",
" env_name \t CartPole-v1 \t <class 'str'> \n",
" seed \t 1 \t <class 'int'> \n",
" train_eps \t 100 \t <class 'int'> \n",
" test_eps \t 10 \t <class 'int'> \n",
" max_steps \t 200 \t <class 'int'> \n",
" gamma \t 0.99 \t <class 'float'> \n",
" lr \t 0.0001 \t <class 'float'> \n",
" epsilon_start \t 0.95 \t <class 'float'> \n",
" epsilon_end \t 0.01 \t <class 'float'> \n",
" epsilon_decay \t 500 \t <class 'int'> \n",
" buffer_size \t 10000 \t <class 'int'> \n",
" batch_size \t 64 \t <class 'int'> \n",
" target_update \t 4 \t <class 'int'> \n",
" hidden_dim \t 256 \t <class 'int'> \n",
" device \t cuda \t <class 'str'> \n",
"================================================================================\n",
"状态空间维度4动作空间维度2\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"c:\\Users\\24438\\anaconda3\\envs\\easyrl\\lib\\site-packages\\gym\\core.py:318: DeprecationWarning: \u001b[33mWARN: Initializing wrapper in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future.\u001b[0m\n",
" \"Initializing wrapper in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future.\"\n",
"c:\\Users\\24438\\anaconda3\\envs\\easyrl\\lib\\site-packages\\gym\\wrappers\\step_api_compatibility.py:40: DeprecationWarning: \u001b[33mWARN: Initializing environment in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future.\u001b[0m\n",
" \"Initializing environment in old step API which returns one bool instead of two. It is recommended to set `new_step_api=True` to use new step API. This will be the default behaviour in future.\"\n",
"c:\\Users\\24438\\anaconda3\\envs\\easyrl\\lib\\site-packages\\gym\\core.py:257: DeprecationWarning: \u001b[33mWARN: Function `env.seed(seed)` is marked as deprecated and will be removed in the future. Please use `env.reset(seed=seed)` instead.\u001b[0m\n",
" \"Function `env.seed(seed)` is marked as deprecated and will be removed in the future. \"\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"开始训练!\n",
"开始更新策略!\n",
"回合10/100奖励24.00Epislon0.663\n",
"回合20/100奖励10.00Epislon0.508\n",
"回合30/100奖励10.00Epislon0.395\n",
"回合40/100奖励10.00Epislon0.308\n",
"回合50/100奖励46.00Epislon0.222\n",
"回合60/100奖励98.00Epislon0.061\n",
"回合70/100奖励61.00Epislon0.023\n",
"回合80/100奖励200.00Epislon0.011\n",
"回合90/100奖励80.00Epislon0.010\n",
"回合100/100奖励177.00Epislon0.010\n",
"完成训练!\n",
"开始测试!\n",
"回合1/10奖励200.00\n",
"回合2/10奖励200.00\n",
"回合3/10奖励193.00\n",
"回合4/10奖励200.00\n",
"回合5/10奖励200.00\n",
"回合6/10奖励200.00\n",
"回合7/10奖励200.00\n",
"回合8/10奖励200.00\n",
"回合9/10奖励200.00\n",
"回合10/10奖励200.00\n",
"完成测试\n"
]
},
{
"data": {
"image/png": "",
"text/plain": [
"<Figure size 640x480 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"image/png": "",
"text/plain": [
"<Figure size 640x480 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# 获取参数\n",
"cfg = Config() \n",
"print_cfgs(cfg)\n",
"# 训练\n",
"env, agent = env_agent_config(cfg)\n",
"res_dic = train(cfg, env, agent)\n",
" \n",
"plot_rewards(res_dic['rewards'], title=f\"training curve on {cfg.device} of {cfg.algo_name} for {cfg.env_name}\") \n",
"# 测试\n",
"res_dic = test(cfg, env, agent)\n",
"plot_rewards(res_dic['rewards'], title=f\"testing curve on {cfg.device} of {cfg.algo_name} for {cfg.env_name}\") # 画出结果"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.7.12 ('easyrl')",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.12"
},
"orig_nbformat": 4,
"vscode": {
"interpreter": {
"hash": "f5a9629e9f3b9957bf68a43815f911e93447d47b3d065b6a8a04975e44c504d9"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}