{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1、分析伪代码\n",
"\n",
"目前DQN算法基本遵循[Nature DQN](https://www.nature.com/articles/nature14236)的伪代码步骤,如下:\n",
"\n",
"<div align=\"center\">\n",
"<img src=\"./figs/dqn_pseu.png\" alt=\"\" style=\"zoom:40%;\" /> \n",
"</div>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1、定义算法\n",
"\n",
"教程中提到相比于Q learningDQN本质上是为了适应更为复杂的环境并且经过不断的改良迭代到了Nature DQN即Volodymyr Mnih发表的Nature论文这里才算是基本完善。DQN主要改动的点有三个\n",
"* 使用深度神经网络替代原来的Q表这个很容易理解原因\n",
"* 使用了经验回放Replay Buffer这个好处有很多一个是使用一堆历史数据去训练比之前用一次就扔掉好多了大大提高样本效率另外一个是面试常提到的减少样本之间的相关性原则上获取经验跟学习阶段是分开的原来时序的训练数据有可能是不稳定的打乱之后再学习有助于提高训练的稳定性跟深度学习中划分训练测试集时打乱样本是一个道理。\n",
"* 使用了两个网络即策略网络和目标网络每隔若干步才把每步更新的策略网络参数复制给目标网络这样做也是为了训练的稳定避免Q值的估计发散。想象一下如果当前有个transition这个Q learning中提过的一定要记住样本导致对Q值进行了较差的过估计如果接下来从经验回放中提取到的样本正好连续几个都这样的很有可能导致Q值的发散它的青春小鸟一去不回来了。再打个比方我们玩RPG或者闯关类游戏有些人为了破纪录经常Save和Load只要我出了错我不满意我就加载之前的存档假设不允许加载呢就像DQN算法一样训练过程中会退不了这时候是不是搞两个档一个档每帧都存一下另外一个档打了不错的结果再存也就是若干个间隔再存一下到最后用间隔若干步数再存的档一般都比每帧都存的档好些呢。当然你也可以再搞更多个档也就是DQN增加多个目标网络但是对于DQN则没有多大必要多几个网络效果不见得会好很多。"
]
},
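{
"cell_type": "markdown",
"metadata": {},
"source": [
"Concretely, with a policy network $Q_\\theta$ and a target network $Q_{\\theta^-}$, the update sketched above regresses $Q_\\theta(s_t, a_t)$ towards the TD target\n",
"\n",
"$$y_t = r_t + \\gamma\\,(1 - d_t)\\,\\max_{a'} Q_{\\theta^-}(s_{t+1}, a'),$$\n",
"\n",
"where $d_t = 1$ marks a terminal transition, by minimizing the mean squared error $L(\\theta) = \\big(y_t - Q_\\theta(s_t, a_t)\\big)^2$ averaged over a sampled batch. This is exactly what the update method implemented later in this notebook computes."
]
},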
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.1、定义模型\n",
"\n",
"前面说了DQN的模型不再是Q表而是一个深度神经网络这里我只用了一个三层的全连接网络FCN这种网络也叫多层感知机MLP至于怎么用Torch写网络这里就不多说明了以下仅供参考。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class MLP(nn.Module):\n",
" def __init__(self, n_states,n_actions,hidden_dim=128):\n",
" \"\"\" 初始化q网络为全连接网络\n",
" n_states: 输入的特征数即环境的状态维度\n",
" n_actions: 输出的动作维度\n",
" \"\"\"\n",
" super(MLP, self).__init__()\n",
" self.fc1 = nn.Linear(n_states, hidden_dim) # 输入层\n",
" self.fc2 = nn.Linear(hidden_dim,hidden_dim) # 隐藏层\n",
" self.fc3 = nn.Linear(hidden_dim, n_actions) # 输出层\n",
" \n",
" def forward(self, x):\n",
" # 各层对应的激活函数\n",
" x = F.relu(self.fc1(x)) \n",
" x = F.relu(self.fc2(x))\n",
" return self.fc3(x)"
]
},
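{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check, the sketch below instantiates the MLP with a 4-dimensional state and 2 actions (the CartPole shapes, used here purely as an assumed example) and prints the output shape: one Q-value per action."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"\n",
"q_net = MLP(n_states=4, n_actions=2)  # CartPole-sized example: 4 state features, 2 actions\n",
"dummy_state = torch.randn(1, 4)  # a batch containing a single random state\n",
"print(q_net(dummy_state).shape)  # expected: torch.Size([1, 2]), i.e. one Q-value per action"
]
},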
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.2、定义经验回放\n",
"\n",
"经验回放首先是具有一定容量的只有存储一定的transition网络才会更新否则就退回到了之前的逐步更新了。另外写经验回放的时候一般需要包涵两个功能或方法一个是push即将一个transition样本按顺序放到经验回放中如果满了就把最开始放进去的样本挤掉因此如果大家学过数据结构的话推荐用队列来写虽然这里不是。另外一个是sample很简单就是随机采样出一个或者若干个具体多少就是batch_size了样本供DQN网络更新。功能讲清楚了大家可以按照自己的想法用代码来实现可以肯定地说我这里不是最高效的毕竟这还是青涩时期写出的代码。"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class ReplayBuffer:\n",
" def __init__(self, capacity):\n",
" self.capacity = capacity # 经验回放的容量\n",
" self.buffer = [] # 缓冲区\n",
" self.position = 0 \n",
" \n",
" def push(self, state, action, reward, next_state, done):\n",
" ''' 缓冲区是一个队列,容量超出时去掉开始存入的转移(transition)\n",
" '''\n",
" if len(self.buffer) < self.capacity:\n",
" self.buffer.append(None)\n",
" self.buffer[self.position] = (state, action, reward, next_state, done)\n",
" self.position = (self.position + 1) % self.capacity \n",
" \n",
" def sample(self, batch_size):\n",
" batch = random.sample(self.buffer, batch_size) # 随机采出小批量转移\n",
" state, action, reward, next_state, done = zip(*batch) # 解压成状态,动作等\n",
" return state, action, reward, next_state, done\n",
" \n",
" def __len__(self):\n",
" ''' 返回当前存储的量\n",
" '''\n",
" return len(self.buffer)"
]
},
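{
"cell_type": "markdown",
"metadata": {},
"source": [
"As mentioned above, a queue is a natural fit for this structure. The following is a minimal alternative sketch based on collections.deque, keeping the same push and sample methods; treat it as an illustration rather than the implementation used in the rest of this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import random\n",
"from collections import deque\n",
"\n",
"class DequeReplayBuffer:\n",
"    ''' A deque-based sketch of the same replay buffer interface '''\n",
"    def __init__(self, capacity):\n",
"        self.buffer = deque(maxlen=capacity)  # the oldest transitions are evicted automatically\n",
"\n",
"    def push(self, state, action, reward, next_state, done):\n",
"        self.buffer.append((state, action, reward, next_state, done))\n",
"\n",
"    def sample(self, batch_size):\n",
"        batch = random.sample(self.buffer, batch_size)  # random mini-batch of transitions\n",
"        state, action, reward, next_state, done = zip(*batch)\n",
"        return state, action, reward, next_state, done\n",
"\n",
"    def __len__(self):\n",
"        return len(self.buffer)"
]
},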
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.3、真--定义算法\n",
"\n",
"到了高级一点的算法定义算法就比较麻烦要先定义一些子模块。可以看到其实去掉子模块的话DQN跟Q learning的算法结构没啥区别当然因为神经网络一般需要Torch或者Tensorflow来写因此推荐大家先去学一学这些工具比如\"eat_pytorch_in_20_days\"。\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class DQN:\n",
" def __init__(self,n_actions,model,memory,cfg):\n",
"\n",
" self.n_actions = n_actions \n",
" self.device = torch.device(cfg.device) # cpu or cuda\n",
" self.gamma = cfg.gamma # 奖励的折扣因子\n",
" # e-greedy策略相关参数\n",
" self.sample_count = 0 # 用于epsilon的衰减计数\n",
" self.epsilon = lambda sample_count: cfg.epsilon_end + \\\n",
" (cfg.epsilon_start - cfg.epsilon_end) * \\\n",
" math.exp(-1. * sample_count / cfg.epsilon_decay)\n",
" self.batch_size = cfg.batch_size\n",
" self.policy_net = model.to(self.device)\n",
" self.target_net = model.to(self.device)\n",
" for target_param, param in zip(self.target_net.parameters(),self.policy_net.parameters()): # 复制参数到目标网路targe_net\n",
" target_param.data.copy_(param.data)\n",
" self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg.lr) # 优化器\n",
" self.memory = memory # 经验回放\n",
"\n",
" def sample(self, state):\n",
" ''' 选择动作\n",
" '''\n",
" self.sample_count += 1\n",
" if random.random() > self.epsilon(self.sample_count):\n",
" with torch.no_grad():\n",
" state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)\n",
" q_values = self.policy_net(state)\n",
" action = q_values.max(1)[1].item() # 选择Q值最大的动作\n",
" else:\n",
" action = random.randrange(self.n_actions)\n",
" return action\n",
" def predict(self,state):\n",
" with torch.no_grad():\n",
" state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)\n",
" q_values = self.policy_net(state)\n",
" action = q_values.max(1)[1].item() # 选择Q值最大的动作\n",
" return action\n",
" def update(self):\n",
" if len(self.memory) < self.batch_size: # 当memory中不满足一个批量时不更新策略\n",
" return\n",
" # 从经验回放中(replay memory)中随机采样一个批量的转移(transition)\n",
" \n",
" state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(\n",
" self.batch_size)\n",
" state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)\n",
" action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1) \n",
" reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float) \n",
" next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)\n",
" done_batch = torch.tensor(np.float32(done_batch), device=self.device)\n",
" q_values = self.policy_net(state_batch).gather(dim=1, index=action_batch) # 计算当前状态(s_t,a)对应的Q(s_t, a)\n",
" next_q_values = self.target_net(next_state_batch).max(1)[0].detach() # 计算下一时刻的状态(s_t_,a)对应的Q值\n",
" # 计算期望的Q值对于终止状态此时done_batch[0]=1, 对应的expected_q_value等于reward\n",
" expected_q_values = reward_batch + self.gamma * next_q_values * (1-done_batch)\n",
" loss = nn.MSELoss()(q_values, expected_q_values.unsqueeze(1)) # 计算均方根损失\n",
" # 优化更新模型\n",
" self.optimizer.zero_grad() \n",
" loss.backward()\n",
" for param in self.policy_net.parameters(): # clip防止梯度爆炸\n",
" param.grad.data.clamp_(-1, 1)\n",
" self.optimizer.step() \n",
"\n",
" def save(self, path):\n",
" torch.save(self.target_net.state_dict(), path+'checkpoint.pth')\n",
"\n",
" def load(self, path):\n",
" self.target_net.load_state_dict(torch.load(path+'checkpoint.pth'))\n",
" for target_param, param in zip(self.target_net.parameters(), self.policy_net.parameters()):\n",
" param.data.copy_(target_param.data)"
]
},
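{
"cell_type": "markdown",
"metadata": {},
"source": [
"The epsilon-greedy schedule above decays exponentially with the number of sampled actions, $\\epsilon(t) = \\epsilon_{end} + (\\epsilon_{start} - \\epsilon_{end})\\, e^{-t/\\epsilon_{decay}}$. The short sketch below just evaluates this schedule for a few counts, using hypothetical values epsilon_start=0.95, epsilon_end=0.01 and epsilon_decay=500 chosen purely for illustration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import math\n",
"\n",
"# illustrative hyperparameters (assumed here, not taken from a particular config)\n",
"epsilon_start, epsilon_end, epsilon_decay = 0.95, 0.01, 500\n",
"\n",
"def epsilon(sample_count):\n",
"    # the same exponential schedule as in DQN.__init__\n",
"    return epsilon_end + (epsilon_start - epsilon_end) * math.exp(-1. * sample_count / epsilon_decay)\n",
"\n",
"for t in [0, 100, 500, 2000, 10000]:\n",
"    print(f'sample_count={t:>5}, epsilon={epsilon(t):.3f}')  # decays from about 0.95 towards 0.01"
]
},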
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 2、定义训练"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def train(cfg, env, agent):\n",
" ''' 训练\n",
" '''\n",
" print(\"开始训练!\")\n",
" print(f\"回合:{cfg.env_name}, 算法:{cfg.algo_name}, 设备:{cfg.device}\")\n",
" rewards = [] # 记录所有回合的奖励\n",
" steps = []\n",
" for i_ep in range(cfg.train_eps):\n",
" ep_reward = 0 # 记录一回合内的奖励\n",
" ep_step = 0\n",
" state = env.reset() # 重置环境,返回初始状态\n",
" while True:\n",
" ep_step += 1\n",
" action = agent.sample(state) # 选择动作\n",
" next_state, reward, done, _ = env.step(action) # 更新环境返回transition\n",
" agent.memory.push(state, action, reward,\n",
" next_state, done) # 保存transition\n",
" state = next_state # 更新下一个状态\n",
" agent.update() # 更新智能体\n",
" ep_reward += reward # 累加奖励\n",
" if done:\n",
" break\n",
" if (i_ep + 1) % cfg.target_update == 0: # 智能体目标网络更新\n",
" agent.target_net.load_state_dict(agent.policy_net.state_dict())\n",
" steps.append(ep_step)\n",
" rewards.append(ep_reward)\n",
" if (i_ep + 1) % 10 == 0:\n",
" print(f'回合:{i_ep+1}/{cfg.train_eps},奖励:{ep_reward:.2f}Epislon{agent.epsilon(agent.frame_idx):.3f}')\n",
" print(\"完成训练!\")\n",
" env.close()\n",
" res_dic = {'rewards':rewards}\n",
" return res_dic\n",
"\n",
"def test(cfg, env, agent):\n",
" print(\"开始测试!\")\n",
" print(f\"回合:{cfg.env_name}, 算法:{cfg.algo_name}, 设备:{cfg.device}\")\n",
" rewards = [] # 记录所有回合的奖励\n",
" steps = []\n",
" for i_ep in range(cfg.test_eps):\n",
" ep_reward = 0 # 记录一回合内的奖励\n",
" ep_step = 0\n",
" state = env.reset() # 重置环境,返回初始状态\n",
" while True:\n",
" ep_step+=1\n",
" action = agent.predict(state) # 选择动作\n",
" next_state, reward, done, _ = env.step(action) # 更新环境返回transition\n",
" state = next_state # 更新下一个状态\n",
" ep_reward += reward # 累加奖励\n",
" if done:\n",
" break\n",
" steps.append(ep_step)\n",
" rewards.append(ep_reward)\n",
" print(f'回合:{i_ep+1}/{cfg.train_eps},奖励:{ep_reward:.2f}')\n",
" print(\"完成测试\")\n",
" env.close()\n",
" return {'rewards':rewards}"
]
}
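,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Finally, a minimal sketch of how these pieces might be wired together on CartPole-v0, assuming the classic gym API used above (env.reset returning a state and env.step returning four values). The Config values below are illustrative assumptions, not tuned settings."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import gym\n",
"import torch\n",
"\n",
"class Config:\n",
"    ''' Illustrative hyperparameters (assumed values, not tuned settings) '''\n",
"    env_name = 'CartPole-v0'\n",
"    algo_name = 'DQN'\n",
"    device = 'cuda' if torch.cuda.is_available() else 'cpu'\n",
"    train_eps, test_eps = 200, 20\n",
"    gamma = 0.95\n",
"    epsilon_start, epsilon_end, epsilon_decay = 0.95, 0.01, 500\n",
"    lr = 0.0001\n",
"    memory_capacity = 100000\n",
"    batch_size = 64\n",
"    target_update = 4\n",
"\n",
"cfg = Config()\n",
"env = gym.make(cfg.env_name)\n",
"n_states = env.observation_space.shape[0]  # state dimension\n",
"n_actions = env.action_space.n  # number of discrete actions\n",
"agent = DQN(n_actions, MLP(n_states, n_actions), ReplayBuffer(cfg.memory_capacity), cfg)\n",
"res_dic = train(cfg, env, agent)\n",
"res_dic_test = test(cfg, env, agent)"
]
}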
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.7.13 ('easyrl')",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.7.13"
},
"orig_nbformat": 4,
"vscode": {
"interpreter": {
"hash": "8994a120d39b6e6a2ecc94b4007f5314b68aa69fc88a7f00edf21be39b41f49c"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}