{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## 1. 定义算法\n",
|
||
"\n",
|
||
"最基础的策略梯度算法就是REINFORCE算法,又称作Monte-Carlo Policy Gradient算法。我们策略优化的目标如下:\n",
|
||
"\n",
|
||
"$$\n",
|
||
"J_{\\theta}= \\Psi_{\\pi} \\nabla_\\theta \\log \\pi_\\theta\\left(a_t \\mid s_t\\right)\n",
|
||
"$$\n",
|
||
"\n",
|
||
"其中$\\Psi_{\\pi}$在REINFORCE算法中表示衰减的回报(具体公式见伪代码),也可以用优势来估计,也就是我们熟知的A3C算法,这个在后面包括GAE算法中都会讲到。\n",
|
||
"\n",
|
||
"### 1.1. 策略函数设计\n",
|
||
"\n",
|
||
"既然策略梯度是直接对策略函数进行梯度计算,那么策略函数如何设计呢?一般来讲有两种设计方式,一个是softmax函数,另外一个是高斯分布$\\mathbb{N}\\left(\\phi(\\mathbb{s})^{\\mathbb{\\pi}} \\theta, \\sigma^2\\right)$,前者用于离散动作空间,后者多用于连续动作空间。\n",
|
||
"\n",
|
||
"softmax函数可以表示为:\n",
|
||
"$$\n",
|
||
"\\pi_\\theta(s, a)=\\frac{e^{\\phi(s, a)^{T_\\theta}}}{\\sum_b e^{\\phi(s, b)^{T^T}}}\n",
|
||
"$$\n",
|
||
"对应的梯度为:\n",
|
||
"$$\n",
|
||
"\\nabla_\\theta \\log \\pi_\\theta(s, a)=\\phi(s, a)-\\mathbb{E}_{\\pi_\\theta}[\\phi(s,)\n",
|
||
"$$\n",
|
||
"高斯分布对应的梯度为:\n",
|
||
"$$\n",
|
||
"\\nabla_\\theta \\log \\pi_\\theta(s, a)=\\frac{\\left(a-\\phi(s)^T \\theta\\right) \\phi(s)}{\\sigma^2}\n",
|
||
"$$\n",
|
||
"但是对于一些特殊的情况,例如在本次演示中动作维度=2且为离散空间,这个时候可以用伯努利分布来实现,这种方式其实是不推荐的,这里给大家做演示也是为了展现一些特殊情况,启发大家一些思考,例如Bernoulli,Binomial,Gaussian分布之间的关系。简单说来,Binomial分布,$n = 1$时就是Bernoulli分布,$n \\rightarrow \\infty$时就是Gaussian分布。\n",
|
||
"\n",
|
||
"\n"
|
||
]
},
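{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the above concrete, here is a minimal sketch (not part of the original implementation) of how these policy heads map onto `torch.distributions`: `Categorical` for a softmax policy, `Normal` for a Gaussian policy, and `Bernoulli` for the two-action special case used in this demo. The logits, mean, and probability values below are made-up placeholders; in practice they would come from the policy network.\n",
"\n",
"```python\n",
"import torch\n",
"from torch.distributions import Categorical, Normal, Bernoulli\n",
"\n",
"# Softmax policy over a discrete action space: sample an action and get its\n",
"# log-probability, i.e. the log pi_theta(a|s) term in the policy gradient.\n",
"logits = torch.tensor([0.2, 1.5, -0.3])  # placeholder network outputs\n",
"dist = Categorical(logits=logits)\n",
"a = dist.sample()\n",
"log_prob = dist.log_prob(a)\n",
"\n",
"# Gaussian policy for a continuous action space: mean from the network, fixed std here.\n",
"mu, sigma = torch.tensor(0.1), torch.tensor(1.0)  # placeholder values\n",
"dist = Normal(mu, sigma)\n",
"a = dist.sample()\n",
"log_prob = dist.log_prob(a)\n",
"\n",
"# Bernoulli policy for a binary action space (the special case in this notebook):\n",
"# the network outputs a single probability in (0, 1).\n",
"p = torch.tensor(0.7)  # placeholder probability\n",
"dist = Bernoulli(p)\n",
"a = dist.sample()      # 0.0 or 1.0\n",
"log_prob = dist.log_prob(a)\n",
"```"
]
},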
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.2. Model Design\n",
"\n",
"As mentioned above, although this demo uses a discrete action space, the action dimension equals 2, so the policy function can be represented with a special case of the Gaussian family, namely the Bernoulli distribution. A Bernoulli distribution takes a single probability as input and samples an action from it; the sampled action can only be 0 or 1, like the two sides of a tossed coin. In this case, our policy model builds on an MLP: it takes the state as input, produces the action output at the penultimate layer, and applies an activation function at the final layer to output the probability of the corresponding action. If you are not sure what an activation function does, it is worth reviewing the relevant deep learning material; in short, it adds non-linearity to the neural network. Since we need to output the probability of an action, the output value must lie between 0 and 1, and the sigmoid function meets this requirement exactly. A reference implementation follows."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"import torch.nn as nn\n",
"import torch.nn.functional as F\n",
"\n",
"class PGNet(nn.Module):\n",
"    def __init__(self, input_dim, output_dim, hidden_dim=128):\n",
"        \"\"\" Initialize the policy network, a fully-connected network.\n",
"        input_dim: number of input features, i.e. the state dimension of the environment\n",
"        output_dim: dimension of the action output\n",
"        \"\"\"\n",
"        super(PGNet, self).__init__()\n",
"        self.fc1 = nn.Linear(input_dim, hidden_dim)  # input layer\n",
"        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # hidden layer\n",
"        self.fc3 = nn.Linear(hidden_dim, output_dim)  # output layer\n",
"\n",
"    def forward(self, x):\n",
"        x = F.relu(self.fc1(x))\n",
"        x = F.relu(self.fc2(x))\n",
"        x = torch.sigmoid(self.fc3(x))  # squash to (0, 1): probability of taking action 1\n",
"        return x"
]
},
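{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (a sketch, not part of the original notebook), the network can be fed a random state vector to confirm that it outputs a single value in (0, 1). The dimensions below are assumptions: a 4-dimensional state (CartPole-style) and a 1-dimensional probability output for the Bernoulli policy.\n",
"\n",
"```python\n",
"import torch\n",
"\n",
"# assumed dimensions: 4-dimensional state, single output probability\n",
"net = PGNet(input_dim=4, output_dim=1)\n",
"state = torch.randn(4)   # a random placeholder state\n",
"prob = net(state)        # probability of action 1 under the Bernoulli policy\n",
"print(prob)              # a tensor with a single value in (0, 1)\n",
"```"
]
},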
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1.3. Designing the Update Function\n",
"\n",
"As mentioned earlier, our optimization objective, i.e. the loss function of the policy gradient algorithm, is as follows:\n",
"$$\n",
"J_{\\theta}= \\Psi_{\\pi} \\nabla_\\theta \\log \\pi_\\theta\\left(a_t \\mid s_t\\right)\n",
"$$\n",
"\n",
"We need to split it into two parts, $\\Psi_{\\pi}$ and $\\nabla_\\theta \\log \\pi_\\theta\\left(a_t \\mid s_t\\right)$, and compute them separately. First consider the value part $\\Psi_{\\pi}$: in REINFORCE it is the discounted return starting from the current time step, as follows:\n",
"$$\n",
"G_t \\leftarrow \\sum_{k=t}^{T} \\gamma^{k-t} r_{k}\n",
"$$\n",
"\n",
"Implementing this in code can feel a bit convoluted, so it helps to go through it backwards. Within one episode whose terminal time step is $T$, the corresponding return is $G_T=r_T$, and the one before it is $G_{T-1}=r_{T-1}+\\gamma r_T$. In the code we use a dynamic-programming trick, as follows:\n",
"```python\n",
"running_add = running_add * self.gamma + reward_pool[i] # running_add starts at 0\n",
"```\n",
"This recursion also loops backwards. On the first iteration the value equals:\n",
"$$\n",
"running\\_add = r_T\n",
"$$\n",
"On the second iteration it equals:\n",
"$$\n",
"running\\_add = r_T*\\gamma+r_{T-1}\n",
"$$\n",
"And on the third iteration:\n",
"$$\n",
"running\\_add = (r_T*\\gamma+r_{T-1})*\\gamma+r_{T-2} = r_T*\\gamma^2+r_{T-1}*\\gamma+r_{T-2}\n",
"$$\n"
]
},
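{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a standalone illustration of this backward loop (a sketch with made-up values, separate from the implementation below), consider a toy episode with three rewards of 1 and $\\gamma=0.9$; the computed returns should be approximately $[2.71, 1.9, 1.0]$:\n",
"\n",
"```python\n",
"gamma = 0.9\n",
"reward_pool = [1.0, 1.0, 1.0]  # toy rewards r_1, r_2, r_3 for one episode\n",
"\n",
"returns = [0.0] * len(reward_pool)\n",
"running_add = 0.0\n",
"for i in reversed(range(len(reward_pool))):\n",
"    running_add = running_add * gamma + reward_pool[i]  # G_t = r_t + gamma * G_{t+1}\n",
"    returns[i] = running_add\n",
"\n",
"print(returns)  # approximately [2.71, 1.9, 1.0]\n",
"```"
]
},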
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"from torch.distributions import Bernoulli\n",
"import numpy as np\n",
"\n",
"class PolicyGradient:\n",
"\n",
"    def __init__(self, model, memory, cfg):\n",
"        self.gamma = cfg['gamma']\n",
"        self.device = torch.device(cfg['device'])\n",
"        self.memory = memory\n",
"        self.policy_net = model.to(self.device)\n",
"        self.optimizer = torch.optim.RMSprop(self.policy_net.parameters(), lr=cfg['lr'])\n",
"\n",
"    def sample_action(self, state):\n",
"        state = torch.from_numpy(state).float().to(self.device)\n",
"        probs = self.policy_net(state)\n",
"        m = Bernoulli(probs)  # Bernoulli distribution\n",
"        action = m.sample()\n",
"        action = action.cpu().numpy().astype(int)[0]  # convert to a scalar\n",
"        return action\n",
"\n",
"    def predict_action(self, state):\n",
"        # prediction samples from the Bernoulli distribution in the same way as sample_action\n",
"        state = torch.from_numpy(state).float().to(self.device)\n",
"        probs = self.policy_net(state)\n",
"        m = Bernoulli(probs)  # Bernoulli distribution\n",
"        action = m.sample()\n",
"        action = action.cpu().numpy().astype(int)[0]  # convert to a scalar\n",
"        return action\n",
"\n",
"    def update(self):\n",
"        state_pool, action_pool, reward_pool = self.memory.sample()\n",
"        state_pool, action_pool, reward_pool = list(state_pool), list(action_pool), list(reward_pool)\n",
"        # discount rewards: compute the return of each step by looping backwards\n",
"        running_add = 0\n",
"        for i in reversed(range(len(reward_pool))):\n",
"            if reward_pool[i] == 0:\n",
"                running_add = 0  # a reward of 0 marks the end of an episode, so reset the running return\n",
"            else:\n",
"                running_add = running_add * self.gamma + reward_pool[i]\n",
"            reward_pool[i] = running_add\n",
"\n",
"        # normalize the returns\n",
"        reward_mean = np.mean(reward_pool)\n",
"        reward_std = np.std(reward_pool)\n",
"        for i in range(len(reward_pool)):\n",
"            reward_pool[i] = (reward_pool[i] - reward_mean) / reward_std\n",
"\n",
"        # gradient descent\n",
"        self.optimizer.zero_grad()\n",
"        for i in range(len(reward_pool)):\n",
"            state = torch.from_numpy(state_pool[i]).float().to(self.device)\n",
"            action = torch.FloatTensor([action_pool[i]]).to(self.device)\n",
"            reward = reward_pool[i]\n",
"            probs = self.policy_net(state)\n",
"            m = Bernoulli(probs)\n",
"            loss = -m.log_prob(action) * reward  # negative log-probability times the (normalized) return\n",
"            loss.backward()\n",
"        self.optimizer.step()\n",
"        self.memory.clear()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3.7.13 ('easyrl')",
"language": "python",
"name": "python3"
},
"language_info": {
"name": "python",
"version": "3.7.13"
},
"orig_nbformat": 4,
"vscode": {
"interpreter": {
"hash": "8994a120d39b6e6a2ecc94b4007f5314b68aa69fc88a7f00edf21be39b41f49c"
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}