{ "cells": [
 { "cell_type": "markdown", "metadata": {}, "source": [
  "## 1. Analyzing the Pseudocode\n",
  "\n",
  "Current implementations of the DQN algorithm largely follow the pseudocode of [Nature DQN](https://www.nature.com/articles/nature14236), shown below:\n",
  "\n",
  "*(Figure: pseudocode of the Nature DQN algorithm from the original paper.)*"
 ] },
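 { "cell_type": "markdown", "metadata": {}, "source": [
  "In case the figure does not render, the pseudocode can be roughly paraphrased as follows (adapted from the Nature paper):\n",
  "\n",
  "1. Initialize the replay memory $D$ with capacity $N$, the Q-network $Q$ with random weights $\\theta$, and the target network $\\hat{Q}$ with weights $\\theta^- = \\theta$.\n",
  "2. For each episode, observe the initial state $s_1$.\n",
  "3. At each step $t$: with probability $\\varepsilon$ select a random action $a_t$, otherwise select $a_t = \\arg\\max_a Q(s_t, a; \\theta)$; execute $a_t$, observe the reward $r_t$ and the next state $s_{t+1}$, and store the transition $(s_t, a_t, r_t, s_{t+1})$ in $D$.\n",
  "4. Sample a random mini-batch of transitions from $D$; set the target $y_j = r_j$ for terminal transitions and $y_j = r_j + \\gamma \\max_{a'} \\hat{Q}(s_{j+1}, a'; \\theta^-)$ otherwise, then take a gradient step on $(y_j - Q(s_j, a_j; \\theta))^2$ with respect to $\\theta$.\n",
  "5. Every $C$ steps, copy the policy network weights to the target network: $\\theta^- \\leftarrow \\theta$."
 ] },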
 { "cell_type": "markdown", "metadata": {}, "source": [
  "## 2. Defining the Algorithm\n",
  "\n",
  "As mentioned in the tutorial, compared with Q-learning, DQN is essentially designed to cope with more complex environments, and only after several rounds of refinement did it reach a reasonably complete form with Nature DQN (the Nature paper by Volodymyr Mnih et al.). DQN introduces three main changes:\n",
  "* It replaces the Q-table with a deep neural network: the reason for this is easy to see.\n",
  "* It uses experience replay (a replay buffer): this brings several benefits. First, training on a pool of historical data instead of discarding each sample after a single use greatly improves sample efficiency. Second (a favourite interview topic), it reduces the correlation between samples: in principle, collecting experience and learning from it are separate stages, and the original time-ordered data can make training unstable, so shuffling it before learning helps stabilize training, for the same reason that samples are shuffled when splitting a dataset into training and test sets in deep learning.\n",
  "* It uses two networks, a policy network and a target network: the parameters of the policy network, which is updated at every step, are copied to the target network only every few steps. This also stabilizes training and keeps the Q-value estimates from diverging. Imagine a transition (a concept introduced with Q-learning, so do remember it!) that produces a badly overestimated Q-value; if the next few samples drawn from the replay buffer happen to be similar, the Q-values can easily diverge, never to come back. As an analogy, when playing an RPG or a level-based game, some players save and load constantly to chase a record, reloading an earlier save whenever they make a mistake. If reloading were not allowed, which is the situation during DQN training where you cannot roll back, a sensible strategy is to keep two save slots: one saved every frame and another saved only every so often, after a decent result. In the end, the slot saved at longer intervals usually turns out better than the one saved every frame. You could of course keep even more slots, i.e., give DQN several target networks, but for DQN this is rarely necessary; extra networks do not improve things much."
 ] },
 { "cell_type": "markdown", "metadata": {}, "source": [
  "### 2.1 Defining the Model\n",
  "\n",
  "As noted above, DQN's model is no longer a Q-table but a deep neural network. Here I only use a three-layer fully connected network (FCN), also known as a multi-layer perceptron (MLP). How to write a network in PyTorch is not explained here; the code below is for reference only."
 ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
  "import math\n",
  "import random\n",
  "import copy\n",
  "import numpy as np\n",
  "import torch\n",
  "import torch.nn as nn\n",
  "import torch.nn.functional as F\n",
  "import torch.optim as optim\n",
  "\n",
  "class MLP(nn.Module):\n",
  "    def __init__(self, n_states, n_actions, hidden_dim=128):\n",
  "        \"\"\" Initialize the Q-network as a fully connected network\n",
  "            n_states: number of input features, i.e. the dimension of the state\n",
  "            n_actions: output dimension, i.e. the number of actions\n",
  "        \"\"\"\n",
  "        super(MLP, self).__init__()\n",
  "        self.fc1 = nn.Linear(n_states, hidden_dim)  # input layer\n",
  "        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # hidden layer\n",
  "        self.fc3 = nn.Linear(hidden_dim, n_actions)  # output layer\n",
  "\n",
  "    def forward(self, x):\n",
  "        # activation functions of each layer\n",
  "        x = F.relu(self.fc1(x))\n",
  "        x = F.relu(self.fc2(x))\n",
  "        return self.fc3(x)"
 ] },
 { "cell_type": "markdown", "metadata": {}, "source": [
  "### 2.2 Defining the Replay Buffer\n",
  "\n",
  "The replay buffer has a fixed capacity, and the network is only updated once enough transitions have been stored; otherwise we would be back to the step-by-step updates used before. A replay buffer generally needs to provide two methods. One is push, which stores a transition in the buffer in order and, once the buffer is full, pushes out the earliest sample; if you have studied data structures, a queue is the natural choice, although that is not what is used here. The other is sample, which simply draws one or more samples at random (how many is exactly the batch_size) for the DQN update. With the functionality clear, feel free to implement it your own way; the version here is certainly not the most efficient, it is code from my greener days."
 ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
  "class ReplayBuffer:\n",
  "    def __init__(self, capacity):\n",
  "        self.capacity = capacity  # capacity of the replay buffer\n",
  "        self.buffer = []  # storage\n",
  "        self.position = 0\n",
  "\n",
  "    def push(self, state, action, reward, next_state, done):\n",
  "        ''' The buffer acts as a circular queue: once the capacity is exceeded,\n",
  "            the oldest transition is overwritten\n",
  "        '''\n",
  "        if len(self.buffer) < self.capacity:\n",
  "            self.buffer.append(None)\n",
  "        self.buffer[self.position] = (state, action, reward, next_state, done)\n",
  "        self.position = (self.position + 1) % self.capacity\n",
  "\n",
  "    def sample(self, batch_size):\n",
  "        batch = random.sample(self.buffer, batch_size)  # randomly sample a mini-batch of transitions\n",
  "        state, action, reward, next_state, done = zip(*batch)  # unzip into states, actions, etc.\n",
  "        return state, action, reward, next_state, done\n",
  "\n",
  "    def __len__(self):\n",
  "        ''' Return the number of transitions currently stored\n",
  "        '''\n",
  "        return len(self.buffer)"
 ] },
 { "cell_type": "markdown", "metadata": {}, "source": [
  "### 2.3 Defining the Algorithm (for real this time)\n",
  "\n",
  "Once the algorithm gets a bit more advanced, defining it becomes more involved, because a few submodules have to be defined first. As you can see, if you strip those submodules away, the structure of DQN is not that different from Q-learning. Since neural networks are usually written with PyTorch or TensorFlow, it is worth learning these tools first, for example with \"eat_pytorch_in_20_days\"."
 ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
  "class DQN:\n",
  "    def __init__(self, n_actions, model, memory, cfg):\n",
  "\n",
  "        self.n_actions = n_actions\n",
  "        self.device = torch.device(cfg.device)  # cpu or cuda\n",
  "        self.gamma = cfg.gamma  # discount factor for rewards\n",
  "        # parameters of the epsilon-greedy policy\n",
  "        self.sample_count = 0  # sampling counter used for the epsilon decay\n",
  "        self.epsilon = lambda sample_count: cfg.epsilon_end + \\\n",
  "            (cfg.epsilon_start - cfg.epsilon_end) * \\\n",
  "            math.exp(-1. * sample_count / cfg.epsilon_decay)\n",
  "        self.batch_size = cfg.batch_size\n",
  "        self.policy_net = model.to(self.device)\n",
  "        self.target_net = copy.deepcopy(model).to(self.device)  # the target net must be a separate copy, otherwise it would share parameters with the policy net\n",
  "        for target_param, param in zip(self.target_net.parameters(), self.policy_net.parameters()):  # copy parameters to the target net\n",
  "            target_param.data.copy_(param.data)\n",
  "        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg.lr)  # optimizer\n",
  "        self.memory = memory  # replay buffer\n",
  "\n",
  "    def sample(self, state):\n",
  "        ''' Select an action with the epsilon-greedy policy (used during training)\n",
  "        '''\n",
  "        self.sample_count += 1\n",
  "        if random.random() > self.epsilon(self.sample_count):\n",
  "            with torch.no_grad():\n",
  "                state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)\n",
  "                q_values = self.policy_net(state)\n",
  "                action = q_values.max(1)[1].item()  # pick the action with the largest Q-value\n",
  "        else:\n",
  "            action = random.randrange(self.n_actions)\n",
  "        return action\n",
  "\n",
  "    def predict(self, state):\n",
  "        ''' Select an action greedily (used during testing)\n",
  "        '''\n",
  "        with torch.no_grad():\n",
  "            state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)\n",
  "            q_values = self.policy_net(state)\n",
  "            action = q_values.max(1)[1].item()  # pick the action with the largest Q-value\n",
  "        return action\n",
  "\n",
  "    def update(self):\n",
  "        if len(self.memory) < self.batch_size:  # do not update until the buffer holds at least one batch\n",
  "            return\n",
  "        # sample a mini-batch of transitions from the replay buffer\n",
  "        state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(\n",
  "            self.batch_size)\n",
  "        state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)\n",
  "        action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1)\n",
  "        reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float)\n",
  "        next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)\n",
  "        done_batch = torch.tensor(np.float32(done_batch), device=self.device)\n",
  "        q_values = self.policy_net(state_batch).gather(dim=1, index=action_batch)  # Q(s_t, a) for the actions actually taken\n",
  "        next_q_values = self.target_net(next_state_batch).max(1)[0].detach()  # max Q-value of the next state under the target net\n",
  "        # expected Q-values; for terminal transitions done_batch = 1, so the target reduces to the reward\n",
  "        expected_q_values = reward_batch + self.gamma * next_q_values * (1 - done_batch)\n",
  "        loss = nn.MSELoss()(q_values, expected_q_values.unsqueeze(1))  # mean squared error loss\n",
  "        # optimize the model\n",
  "        self.optimizer.zero_grad()\n",
  "        loss.backward()\n",
  "        for param in self.policy_net.parameters():  # clip gradients to prevent them from exploding\n",
  "            param.grad.data.clamp_(-1, 1)\n",
  "        self.optimizer.step()\n",
  "\n",
  "    def save(self, path):\n",
  "        torch.save(self.target_net.state_dict(), path + 'checkpoint.pth')\n",
  "\n",
  "    def load(self, path):\n",
  "        self.target_net.load_state_dict(torch.load(path + 'checkpoint.pth'))\n",
  "        for target_param, param in zip(self.target_net.parameters(), self.policy_net.parameters()):\n",
  "            param.data.copy_(target_param.data)"
 ] },
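 { "cell_type": "markdown", "metadata": {}, "source": [
  "For reference, the two formulas at the heart of the class above can be written out explicitly; they correspond directly to the lambda expression in the constructor and to the update() method. The exploration rate decays exponentially with the number of sampled actions $t$,\n",
  "\n",
  "$$\\varepsilon(t) = \\varepsilon_{end} + (\\varepsilon_{start} - \\varepsilon_{end})\\, e^{-t/\\tau},$$\n",
  "\n",
  "where $\\tau$ is epsilon_decay, and the update minimizes the mean squared error between the policy network's estimate and a target computed with the target network (with $d = 1$ for terminal transitions, so the target reduces to the reward):\n",
  "\n",
  "$$y = r + \\gamma\\,(1 - d)\\,\\max_{a'} Q_{\\theta^-}(s', a'), \\qquad L(\\theta) = \\big(y - Q_{\\theta}(s, a)\\big)^2.$$"
 ] },
 { "cell_type": "markdown", "metadata": {}, "source": [
  "Before moving on to training, here is a minimal sketch of how the pieces defined above might be wired together. It is not the author's exact setup: the Config class and its hyperparameter values are hypothetical placeholders, CartPole-v0 is an assumed environment, and gym's pre-0.26 API is assumed so that env.reset() and env.step() behave as in the training loop below."
 ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
  "import gym\n",
  "\n",
  "class Config:\n",
  "    ''' Hypothetical container for the hyperparameters used by the classes above '''\n",
  "    env_name = 'CartPole-v0'  # assumed environment\n",
  "    algo_name = 'DQN'\n",
  "    device = 'cuda' if torch.cuda.is_available() else 'cpu'\n",
  "    train_eps = 200  # number of training episodes\n",
  "    test_eps = 20  # number of test episodes\n",
  "    gamma = 0.95  # discount factor\n",
  "    epsilon_start = 0.95  # initial exploration rate\n",
  "    epsilon_end = 0.01  # final exploration rate\n",
  "    epsilon_decay = 500  # decay constant of the exploration rate\n",
  "    lr = 0.0001  # learning rate\n",
  "    memory_capacity = 100000  # capacity of the replay buffer\n",
  "    batch_size = 64  # mini-batch size for each update\n",
  "    target_update = 4  # sync the target network every N episodes\n",
  "\n",
  "cfg = Config()\n",
  "env = gym.make(cfg.env_name)\n",
  "n_states = env.observation_space.shape[0]  # dimension of the state space\n",
  "n_actions = env.action_space.n  # number of discrete actions\n",
  "model = MLP(n_states, n_actions, hidden_dim=128)\n",
  "memory = ReplayBuffer(cfg.memory_capacity)\n",
  "agent = DQN(n_actions, model, memory, cfg)"
 ] },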
 { "cell_type": "markdown", "metadata": {}, "source": [
  "## 3. Defining Training"
 ] },
 { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [
  "def train(cfg, env, agent):\n",
  "    ''' Training loop\n",
  "    '''\n",
  "    print(\"Start training!\")\n",
  "    print(f\"Env: {cfg.env_name}, Algorithm: {cfg.algo_name}, Device: {cfg.device}\")\n",
  "    rewards = []  # record the reward of every episode\n",
  "    steps = []\n",
  "    for i_ep in range(cfg.train_eps):\n",
  "        ep_reward = 0  # cumulative reward within one episode\n",
  "        ep_step = 0\n",
  "        state = env.reset()  # reset the environment and get the initial state\n",
  "        while True:\n",
  "            ep_step += 1\n",
  "            action = agent.sample(state)  # select an action\n",
  "            next_state, reward, done, _ = env.step(action)  # step the environment and get the transition\n",
  "            agent.memory.push(state, action, reward,\n",
  "                              next_state, done)  # store the transition\n",
  "            state = next_state  # move to the next state\n",
  "            agent.update()  # update the agent\n",
  "            ep_reward += reward  # accumulate the reward\n",
  "            if done:\n",
  "                break\n",
  "        if (i_ep + 1) % cfg.target_update == 0:  # sync the agent's target network\n",
  "            agent.target_net.load_state_dict(agent.policy_net.state_dict())\n",
  "        steps.append(ep_step)\n",
  "        rewards.append(ep_reward)\n",
  "        if (i_ep + 1) % 10 == 0:\n",
  "            print(f'Episode: {i_ep+1}/{cfg.train_eps}, Reward: {ep_reward:.2f}, Epsilon: {agent.epsilon(agent.sample_count):.3f}')\n",
  "    print(\"Finished training!\")\n",
  "    env.close()\n",
  "    res_dic = {'rewards': rewards}\n",
  "    return res_dic\n",
  "\n",
  "def test(cfg, env, agent):\n",
  "    print(\"Start testing!\")\n",
  "    print(f\"Env: {cfg.env_name}, Algorithm: {cfg.algo_name}, Device: {cfg.device}\")\n",
  "    rewards = []  # record the reward of every episode\n",
  "    steps = []\n",
  "    for i_ep in range(cfg.test_eps):\n",
  "        ep_reward = 0  # cumulative reward within one episode\n",
  "        ep_step = 0\n",
  "        state = env.reset()  # reset the environment and get the initial state\n",
  "        while True:\n",
  "            ep_step += 1\n",
  "            action = agent.predict(state)  # select an action greedily\n",
  "            next_state, reward, done, _ = env.step(action)  # step the environment and get the transition\n",
  "            state = next_state  # move to the next state\n",
  "            ep_reward += reward  # accumulate the reward\n",
  "            if done:\n",
  "                break\n",
  "        steps.append(ep_step)\n",
  "        rewards.append(ep_reward)\n",
  "        print(f'Episode: {i_ep+1}/{cfg.test_eps}, Reward: {ep_reward:.2f}')\n",
  "    print(\"Finished testing!\")\n",
  "    env.close()\n",
  "    return {'rewards': rewards}"
 ] }
 ], "metadata": { "kernelspec": { "display_name": "Python 3.7.13 ('easyrl')", "language": "python", "name": "python3" }, "language_info": { "name": "python", "version": "3.7.13" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "8994a120d39b6e6a2ecc94b4007f5314b68aa69fc88a7f00edf21be39b41f49c" } } }, "nbformat": 4, "nbformat_minor": 2 }