Files
easy-rl/notebooks/DuelingDQN.ipynb
2022-12-04 20:54:36 +08:00

483 lines
112 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1、定义算法\n",
"\n",
"DQN等算法中使用的是一个简单的三层神经网络:一个输入层、一个隐藏层和一个输出层,如下左图所示。\n",
"\n",
"<img src=\"figs/duelingdqn_model.png\" alt=\"image-20211112022028670\" style=\"zoom:50%;\" />\n",
"\n",
"而在Dueling DQN中,我们在后面加了两个子网络结构,分别对应价值函数网络部分和优势函数网络部分,如上面右图所示。最终Q网络的输出由价值函数网络的输出和优势函数网络的输出线性组合得到。\n",
"\n",
"我们可以直接使用上一节的价值函数的组合公式得到我们的动作价值,但是这个式子无法辨识最终输出里面$V(S, w, \\alpha)$和$A(S, A, w, \\beta)$各自的作用,为了可以体现这种可辨识性(identifiability),实际使用的组合公式如下:\n",
"\n",
"$$\n",
"Q(S, A, w, \\alpha, \\beta)=V(S, w, \\alpha)+\\left(A(S, A, w, \\beta)-\\frac{1}{|\\mathcal{A}|} \\sum_{a^{\\prime} \\in \\mathcal{A}} A\\left(S, a^{\\prime}, w, \\beta\\right)\\right)\n",
"$$"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.1、定义模型\n"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"import torch.nn as nn\n",
"import torch.nn.functional as F\n",
"class DuelingNet(nn.Module):\n",
"    ''' Dueling Q-network: one shared hidden layer followed by separate\n",
"    advantage and state-value heads whose outputs are recombined into Q-values.\n",
"\n",
"    Args:\n",
"        n_states: number of input features of the first linear layer.\n",
"        n_actions: number of outputs of the advantage head (one Q-value per action).\n",
"        hidden_dim: width of every hidden linear layer.\n",
"    '''\n",
"    def __init__(self, n_states, n_actions,hidden_dim=128):\n",
"        super(DuelingNet, self).__init__()\n",
"\n",
"        # shared hidden layer\n",
"        self.hidden_layer = nn.Sequential(\n",
"            nn.Linear(n_states, hidden_dim),\n",
"            nn.ReLU()\n",
"        )\n",
"\n",
"        # advantage head: one output per action\n",
"        self.advantage_layer = nn.Sequential(\n",
"            nn.Linear(hidden_dim, hidden_dim),\n",
"            nn.ReLU(),\n",
"            nn.Linear(hidden_dim, n_actions)\n",
"        )\n",
"\n",
"        # state-value head: a single output\n",
"        self.value_layer = nn.Sequential(\n",
"            nn.Linear(hidden_dim, hidden_dim),\n",
"            nn.ReLU(),\n",
"            nn.Linear(hidden_dim, 1)\n",
"        )\n",
"\n",
"    def forward(self, state):\n",
"        ''' Compute Q-values as V + (A - mean of A over the actions).\n",
"\n",
"        Args:\n",
"            state: tensor whose last dimension has size n_states.\n",
"        Returns:\n",
"            tensor whose last dimension has size n_actions.\n",
"        '''\n",
"        x = self.hidden_layer(state)\n",
"        advantage = self.advantage_layer(x)\n",
"        value = self.value_layer(x)\n",
"        # Average over the action axis only. A bare .mean() would also average over\n",
"        # the batch axis, making each sample's Q-values depend on the rest of the batch.\n",
"        return value + advantage - advantage.mean(dim=-1, keepdim=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.2、定义经验回放\n",
"\n",
"经验回放首先是具有一定容量的:只有存储了一定数量的transition,网络才会更新,否则就退回到之前的逐步更新了。另外,写经验回放的时候一般需要包含两个功能或方法:一个是push,即将一个transition样本按顺序放到经验回放中,如果满了就把最开始放进去的样本挤掉,因此如果大家学过数据结构的话,推荐用队列来写(下面的参考实现用的就是collections.deque)。另一个是sample,很简单,就是随机采样出一个或者若干个(具体多少就是batch_size)样本供DQN网络更新。功能讲清楚了,大家可以按照自己的想法用代码来实现,参考如下。"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"from collections import deque\n",
"import random\n",
"class ReplayBuffer(object):\n",
"    ''' Bounded store of transitions with random or contiguous sampling. '''\n",
"    def __init__(self, capacity: int) -> None:\n",
"        self.capacity = capacity\n",
"        # a deque with maxlen drops its oldest entry when a new one is appended to a full buffer\n",
"        self.buffer = deque(maxlen=self.capacity)\n",
"\n",
"    def push(self,transitions):\n",
"        ''' Append one transition to the buffer.\n",
"        '''\n",
"        self.buffer.append(transitions)\n",
"\n",
"    def sample(self, batch_size: int, sequential: bool = False):\n",
"        ''' Draw a batch and return it transposed (one tuple per transition field).\n",
"\n",
"        Args:\n",
"            batch_size: requested number of transitions; capped at the number currently stored.\n",
"            sequential: if True take a contiguous run starting at a random index,\n",
"                otherwise sample without replacement.\n",
"        Returns:\n",
"            the iterator produced by zip(*batch).\n",
"        '''\n",
"        stored = len(self.buffer)\n",
"        batch_size = min(batch_size, stored)\n",
"        if not sequential:\n",
"            picked = random.sample(self.buffer, batch_size)\n",
"            return zip(*picked)\n",
"        start = random.randint(0, stored - batch_size)\n",
"        picked = [self.buffer[start + offset] for offset in range(batch_size)]\n",
"        return zip(*picked)\n",
"\n",
"    def clear(self):\n",
"        ''' Remove every stored transition.\n",
"        '''\n",
"        self.buffer.clear()\n",
"\n",
"    def __len__(self):\n",
"        ''' Number of transitions currently stored.\n",
"        '''\n",
"        return len(self.buffer)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.3、真定义算法\n",
"\n",
"跟DQN算法几乎一模一样"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"import torch.optim as optim\n",
"import math\n",
"import numpy as np\n",
"class DuelingDQN:\n",
"    ''' DQN-style agent: epsilon-greedy exploration, replay-buffer updates and a\n",
"    periodically synchronised target network.\n",
"\n",
"    Args:\n",
"        model: Q-network module; it becomes the policy network and an independent\n",
"            copy of it becomes the target network.\n",
"        memory: replay buffer providing sample(batch_size) and __len__.\n",
"        cfg: object providing n_actions, device, gamma, epsilon_start, epsilon_end,\n",
"            epsilon_decay, batch_size, target_update and lr.\n",
"    '''\n",
"    def __init__(self,model,memory,cfg):\n",
"        import copy  # local import: only needed to clone the network below\n",
"        self.n_actions = cfg.n_actions\n",
"        self.device = torch.device(cfg.device)\n",
"        self.gamma = cfg.gamma  # discount factor\n",
"        # epsilon-greedy schedule\n",
"        self.sample_count = 0  # number of sampled actions, drives the epsilon decay\n",
"        self.epsilon = cfg.epsilon_start\n",
"        self.epsilon_start = cfg.epsilon_start\n",
"        self.epsilon_end = cfg.epsilon_end\n",
"        self.epsilon_decay = cfg.epsilon_decay\n",
"        self.batch_size = cfg.batch_size\n",
"        self.target_update = cfg.target_update\n",
"        self.policy_net = model.to(self.device)\n",
"        # nn.Module.to() returns the module itself, so calling it twice would make the\n",
"        # target network the very same object as the policy network. Clone it instead;\n",
"        # the deep copy starts with identical parameters.\n",
"        self.target_net = copy.deepcopy(self.policy_net)\n",
"        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg.lr)  # optimizer\n",
"        self.memory = memory  # replay buffer\n",
"        self.update_flag = False  # set once the first update has been announced\n",
"\n",
"    def sample_action(self, state):\n",
"        ''' Choose an action epsilon-greedily and advance the epsilon decay.\n",
"        '''\n",
"        self.sample_count += 1\n",
"        # exponential decay from epsilon_start towards epsilon_end\n",
"        self.epsilon = self.epsilon_end + (self.epsilon_start - self.epsilon_end) * math.exp(\n",
"            -1. * self.sample_count / self.epsilon_decay)\n",
"        if random.random() > self.epsilon:\n",
"            return self.predict_action(state)  # greedy action, computed without gradients\n",
"        return random.randrange(self.n_actions)\n",
"\n",
"    @torch.no_grad()  # same effect as wrapping the body in `with torch.no_grad()`\n",
"    def predict_action(self, state):\n",
"        ''' Return the action with the largest Q-value under the policy network.\n",
"        '''\n",
"        state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)\n",
"        q_values = self.policy_net(state)\n",
"        action = q_values.max(1)[1].item()  # index of the maximum Q-value\n",
"        return action\n",
"\n",
"    def update(self):\n",
"        ''' Run one gradient step on a sampled minibatch.\n",
"\n",
"        Does nothing while the replay buffer holds fewer than batch_size transitions.\n",
"        '''\n",
"        if len(self.memory) < self.batch_size:\n",
"            return\n",
"        if not self.update_flag:\n",
"            print(\"开始更新策略!\")\n",
"            self.update_flag = True\n",
"        # sample a batch of transitions from the replay buffer\n",
"        state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(\n",
"            self.batch_size)\n",
"        # convert to tensors; actions, rewards and done flags are all shaped (batch, 1)\n",
"        state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)\n",
"        action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1)\n",
"        # without unsqueeze(1) the (batch,) rewards broadcast against the (batch, 1)\n",
"        # terms below and the target silently becomes a (batch, batch) matrix\n",
"        reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float).unsqueeze(1)\n",
"        next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)\n",
"        done_batch = torch.tensor(np.float32(done_batch), device=self.device).unsqueeze(1)\n",
"        # Q-values of the actions that were actually taken\n",
"        q_value_batch = self.policy_net(state_batch).gather(dim=1, index=action_batch)\n",
"        # TD target from the target network; no gradient flows through it\n",
"        with torch.no_grad():\n",
"            next_max_q_value_batch = self.target_net(next_state_batch).max(1)[0].unsqueeze(1)\n",
"        expected_q_value_batch = reward_batch + self.gamma * next_max_q_value_batch * (1 - done_batch)\n",
"        loss = nn.MSELoss()(q_value_batch, expected_q_value_batch)\n",
"        # optimise the policy network\n",
"        self.optimizer.zero_grad()\n",
"        loss.backward()\n",
"        # clamp every gradient element to [-1, 1] to guard against exploding gradients\n",
"        for param in self.policy_net.parameters():\n",
"            param.grad.data.clamp_(-1, 1)\n",
"        self.optimizer.step()\n",
"        # periodically copy the policy parameters into the target network\n",
"        if self.sample_count % self.target_update == 0:\n",
"            self.target_net.load_state_dict(self.policy_net.state_dict())\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2、定义训练"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"def train(cfg, env, agent):\n",
"    ''' Train the agent for cfg.train_eps episodes of at most cfg.max_steps steps each.\n",
"\n",
"    Every transition is pushed to agent.memory and followed by one agent.update() call.\n",
"    The environment is closed before returning.\n",
"\n",
"    Returns:\n",
"        dict with 'rewards' (total reward per episode) and 'steps' (steps taken per episode).\n",
"    '''\n",
"    print(\"开始训练!\")\n",
"    rewards = []  # total reward of every episode\n",
"    steps = []  # number of steps of every episode\n",
"    for i_ep in range(cfg.train_eps):\n",
"        ep_reward = 0  # reward accumulated in this episode\n",
"        ep_step = 0\n",
"        state = env.reset()  # reset the environment and get the initial state\n",
"        for _ in range(cfg.max_steps):\n",
"            ep_step += 1\n",
"            action = agent.sample_action(state)  # epsilon-greedy action\n",
"            next_state, reward, done, _ = env.step(action)  # advance the environment\n",
"            agent.memory.push((state, action, reward,next_state, done))  # store the transition\n",
"            state = next_state\n",
"            agent.update()  # one learning step\n",
"            ep_reward += reward\n",
"            if done:\n",
"                break\n",
"        steps.append(ep_step)\n",
"        rewards.append(ep_reward)\n",
"        if (i_ep + 1) % 10 == 0:\n",
"            print(f\"回合:{i_ep+1}/{cfg.train_eps},奖励:{ep_reward:.2f}Epislon{agent.epsilon:.3f}\")\n",
"    print(\"完成训练!\")\n",
"    env.close()\n",
"    # the step counts were previously collected and then dropped; expose them as well\n",
"    return {'rewards':rewards, 'steps':steps}\n",
"\n",
"def test(cfg, env, agent):\n",
"    ''' Evaluate the agent greedily for cfg.test_eps episodes of at most cfg.max_steps steps.\n",
"\n",
"    No transitions are stored and the agent is not updated. The environment is\n",
"    closed before returning.\n",
"\n",
"    Returns:\n",
"        dict with 'rewards' (total reward per episode) and 'steps' (steps taken per episode).\n",
"    '''\n",
"    print(\"开始测试!\")\n",
"    rewards = []  # total reward of every episode\n",
"    steps = []  # number of steps of every episode (was declared but never filled)\n",
"    for i_ep in range(cfg.test_eps):\n",
"        ep_reward = 0  # reward accumulated in this episode\n",
"        ep_step = 0\n",
"        state = env.reset()  # reset the environment and get the initial state\n",
"        for _ in range(cfg.max_steps):\n",
"            ep_step += 1\n",
"            action = agent.predict_action(state)  # greedy action\n",
"            next_state, reward, done, _ = env.step(action)  # advance the environment\n",
"            state = next_state\n",
"            ep_reward += reward\n",
"            if done:\n",
"                break\n",
"        steps.append(ep_step)\n",
"        rewards.append(ep_reward)\n",
"        print(f\"回合:{i_ep+1}/{cfg.test_eps},奖励:{ep_reward:.2f}\")\n",
"    print(\"完成测试\")\n",
"    env.close()\n",
"    return {'rewards':rewards, 'steps':steps}"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 3、定义环境"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [],
"source": [
"import gym\n",
"import os\n",
"def all_seed(env,seed = 1):\n",
"    ''' Apply one seed to the environment, NumPy, Python's random module and PyTorch,\n",
"    set the PYTHONHASHSEED environment variable and switch cuDNN off.\n",
"\n",
"    Args:\n",
"        env: object exposing a seed(seed) method.\n",
"        seed: value handed to every generator.\n",
"    '''\n",
"    # environment\n",
"    env.seed(seed)\n",
"    # NumPy and the standard library\n",
"    np.random.seed(seed)\n",
"    random.seed(seed)\n",
"    # PyTorch, CPU and GPU generators\n",
"    torch.manual_seed(seed)\n",
"    torch.cuda.manual_seed(seed)\n",
"    # hash seed, stored as a string in the process environment\n",
"    os.environ['PYTHONHASHSEED'] = str(seed)\n",
"    # cuDNN flags\n",
"    cudnn = torch.backends.cudnn\n",
"    cudnn.deterministic = True\n",
"    cudnn.benchmark = False\n",
"    cudnn.enabled = False\n",
"def env_agent_config(cfg):\n",
"    ''' Create the gym environment named by cfg.env_name and a DuelingDQN agent for it.\n",
"\n",
"    The state and action dimensions read from the environment are written back\n",
"    onto cfg as cfg.n_states and cfg.n_actions.\n",
"\n",
"    Returns:\n",
"        (env, agent)\n",
"    '''\n",
"    env = gym.make(cfg.env_name)\n",
"    all_seed(env, seed=cfg.seed)\n",
"    n_states = env.observation_space.shape[0]\n",
"    n_actions = env.action_space.n\n",
"    print(f\"状态空间维度:{n_states},动作空间维度:{n_actions}\")\n",
"    # record the dimensions on cfg; the agent reads cfg.n_actions\n",
"    cfg.n_states = n_states\n",
"    cfg.n_actions = n_actions\n",
"    q_network = DuelingNet(n_states, n_actions, hidden_dim=cfg.hidden_dim)\n",
"    replay = ReplayBuffer(cfg.memory_capacity)\n",
"    return env, DuelingDQN(q_network, replay, cfg)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 4、设置参数"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [],
"source": [
"import argparse\n",
"import matplotlib.pyplot as plt\n",
"import seaborn as sns\n",
"class Config:\n",
"    ''' Hyper-parameters and run settings used throughout this notebook. '''\n",
"    def __init__(self):\n",
"        # run identification\n",
"        self.algo_name = 'DuelingDQN'  # algorithm name\n",
"        self.env_name = 'CartPole-v1'  # environment name\n",
"        self.seed = 1  # random seed\n",
"        # episode budget\n",
"        self.train_eps = 100  # number of training episodes\n",
"        self.test_eps = 10  # number of test episodes\n",
"        self.max_steps = 200  # maximum steps per episode\n",
"        # learning\n",
"        self.gamma = 0.95  # discount factor\n",
"        self.lr = 0.0001  # learning rate\n",
"        # exploration\n",
"        self.epsilon_start = 0.95  # initial epsilon\n",
"        self.epsilon_end = 0.01  # final epsilon\n",
"        self.epsilon_decay = 500  # epsilon decay constant\n",
"        # replay buffer and networks\n",
"        self.memory_capacity = 10000  # replay buffer capacity\n",
"        self.batch_size = 64  # minibatch size drawn from the replay buffer\n",
"        self.target_update = 800  # target network update interval\n",
"        self.hidden_dim = 256  # hidden layer width\n",
"        # use the GPU when one is available\n",
"        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')\n",
"def smooth(data, weight=0.9):\n",
"    ''' Exponentially smooth a curve, similar to the smoothing in TensorBoard.\n",
"\n",
"    Args:\n",
"        data: indexable sequence of numbers.\n",
"        weight: share of the previous smoothed value kept at each step.\n",
"    Returns:\n",
"        list with one smoothed value per input point; empty when data is empty.\n",
"    '''\n",
"    smoothed = []\n",
"    if len(data) == 0:  # nothing to smooth; data[0] below would raise IndexError\n",
"        return smoothed\n",
"    last = data[0]  # the running value starts at the first point\n",
"    for point in data:\n",
"        last = last * weight + (1 - weight) * point\n",
"        smoothed.append(last)\n",
"    return smoothed\n",
"\n",
"def plot_rewards(rewards,title=\"learning curve\"):\n",
"    ''' Plot the per-episode rewards together with their smoothed curve.\n",
"\n",
"    Args:\n",
"        rewards: sequence of per-episode rewards.\n",
"        title: figure title.\n",
"    '''\n",
"    sns.set()\n",
"    plt.figure()  # new figure, so several plots can be drawn one after another\n",
"    plt.title(f\"{title}\")\n",
"    # xlim takes (left, right) only; the former third positional argument was not a\n",
"    # tick step and is rejected by recent Matplotlib releases\n",
"    plt.xlim(0, len(rewards))\n",
"    plt.xlabel('episodes')\n",
"    plt.plot(rewards, label='rewards')\n",
"    plt.plot(smooth(rewards), label='smoothed')\n",
"    plt.legend()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 5、开始训练"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"状态空间维度4动作空间维度2\n",
"开始训练!\n",
"开始更新策略!\n",
"回合10/100奖励24.00Epislon0.663\n",
"回合20/100奖励11.00Epislon0.507\n",
"回合30/100奖励10.00Epislon0.385\n",
"回合40/100奖励61.00Epislon0.187\n",
"回合50/100奖励200.00Epislon0.055\n",
"回合60/100奖励200.00Epislon0.011\n",
"回合70/100奖励200.00Epislon0.010\n",
"回合80/100奖励200.00Epislon0.010\n",
"回合90/100奖励200.00Epislon0.010\n",
"回合100/100奖励200.00Epislon0.010\n",
"完成训练!\n",
"开始测试!\n",
"回合1/10奖励200.00\n",
"回合2/10奖励200.00\n",
"回合3/10奖励200.00\n",
"回合4/10奖励200.00\n",
"回合5/10奖励200.00\n",
"回合6/10奖励200.00\n",
"回合7/10奖励200.00\n",
"回合8/10奖励200.00\n",
"回合9/10奖励200.00\n",
"回合10/10奖励200.00\n",
"完成测试\n"
]
},
{
"data": {
"image/png": "",
"text/plain": [
"<Figure size 640x480 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"image/png": "",
"text/plain": [
"<Figure size 640x480 with 1 Axes>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"# Build the configuration (hyper-parameters, environment name, device)\n",
"cfg = Config() \n",
"# Create the environment and the agent, then train\n",
"env, agent = env_agent_config(cfg)\n",
"res_dic = train(cfg, env, agent)\n",
" \n",
"plot_rewards(res_dic['rewards'], title=f\"training curve on {cfg.device} of {cfg.algo_name} for {cfg.env_name}\") \n",
"# Evaluate the trained agent on the same environment object\n",
"res_dic = test(cfg, env, agent)\n",
"plot_rewards(res_dic['rewards'], title=f\"testing curve on {cfg.device} of {cfg.algo_name} for {cfg.env_name}\") # plot the results"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.7.12 ('easyrl')",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.12"
},
"orig_nbformat": 4,
"vscode": {
"interpreter": {
"hash": "f5a9629e9f3b9957bf68a43815f911e93447d47b3d065b6a8a04975e44c504d9"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}