update
This commit is contained in:
BIN
codes/Docs/assets/eval_rewards_curve_cn-1689282.png
Normal file
BIN
codes/Docs/assets/eval_rewards_curve_cn-1689282.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 34 KiB |
BIN
codes/Docs/assets/eval_rewards_curve_cn-1760950.png
Normal file
BIN
codes/Docs/assets/eval_rewards_curve_cn-1760950.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 66 KiB |
BIN
codes/Docs/assets/pendulum_1.png
Normal file
BIN
codes/Docs/assets/pendulum_1.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 148 KiB |
BIN
codes/Docs/assets/train_rewards_curve_cn-1689150.png
Normal file
BIN
codes/Docs/assets/train_rewards_curve_cn-1689150.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 45 KiB |
BIN
codes/Docs/assets/train_rewards_curve_cn-1760758.png
Normal file
BIN
codes/Docs/assets/train_rewards_curve_cn-1760758.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 74 KiB |
175
codes/Docs/使用DDPG解决倒立摆问题.md
Normal file
175
codes/Docs/使用DDPG解决倒立摆问题.md
Normal file
@@ -0,0 +1,175 @@
|
||||
前面项目讲的环境都是离散动作的,但实际中也有很多连续动作的环境,比如Open AI Gym中的[Pendulum-v0](https://github.com/openai/gym/wiki/Pendulum-v0)环境,它解决的是一个倒立摆问题,我们先对该环境做一个简要说明。
|
||||
|
||||
## Pendulum-v0简介
|
||||
|
||||
如果说 CartPole-v0 是一个离散动作的经典入门环境的话,那么对应 Pendulum-v0 就是连续动作的经典入门环境,如下图,我们通过施加力矩使其向上摆动并保持直立。
|
||||
|
||||
<img src="../../easy_rl_book/res/ch12/assets/pendulum_1.png" alt="image-20210915161550713" style="zoom:50%;" />
|
||||
|
||||
该环境的状态数有三个,设摆针竖直方向上的顺时针旋转角为$\theta$,$\theta$设在$[-\pi,\pi]$之间,则相应的状态为$[cos\theta,sin\theta,\dot{\theta}]$,即表示角度和角速度,我们的动作则是一个-2到2之间的力矩,它是一个连续量,因而该环境不能用离散动作的算法比如 DQN 来解决。关于奖励是根据相关的物理原理而计算出的等式,如下:
|
||||
$$
|
||||
-\left(\theta^{2}+0.1 * \hat{\theta}^{2}+0.001 * \text { action }^{2}\right)
|
||||
$$
|
||||
对于每一步,其最低奖励为$-\left(\pi^{2}+0.1 * 8^{2}+0.001 * 2^{2}\right)= -16.2736044$,最高奖励为0。同 CartPole-v0 环境一样,达到最优算法的情况下,每回合的步数是无限的,因此这里设定每回合最大步数为200以便于训练。
|
||||
|
||||
## DDPG 基本接口
|
||||
|
||||
我们依然使用接口的概念,通过伪代码分析并实现 DDPG 的训练模式,如下:
|
||||
|
||||
> 初始化评论家网络$Q\left(s, a \mid \theta^{Q}\right)$和演员网络$\mu\left(s \mid \theta^{\mu}\right)$,其权重分别为$\theta^{Q}$和$\theta^{\mu}$
|
||||
>
|
||||
> 初始化目标网络$Q'$和$\mu'$,并复制权重$\theta^{Q^{\prime}} \leftarrow \theta^{Q}, \theta^{\mu^{\prime}} \leftarrow \theta^{\mu}$
|
||||
>
|
||||
> 初始化经验回放缓冲区$R$
|
||||
>
|
||||
> 执行$M$个回合循环,对于每个回合:
|
||||
>
|
||||
> * 初始化动作探索的的随机过程即噪声$\mathcal{N}$
|
||||
>
|
||||
> * 初始化状态$s_1$
|
||||
>
|
||||
> 循环$T$个时间步长,对于每个时步$
|
||||
>
|
||||
> * 根据当前策略和噪声选择动作$a_{t}=\mu\left(s_{t} \mid \theta^{\mu}\right)+\mathcal{N}_{t}$
|
||||
> * 执行动作$a_t$并得到反馈$r_t$和下一个状态$s_{t+1}$
|
||||
> * 存储转移$\left(s_{t}, a_{t}, r_{t}, s_{t+1}\right)$到经验缓冲$R$中
|
||||
> * (更新策略)从$D$随机采样一个小批量的转移
|
||||
> * (更新策略)计算实际的Q值$y_{i}=r_{i}+\gamma Q^{\prime}\left(s_{i+1}, \mu^{\prime}\left(s_{i+1} \mid \theta^{\mu^{\prime}}\right) \mid \theta^{Q^{\prime}}\right)$
|
||||
> * (更新策略)对损失函数$L=\frac{1}{N} \sum_{i}\left(y_{i}-Q\left(s_{i}, a_{i} \mid \theta^{Q}\right)\right)^{2}$关于参数$\theta$做梯度下降用于更新评论家网络
|
||||
> * (更新策略)使用采样梯度更新演员网络的策略:$\left.\left.\nabla_{\theta^{\mu}} J \approx \frac{1}{N} \sum_{i} \nabla_{a} Q\left(s, a \mid \theta^{Q}\right)\right|_{s=s_{i}, a=\mu\left(s_{i}\right)} \nabla_{\theta^{\mu}} \mu\left(s \mid \theta^{\mu}\right)\right|_{s_{i}}$
|
||||
> * (更新策略)更新目标网络:$\theta^{Q^{\prime}} \leftarrow \tau \theta^{Q}+(1-\tau) \theta^{Q^{\prime}}$,$\theta^{\mu^{\prime}} \leftarrow \tau \theta^{\mu}+(1-\tau) \theta^{\mu^{\prime}}$
|
||||
|
||||
代码如下:
|
||||
|
||||
```python
|
||||
ou_noise = OUNoise(env.action_space) # 动作噪声
|
||||
rewards = [] # 记录奖励
|
||||
ma_rewards = [] # 记录滑动平均奖励
|
||||
for i_ep in range(cfg.train_eps):
|
||||
state = env.reset()
|
||||
ou_noise.reset()
|
||||
done = False
|
||||
ep_reward = 0
|
||||
i_step = 0
|
||||
while not done:
|
||||
i_step += 1
|
||||
action = agent.choose_action(state)
|
||||
action = ou_noise.get_action(action, i_step)
|
||||
next_state, reward, done, _ = env.step(action)
|
||||
ep_reward += reward
|
||||
agent.memory.push(state, action, reward, next_state, done)
|
||||
agent.update()
|
||||
state = next_state
|
||||
if (i_ep+1)%10 == 0:
|
||||
print('回合:{}/{},奖励:{}'.format(i_ep+1, cfg.train_eps, ep_reward))
|
||||
rewards.append(ep_reward)
|
||||
if ma_rewards:
|
||||
ma_rewards.append(0.9*ma_rewards[-1]+0.1*ep_reward)
|
||||
else:
|
||||
ma_rewards.append(ep_reward)
|
||||
```
|
||||
|
||||
相比于 DQN ,DDPG 主要多了两处修改,一个是给动作施加噪声,另外一个是软更新策略,即最后一步。
|
||||
|
||||
## Ornstein-Uhlenbeck噪声
|
||||
|
||||
OU 噪声适用于惯性系统,尤其是时间离散化粒度较小的情况。 OU 噪声是一种随机过程,下面略去证明,直接给出公式:
|
||||
$$
|
||||
x(t+\Delta t)=x(t)-\theta(x(t)-\mu) \Delta t+\sigma W_t
|
||||
$$
|
||||
其中 $W_t$ 属于正太分布,进而代码实现如下:
|
||||
|
||||
```python
|
||||
class OUNoise(object):
|
||||
'''Ornstein–Uhlenbeck噪声
|
||||
'''
|
||||
def __init__(self, action_space, mu=0.0, theta=0.15, max_sigma=0.3, min_sigma=0.3, decay_period=100000):
|
||||
self.mu = mu # OU噪声的参数
|
||||
self.theta = theta # OU噪声的参数
|
||||
self.sigma = max_sigma # OU噪声的参数
|
||||
self.max_sigma = max_sigma
|
||||
self.min_sigma = min_sigma
|
||||
self.decay_period = decay_period
|
||||
self.action_dim = action_space.shape[0]
|
||||
self.low = action_space.low
|
||||
self.high = action_space.high
|
||||
self.reset()
|
||||
def reset(self):
|
||||
self.obs = np.ones(self.action_dim) * self.mu
|
||||
def evolve_obs(self):
|
||||
x = self.obs
|
||||
dx = self.theta * (self.mu - x) + self.sigma * np.random.randn(self.action_dim)
|
||||
self.obs = x + dx
|
||||
return self.obs
|
||||
def get_action(self, action, t=0):
|
||||
ou_obs = self.evolve_obs()
|
||||
self.sigma = self.max_sigma - (self.max_sigma - self.min_sigma) * min(1.0, t / self.decay_period) # sigma会逐渐衰减
|
||||
return np.clip(action + ou_obs, self.low, self.high) # 动作加上噪声后进行剪切
|
||||
```
|
||||
|
||||
## DDPG算法
|
||||
|
||||
DDPG算法主要也包括两个功能,一个是选择动作,另外一个是更新策略,首先看选择动作:
|
||||
|
||||
```python
|
||||
def choose_action(self, state):
|
||||
state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
|
||||
action = self.actor(state)
|
||||
return action.detach().cpu().numpy()[0, 0]
|
||||
```
|
||||
|
||||
由于DDPG是直接从演员网络取得动作,所以这里不用$\epsilon-greedy$策略。在更新策略函数中,也会跟DQN稍有不同,并且加入软更新:
|
||||
|
||||
```python
|
||||
def update(self):
|
||||
if len(self.memory) < self.batch_size: # 当 memory 中不满足一个批量时,不更新策略
|
||||
return
|
||||
# 从经验回放中(replay memory)中随机采样一个批量的转移(transition)
|
||||
state, action, reward, next_state, done = self.memory.sample(self.batch_size)
|
||||
# 转变为张量
|
||||
state = torch.FloatTensor(state).to(self.device)
|
||||
next_state = torch.FloatTensor(next_state).to(self.device)
|
||||
action = torch.FloatTensor(action).to(self.device)
|
||||
reward = torch.FloatTensor(reward).unsqueeze(1).to(self.device)
|
||||
done = torch.FloatTensor(np.float32(done)).unsqueeze(1).to(self.device)
|
||||
|
||||
policy_loss = self.critic(state, self.actor(state))
|
||||
policy_loss = -policy_loss.mean()
|
||||
next_action = self.target_actor(next_state)
|
||||
target_value = self.target_critic(next_state, next_action.detach())
|
||||
expected_value = reward + (1.0 - done) * self.gamma * target_value
|
||||
expected_value = torch.clamp(expected_value, -np.inf, np.inf)
|
||||
|
||||
value = self.critic(state, action)
|
||||
value_loss = nn.MSELoss()(value, expected_value.detach())
|
||||
|
||||
self.actor_optimizer.zero_grad()
|
||||
policy_loss.backward()
|
||||
self.actor_optimizer.step()
|
||||
self.critic_optimizer.zero_grad()
|
||||
value_loss.backward()
|
||||
self.critic_optimizer.step()
|
||||
# 软更新
|
||||
for target_param, param in zip(self.target_critic.parameters(), self.critic.parameters()):
|
||||
target_param.data.copy_(
|
||||
target_param.data * (1.0 - self.soft_tau) +
|
||||
param.data * self.soft_tau
|
||||
)
|
||||
for target_param, param in zip(self.target_actor.parameters(), self.actor.parameters()):
|
||||
target_param.data.copy_(
|
||||
target_param.data * (1.0 - self.soft_tau) +
|
||||
param.data * self.soft_tau
|
||||
)
|
||||
```
|
||||
|
||||
## 结果分析
|
||||
|
||||
实现算法之后,我们先看看训练效果:
|
||||
|
||||

|
||||
|
||||
可以看到算法整体上是达到收敛了的,但是稳定状态下波动还比较大,依然有提升的空间,限于笔者的精力,这里只是帮助赌注实现一个基础的代码演示,想要使得算法调到最优感兴趣的读者可以多思考实现。我们再来看看测试的结果:
|
||||
|
||||

|
||||
|
||||
从图中看出测试的平均奖励在-150左右,但其实训练的时候平均的稳态奖励在-300左右,这是因为测试的时候我们舍去了OU噪声的缘故。
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
在练习本项目之前,可以先回顾一下之前的项目实战,即使用Q学习解决悬崖寻路问题。本项目将具体实现DQN算法来解决推车杆问题,对应的模拟环境为Open AI Gym中的[CartPole-v0](https://datawhalechina.github.io/easy-rl/#/chapter7/project2?id=cartpole-v0),我们同样先对该环境做一个简要说明。
|
||||
|
||||
## CartPole-v0环境简介
|
||||
## CartPole-v0 简介
|
||||
|
||||
CartPole-v0是一个经典的入门环境,如下图,它通过向左(动作=0)或向右(动作=1)推动推车来实现竖直杆的平衡,每次实施一个动作后如果能够继续保持平衡就会得到一个+1的奖励,否则杆将无法保持平衡而导致游戏结束。
|
||||
|
||||
@@ -28,15 +28,64 @@ print(f"初始状态:{state}")
|
||||
初始状态:[ 0.03073904 0.00145001 -0.03088818 -0.03131252]
|
||||
```
|
||||
|
||||
该环境状态数是四个,分别为车的位置、车的速度、杆的角度以及杆顶部的速度,动作数为两个,并且是离散的向左或者向右。
|
||||
该环境状态数是四个,分别为车的位置、车的速度、杆的角度以及杆顶部的速度,动作数为两个,并且是离散的向左或者向右。理论上达到最优化算法的情况下,推车杆是一直能保持平衡的,也就是每回合的步数是无限,但是这不方便训练,所以环境内部设置了每回合的最大步数为200,也就是说理想情况下,只需要我们每回合的奖励达到200就算训练完成。
|
||||
|
||||
## DQN基本接口
|
||||
|
||||
介绍完环境之后,我们沿用接口的概念,通过分析伪代码来实现DQN的基本训练模式,以及一些要素比如建立什么网络需要什么模块等等。我们现在常用的DQN伪代码如下:
|
||||
|
||||

|
||||
> 初始化经验回放缓冲区(replay memory)$D$,容量(capacity)为$N$
|
||||
>
|
||||
> 初始化状态-动作函数,即带有初始随机权重$\theta$的$Q$网络
|
||||
>
|
||||
> 初始化目标状态-动作函数,即带有初始随机权重$\theta^-$的$\hat{Q}$网络,且$\theta^-=\theta$
|
||||
>
|
||||
> 执行$M$个回合循环,对于每个回合
|
||||
>
|
||||
> * 初始化环境,得到初始状态$s_1$
|
||||
> * 循环$T$个时间步长,对于每个时步$t$
|
||||
> * 使用$\epsilon-greedy$策略选择动作$a_t$
|
||||
> * 环境根据$a_t$反馈当前的奖励$r_t$和下一个状态$s_{t+1}$
|
||||
> * 更新状态$s_{t+1}=s_t$
|
||||
> * 存储转移(transition)即$(s_t,a_t,r-t,s_{t+1})$到经验回放$D$中
|
||||
> * (更新策略)从$D$随机采样一个小批量的转移
|
||||
> * (更新策略)计算实际的Q值$y_{j}=\left\{\begin{array}{cc}r_{j} & \text { 如果回合在时步 j+1终止 }\\ r_{j}+\gamma \max _{a^{\prime}} \hat{Q}\left(\phi_{j+1}, a^{\prime} ; \theta^{-}\right) & \text {否则 }\end{array}\right.$
|
||||
> * (更新策略)对损失函数$\left(y_{j}-Q\left(\phi_{j}, a_{j} ; \theta\right)\right)^{2}$关于参数$\theta$做梯度下降
|
||||
> * (更新策略)每$C$步重置$\hat{Q}=Q$
|
||||
|
||||
与传统的Q学习算法相比,DQN使用神经网络来代替之前的Q表格从而存储更多的信息,且由于使用了神经网络所以我们一般需要利用随机梯度下降来优化Q值的预测。此外多了经验回放缓冲区(replay memory),并且使用两个网络,即目标网络和当前网络。
|
||||
用代码来实现的话如下:
|
||||
|
||||
```python
|
||||
rewards = [] # 记录奖励
|
||||
ma_rewards = [] # 记录滑动平均奖励
|
||||
for i_ep in range(cfg.train_eps):
|
||||
state = env.reset()
|
||||
done = False
|
||||
ep_reward = 0
|
||||
while True:
|
||||
action = agent.choose_action(state)
|
||||
next_state, reward, done, _ = env.step(action)
|
||||
ep_reward += reward
|
||||
agent.memory.push(state, action, reward, next_state, done)
|
||||
state = next_state
|
||||
agent.update()
|
||||
if done:
|
||||
break
|
||||
if (i_ep+1) % cfg.target_update == 0:
|
||||
agent.target_net.load_state_dict(agent.policy_net.state_dict())
|
||||
if (i_ep+1)%10 == 0:
|
||||
print('回合:{}/{}, 奖励:{}'.format(i_ep+1, cfg.train_eps, ep_reward))
|
||||
rewards.append(ep_reward)
|
||||
# save ma_rewards
|
||||
if ma_rewards:
|
||||
ma_rewards.append(0.9*ma_rewards[-1]+0.1*ep_reward)
|
||||
else:
|
||||
ma_rewards.append(ep_reward)
|
||||
```
|
||||
|
||||
|
||||
|
||||
可以看到,DQN的训练模式其实和大多强化学习算法是一样的套路,但与传统的Q学习算法相比,DQN使用神经网络来代替之前的Q表格从而存储更多的信息,且由于使用了神经网络所以我们一般需要利用随机梯度下降来优化Q值的预测。此外多了经验回放缓冲区(replay memory),并且使用两个网络,即目标网络和当前网络。
|
||||
|
||||
## 经验回放缓冲区
|
||||
|
||||
@@ -62,5 +111,98 @@ class ReplayBuffer:
|
||||
batch = random.sample(self.buffer, batch_size) # 随机采出小批量转移
|
||||
state, action, reward, next_state, done = zip(*batch) # 解压成状态,动作等
|
||||
return state, action, reward, next_state, done
|
||||
def __len__(self):
|
||||
''' 返回当前存储的量
|
||||
'''
|
||||
return len(self.buffer)
|
||||
```
|
||||
|
||||
## Q网络
|
||||
|
||||
在DQN中我们使用神经网络替代原有的Q表,从而能够存储更多的Q值,实现更为高级的策略以便用于复杂的环境,这里我们用的是一个三层的感知机或者说全连接网络:
|
||||
|
||||
```python
|
||||
class MLP(nn.Module):
|
||||
def __init__(self, input_dim,output_dim,hidden_dim=128):
|
||||
""" 初始化q网络,为全连接网络
|
||||
input_dim: 输入的特征数即环境的状态数
|
||||
output_dim: 输出的动作维度
|
||||
"""
|
||||
super(MLP, self).__init__()
|
||||
self.fc1 = nn.Linear(input_dim, hidden_dim) # 输入层
|
||||
self.fc2 = nn.Linear(hidden_dim,hidden_dim) # 隐藏层
|
||||
self.fc3 = nn.Linear(hidden_dim, output_dim) # 输出层
|
||||
|
||||
def forward(self, x):
|
||||
# 各层对应的激活函数
|
||||
x = F.relu(self.fc1(x))
|
||||
x = F.relu(self.fc2(x))
|
||||
return self.fc3(x)
|
||||
```
|
||||
|
||||
学过深度学习的同学应该都对这个网络十分熟悉,在强化学习中,网络的输入一般是状态,输出则是一个动作,假如总共有两个动作,那么这里的动作维度就是2,可能的输出就是0或1,一般我们用ReLU作为激活函数。根据实际需要也可以改变神经网络的模型结构等等,比如若我们使用图像作为输入的话,这里可以使用卷积神经网络(CNN)。
|
||||
|
||||
## DQN算法
|
||||
|
||||
跟前面的项目实战一样,DQN算法一般也包括选择动作和更新策略两个函数,首先我们看选择动作:
|
||||
|
||||
```python
|
||||
def choose_action(self, state):
|
||||
'''选择动作
|
||||
'''
|
||||
self.frame_idx += 1
|
||||
if random.random() > self.epsilon(self.frame_idx):
|
||||
with torch.no_grad():
|
||||
state = torch.tensor([state], device=self.device, dtype=torch.float32)
|
||||
q_values = self.policy_net(state)
|
||||
action = q_values.max(1)[1].item() # 选择Q值最大的动作
|
||||
else:
|
||||
action = random.randrange(self.action_dim)
|
||||
```
|
||||
|
||||
可以看到跟Q学习算法其实是一样的,都是用的$\epsilon-greedy$策略,只是使用神经网络的话我们需要通过Torch或者Tensorflow工具来处理相应的数据。
|
||||
|
||||
而DQN更新策略的步骤稍微复杂一点,主要包括三个部分:随机采样,计算期望Q值和梯度下降,如下:
|
||||
|
||||
```python
|
||||
def update(self):
|
||||
if len(self.memory) < self.batch_size: # 当memory中不满足一个批量时,不更新策略
|
||||
return
|
||||
# 从经验回放中(replay memory)中随机采样一个批量的转移(transition)
|
||||
state_batch, action_batch, reward_batch, next_state_batch, done_batch = self.memory.sample(
|
||||
self.batch_size)
|
||||
# 转为张量
|
||||
state_batch = torch.tensor(
|
||||
state_batch, device=self.device, dtype=torch.float)
|
||||
action_batch = torch.tensor(action_batch, device=self.device).unsqueeze(
|
||||
1)
|
||||
reward_batch = torch.tensor(
|
||||
reward_batch, device=self.device, dtype=torch.float)
|
||||
next_state_batch = torch.tensor(
|
||||
next_state_batch, device=self.device, dtype=torch.float)
|
||||
done_batch = torch.tensor(np.float32(
|
||||
done_batch), device=self.device)
|
||||
q_values = self.policy_net(state_batch).gather(dim=1, index=action_batch) # 计算当前状态(s_t,a)对应的Q(s_t, a)
|
||||
next_q_values = self.target_net(next_state_batch).max(1)[0].detach() # 计算下一时刻的状态(s_t_,a)对应的Q值
|
||||
# 计算期望的Q值,对于终止状态,此时done_batch[0]=1, 对应的expected_q_value等于reward
|
||||
expected_q_values = reward_batch + self.gamma * next_q_values * (1-done_batch)
|
||||
loss = nn.MSELoss()(q_values, expected_q_values.unsqueeze(1)) # 计算均方根损失
|
||||
# 优化更新模型
|
||||
self.optimizer.zero_grad()
|
||||
loss.backward()
|
||||
for param in self.policy_net.parameters(): # clip防止梯度爆炸
|
||||
param.grad.data.clamp_(-1, 1)
|
||||
self.optimizer.step()
|
||||
```
|
||||
|
||||
## 结果分析
|
||||
|
||||
完成代码之后,我们先来看看DQN算法的训练效果,曲线如下:
|
||||
|
||||

|
||||
|
||||
从图中看出,算法其实已经在60回合左右达到收敛,最后一直维持在最佳奖励200左右,可能会有轻微的波动,这是因为我们在收敛的情况下依然保持了一定的探索率,即epsilon_end=0.01。现在我们可以载入模型看看测试的效果:
|
||||
|
||||

|
||||
|
||||
我们测试了30个回合,每回合都保持在200左右,说明我们的模型学习得不错了!
|
||||
Reference in New Issue
Block a user