{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
"## 1、分析伪代码\n",
|
||
"\n",
|
||
"目前DQN算法基本遵循[Nature DQN](https://www.nature.com/articles/nature14236)的伪代码步骤,如下:\n",
|
||
"\n",
|
||
"<div align=\"center\">\n",
|
||
"<img src=\"./figs/dqn_pseu.png\" alt=\"\" style=\"zoom:40%;\" /> \n",
|
||
"</div>"
|
||
]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
"## 1、定义算法\n",
|
||
"\n",
|
||
"教程中提到相比于Q learning,DQN本质上是为了适应更为复杂的环境,并且经过不断的改良迭代,到了Nature DQN(即Volodymyr Mnih发表的Nature论文)这里才算是基本完善。DQN主要改动的点有三个:\n",
|
||
"* 使用深度神经网络替代原来的Q表:这个很容易理解原因\n",
|
||
"* 使用了经验回放(Replay Buffer):这个好处有很多,一个是使用一堆历史数据去训练,比之前用一次就扔掉好多了,大大提高样本效率,另外一个是面试常提到的,减少样本之间的相关性,原则上获取经验跟学习阶段是分开的,原来时序的训练数据有可能是不稳定的,打乱之后再学习有助于提高训练的稳定性,跟深度学习中划分训练测试集时打乱样本是一个道理。\n",
|
||
"* 使用了两个网络:即策略网络和目标网络,每隔若干步才把每步更新的策略网络参数复制给目标网络,这样做也是为了训练的稳定,避免Q值的估计发散。想象一下,如果当前有个transition(这个Q learning中提过的,一定要记住!!!)样本导致对Q值进行了较差的过估计,如果接下来从经验回放中提取到的样本正好连续几个都这样的,很有可能导致Q值的发散(它的青春小鸟一去不回来了)。再打个比方,我们玩RPG或者闯关类游戏,有些人为了破纪录经常Save和Load,只要我出了错,我不满意我就加载之前的存档,假设不允许加载呢,就像DQN算法一样训练过程中会退不了,这时候是不是搞两个档,一个档每帧都存一下,另外一个档打了不错的结果再存,也就是若干个间隔再存一下,到最后用间隔若干步数再存的档一般都比每帧都存的档好些呢。当然你也可以再搞更多个档,也就是DQN增加多个目标网络,但是对于DQN则没有多大必要,多几个网络效果不见得会好很多。"
|
||
]
  },
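  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the target-network point concrete, this is the standard Nature DQN update target that the code below implements: for a transition $(s, a, r, s', done)$, the policy network $Q_{\\theta}$ is regressed toward\n",
    "\n",
    "$$y = r + \\gamma\\,(1 - done)\\,\\max_{a'} Q_{\\theta^-}(s', a'),$$\n",
    "\n",
    "where $Q_{\\theta^-}$ is the target network, and the loss is the mean squared error $\\big(Q_{\\theta}(s, a) - y\\big)^2$."
   ]
  },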
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
"### 1.1、定义模型\n",
|
||
"\n",
|
||
"前面说了DQN的模型不再是Q表,而是一个深度神经网络,这里我只用了一个三层的全连接网络(FCN),这种网络也叫多层感知机(MLP),至于怎么用Torch写网络这里就不多说明了,以下仅供参考。"
|
||
]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
"class MLP(nn.Module):\n",
|
||
" def __init__(self, n_states,n_actions,hidden_dim=128):\n",
|
||
" \"\"\" 初始化q网络,为全连接网络\n",
|
||
" n_states: 输入的特征数即环境的状态维度\n",
|
||
" n_actions: 输出的动作维度\n",
|
||
" \"\"\"\n",
|
||
" super(MLP, self).__init__()\n",
|
||
" self.fc1 = nn.Linear(n_states, hidden_dim) # 输入层\n",
|
||
" self.fc2 = nn.Linear(hidden_dim,hidden_dim) # 隐藏层\n",
|
||
" self.fc3 = nn.Linear(hidden_dim, n_actions) # 输出层\n",
|
||
" \n",
|
||
" def forward(self, x):\n",
|
||
" # 各层对应的激活函数\n",
|
||
" x = F.relu(self.fc1(x)) \n",
|
||
" x = F.relu(self.fc2(x))\n",
|
||
" return self.fc3(x)"
|
||
]
  },
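  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a quick sanity check (not part of the original flow), we can instantiate the network with hypothetical CartPole-like dimensions, four state features and two actions, and run a forward pass on a dummy batch of states:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import torch\n",
    "\n",
    "net = MLP(n_states=4, n_actions=2)  # hypothetical CartPole-sized network\n",
    "dummy_states = torch.randn(8, 4)    # a batch of 8 fake states\n",
    "print(net(dummy_states).shape)      # expected: torch.Size([8, 2]), one Q-value per action"
   ]
  },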
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
"### 1.2、定义经验回放\n",
|
||
"\n",
|
||
"经验回放首先是具有一定容量的,只有存储一定的transition网络才会更新,否则就退回到了之前的逐步更新了。另外写经验回放的时候一般需要包涵两个功能或方法,一个是push,即将一个transition样本按顺序放到经验回放中,如果满了就把最开始放进去的样本挤掉,因此如果大家学过数据结构的话推荐用队列来写,虽然这里不是。另外一个是sample,很简单就是随机采样出一个或者若干个(具体多少就是batch_size了)样本供DQN网络更新。功能讲清楚了,大家可以按照自己的想法用代码来实现,可以肯定地说,我这里不是最高效的,毕竟这还是青涩时期写出的代码。"
|
||
]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
"class ReplayBuffer:\n",
|
||
" def __init__(self, capacity):\n",
|
||
" self.capacity = capacity # 经验回放的容量\n",
|
||
" self.buffer = [] # 缓冲区\n",
|
||
" self.position = 0 \n",
|
||
" \n",
|
||
" def push(self, state, action, reward, next_state, done):\n",
|
||
" ''' 缓冲区是一个队列,容量超出时去掉开始存入的转移(transition)\n",
|
||
" '''\n",
|
||
" if len(self.buffer) < self.capacity:\n",
|
||
" self.buffer.append(None)\n",
|
||
" self.buffer[self.position] = (state, action, reward, next_state, done)\n",
|
||
" self.position = (self.position + 1) % self.capacity \n",
|
||
" \n",
|
||
" def sample(self, batch_size):\n",
|
||
" batch = random.sample(self.buffer, batch_size) # 随机采出小批量转移\n",
|
||
" state, action, reward, next_state, done = zip(*batch) # 解压成状态,动作等\n",
|
||
" return state, action, reward, next_state, done\n",
|
||
" \n",
|
||
" def __len__(self):\n",
|
||
" ''' 返回当前存储的量\n",
|
||
" '''\n",
|
||
" return len(self.buffer)"
|
||
]
  },
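  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As promised, here is what the queue-based version could look like. This is a minimal sketch using `collections.deque` (the class name `DequeReplayBuffer` is made up for illustration), not the implementation this notebook actually uses:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import random\n",
    "from collections import deque\n",
    "\n",
    "class DequeReplayBuffer:\n",
    "    ''' Sketch of a replay buffer backed by a deque;\n",
    "        maxlen makes eviction of the oldest transition automatic\n",
    "    '''\n",
    "    def __init__(self, capacity):\n",
    "        self.buffer = deque(maxlen=capacity)\n",
    "\n",
    "    def push(self, state, action, reward, next_state, done):\n",
    "        self.buffer.append((state, action, reward, next_state, done))\n",
    "\n",
    "    def sample(self, batch_size):\n",
    "        batch = random.sample(self.buffer, batch_size) # deque supports indexing, so random.sample works\n",
    "        return zip(*batch)\n",
    "\n",
    "    def __len__(self):\n",
    "        return len(self.buffer)"
   ]
  },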
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 2.3. Defining the Algorithm, For Real This Time\n",
    "\n",
    "Once the algorithms get a bit more advanced, defining them becomes more of a chore: some submodules have to be defined first. As you can see, with the submodules stripped away, DQN has essentially the same algorithmic structure as Q-learning. Of course, neural networks are usually written with PyTorch or TensorFlow, so I recommend learning those tools first, for example with \"eat_pytorch_in_20_days\".\n"
   ]
  },
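  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "One detail worth spelling out before the code: the $\\varepsilon$-greedy exploration rate decays exponentially with the number of action selections $t$ (`sample_count` in the code),\n",
    "\n",
    "$$\\varepsilon(t) = \\varepsilon_{end} + (\\varepsilon_{start} - \\varepsilon_{end})\\, e^{-t/\\tau},$$\n",
    "\n",
    "where $\\tau$ is `epsilon_decay`. This is exactly the lambda defined in `__init__` below: exploration starts near $\\varepsilon_{start}$ and anneals toward $\\varepsilon_{end}$."
   ]
  },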
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
"class DQN:\n",
|
||
" def __init__(self,n_actions,model,memory,cfg):\n",
|
||
"\n",
|
||
" self.n_actions = n_actions \n",
|
||
" self.device = torch.device(cfg.device) # cpu or cuda\n",
|
||
" self.gamma = cfg.gamma # 奖励的折扣因子\n",
|
||
" # e-greedy策略相关参数\n",
|
||
" self.sample_count = 0 # 用于epsilon的衰减计数\n",
|
||
" self.epsilon = lambda sample_count: cfg.epsilon_end + \\\n",
|
||
" (cfg.epsilon_start - cfg.epsilon_end) * \\\n",
|
||
" math.exp(-1. * sample_count / cfg.epsilon_decay)\n",
|
||
" self.batch_size = cfg.batch_size\n",
|
||
" self.policy_net = model.to(self.device)\n",
|
||
" self.target_net = model.to(self.device)\n",
|
||
" for target_param, param in zip(self.target_net.parameters(),self.policy_net.parameters()): # 复制参数到目标网路targe_net\n",
|
||
" target_param.data.copy_(param.data)\n",
|
||
" self.optimizer = optim.Adam(self.policy_net.parameters(), lr=cfg.lr) # 优化器\n",
|
||
" self.memory = memory # 经验回放\n",
|
||
"\n",
|
||
" def sample(self, state):\n",
|
||
" ''' 选择动作\n",
|
||
" '''\n",
|
||
" self.sample_count += 1\n",
|
||
" if random.random() > self.epsilon(self.sample_count):\n",
|
||
" with torch.no_grad():\n",
|
||
" state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)\n",
|
||
" q_values = self.policy_net(state)\n",
|
||
" action = q_values.max(1)[1].item() # 选择Q值最大的动作\n",
|
||
" else:\n",
|
||
" action = random.randrange(self.n_actions)\n",
|
||
" return action\n",
|
||
" def predict(self,state):\n",
|
||
" with torch.no_grad():\n",
|
||
" state = torch.tensor(state, device=self.device, dtype=torch.float32).unsqueeze(dim=0)\n",
|
||
" q_values = self.policy_net(state)\n",
|
||
" action = q_values.max(1)[1].item() # 选择Q值最大的动作\n",
|
||
" return action\n",
|
||
" def update(self):\n",
|
||
" if len(self.memory) < self.batch_size: # 当memory中不满足一个批量时,不更新策略\n",
|
||
" return\n",
|
||
" # 从经验回放中(replay memory)中随机采样一个批量的转移(transition)\n",
|
||
" \n",
|
||
" state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(\n",
|
||
" self.batch_size)\n",
|
||
" state_batch = torch.tensor(np.array(state_batch), device=self.device, dtype=torch.float)\n",
|
||
" action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(1) \n",
|
||
" reward_batch = torch.tensor(reward_batch, device=self.device, dtype=torch.float) \n",
|
||
" next_state_batch = torch.tensor(np.array(next_state_batch), device=self.device, dtype=torch.float)\n",
|
||
" done_batch = torch.tensor(np.float32(done_batch), device=self.device)\n",
|
||
" q_values = self.policy_net(state_batch).gather(dim=1, index=action_batch) # 计算当前状态(s_t,a)对应的Q(s_t, a)\n",
|
||
" next_q_values = self.target_net(next_state_batch).max(1)[0].detach() # 计算下一时刻的状态(s_t_,a)对应的Q值\n",
|
||
" # 计算期望的Q值,对于终止状态,此时done_batch[0]=1, 对应的expected_q_value等于reward\n",
|
||
" expected_q_values = reward_batch + self.gamma * next_q_values * (1-done_batch)\n",
|
||
" loss = nn.MSELoss()(q_values, expected_q_values.unsqueeze(1)) # 计算均方根损失\n",
|
||
" # 优化更新模型\n",
|
||
" self.optimizer.zero_grad() \n",
|
||
" loss.backward()\n",
|
||
" for param in self.policy_net.parameters(): # clip防止梯度爆炸\n",
|
||
" param.grad.data.clamp_(-1, 1)\n",
|
||
" self.optimizer.step() \n",
|
||
"\n",
|
||
" def save(self, path):\n",
|
||
" torch.save(self.target_net.state_dict(), path+'checkpoint.pth')\n",
|
||
"\n",
|
||
" def load(self, path):\n",
|
||
" self.target_net.load_state_dict(torch.load(path+'checkpoint.pth'))\n",
|
||
" for target_param, param in zip(self.target_net.parameters(), self.policy_net.parameters()):\n",
|
||
" param.data.copy_(target_param.data)"
|
||
]
  },
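  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This notebook updates the target network with a hard copy every `target_update` episodes (see the training loop below). A common alternative, shown here only as a sketch and not what this notebook does, is a soft (Polyak) update that blends a small fraction of the policy network into the target network at every step; `tau` is a hypothetical blending coefficient:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def soft_update(target_net, policy_net, tau=0.005):\n",
    "    ''' Polyak averaging: target <- tau * policy + (1 - tau) * target '''\n",
    "    for target_param, param in zip(target_net.parameters(), policy_net.parameters()):\n",
    "        target_param.data.copy_(tau * param.data + (1.0 - tau) * target_param.data)"
   ]
  },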
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3. Defining Training and Testing"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
"def train(cfg, env, agent):\n",
|
||
" ''' 训练\n",
|
||
" '''\n",
|
||
" print(\"开始训练!\")\n",
|
||
" print(f\"回合:{cfg.env_name}, 算法:{cfg.algo_name}, 设备:{cfg.device}\")\n",
|
||
" rewards = [] # 记录所有回合的奖励\n",
|
||
" steps = []\n",
|
||
" for i_ep in range(cfg.train_eps):\n",
|
||
" ep_reward = 0 # 记录一回合内的奖励\n",
|
||
" ep_step = 0\n",
|
||
" state = env.reset() # 重置环境,返回初始状态\n",
|
||
" while True:\n",
|
||
" ep_step += 1\n",
|
||
" action = agent.sample(state) # 选择动作\n",
|
||
" next_state, reward, done, _ = env.step(action) # 更新环境,返回transition\n",
|
||
" agent.memory.push(state, action, reward,\n",
|
||
" next_state, done) # 保存transition\n",
|
||
" state = next_state # 更新下一个状态\n",
|
||
" agent.update() # 更新智能体\n",
|
||
" ep_reward += reward # 累加奖励\n",
|
||
" if done:\n",
|
||
" break\n",
|
||
" if (i_ep + 1) % cfg.target_update == 0: # 智能体目标网络更新\n",
|
||
" agent.target_net.load_state_dict(agent.policy_net.state_dict())\n",
|
||
" steps.append(ep_step)\n",
|
||
" rewards.append(ep_reward)\n",
|
||
" if (i_ep + 1) % 10 == 0:\n",
|
||
" print(f'回合:{i_ep+1}/{cfg.train_eps},奖励:{ep_reward:.2f},Epislon:{agent.epsilon(agent.frame_idx):.3f}')\n",
|
||
" print(\"完成训练!\")\n",
|
||
" env.close()\n",
|
||
" res_dic = {'rewards':rewards}\n",
|
||
" return res_dic\n",
|
||
"\n",
|
||
"def test(cfg, env, agent):\n",
|
||
" print(\"开始测试!\")\n",
|
||
" print(f\"回合:{cfg.env_name}, 算法:{cfg.algo_name}, 设备:{cfg.device}\")\n",
|
||
" rewards = [] # 记录所有回合的奖励\n",
|
||
" steps = []\n",
|
||
" for i_ep in range(cfg.test_eps):\n",
|
||
" ep_reward = 0 # 记录一回合内的奖励\n",
|
||
" ep_step = 0\n",
|
||
" state = env.reset() # 重置环境,返回初始状态\n",
|
||
" while True:\n",
|
||
" ep_step+=1\n",
|
||
" action = agent.predict(state) # 选择动作\n",
|
||
" next_state, reward, done, _ = env.step(action) # 更新环境,返回transition\n",
|
||
" state = next_state # 更新下一个状态\n",
|
||
" ep_reward += reward # 累加奖励\n",
|
||
" if done:\n",
|
||
" break\n",
|
||
" steps.append(ep_step)\n",
|
||
" rewards.append(ep_reward)\n",
|
||
" print(f'回合:{i_ep+1}/{cfg.train_eps},奖励:{ep_reward:.2f}')\n",
|
||
" print(\"完成测试\")\n",
|
||
" env.close()\n",
|
||
" return {'rewards':rewards}"
|
||
]
  }
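,
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, an end-to-end usage sketch. All hyperparameter values here are hypothetical placeholders, and the loops above assume the old gym API (roughly gym <= 0.25, where `env.reset()` returns just the state and `env.step()` returns a 4-tuple):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import gym\n",
    "from types import SimpleNamespace\n",
    "\n",
    "# hypothetical settings; only the fields read by DQN, train and test\n",
    "cfg = SimpleNamespace(env_name='CartPole-v0', algo_name='DQN', device='cpu',\n",
    "                      train_eps=200, test_eps=20, target_update=4,\n",
    "                      gamma=0.95, epsilon_start=0.95, epsilon_end=0.01,\n",
    "                      epsilon_decay=500, batch_size=64, lr=0.0001)\n",
    "\n",
    "env = gym.make(cfg.env_name)\n",
    "n_states = env.observation_space.shape[0]\n",
    "n_actions = env.action_space.n\n",
    "agent = DQN(n_actions, MLP(n_states, n_actions), ReplayBuffer(100000), cfg)\n",
    "train_res = train(cfg, env, agent)\n",
    "test_res = test(cfg, env, agent)"
   ]
  }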
 ],
"metadata": {
|
||
"kernelspec": {
|
||
"display_name": "Python 3.7.13 ('easyrl')",
|
||
"language": "python",
|
||
"name": "python3"
|
||
},
|
||
"language_info": {
|
||
"name": "python",
|
||
"version": "3.7.13"
|
||
},
|
||
"orig_nbformat": 4,
|
||
"vscode": {
|
||
"interpreter": {
|
||
"hash": "8994a120d39b6e6a2ecc94b4007f5314b68aa69fc88a7f00edf21be39b41f49c"
|
||
}
|
||
}
|
||
},
|
||
"nbformat": 4,
|
||
"nbformat_minor": 2
|
||
}
|