Files
easy-rl/codes/DQN/task0.py
2022-05-31 01:20:58 +08:00

178 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sys
import os
import torch.nn as nn
import torch.nn.functional as F
curr_path = os.path.dirname(os.path.abspath(__file__)) # 当前文件所在绝对路径
parent_path = os.path.dirname(curr_path) # 父路径
sys.path.append(parent_path) # 添加路径到系统路径
import gym
import torch
import datetime
import numpy as np
from common.utils import save_results_1, make_dir
from common.utils import plot_rewards
from dqn import DQN
curr_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") # 获取当前时间
class MLP(nn.Module):
def __init__(self, n_states,n_actions,hidden_dim=128):
""" 初始化q网络为全连接网络
n_states: 输入的特征数即环境的状态维度
n_actions: 输出的动作维度
"""
super(MLP, self).__init__()
self.fc1 = nn.Linear(n_states, hidden_dim) # 输入层
self.fc2 = nn.Linear(hidden_dim,hidden_dim) # 隐藏层
self.fc3 = nn.Linear(hidden_dim, n_actions) # 输出层
def forward(self, x):
# 各层对应的激活函数
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
return self.fc3(x)
class Config:
'''超参数
'''
def __init__(self):
############################### hyperparameters ################################
self.algo_name = 'DQN' # algorithm name
self.env_name = 'CartPole-v0' # environment name
self.device = torch.device(
"cuda" if torch.cuda.is_available() else "cpu") # check GPU
self.seed = 10 # 随机种子置0则不设置随机种子
self.train_eps = 200 # 训练的回合数
self.test_eps = 20 # 测试的回合数
################################################################################
################################## 算法超参数 ###################################
self.gamma = 0.95 # 强化学习中的折扣因子
self.epsilon_start = 0.90 # e-greedy策略中初始epsilon
self.epsilon_end = 0.01 # e-greedy策略中的终止epsilon
self.epsilon_decay = 500 # e-greedy策略中epsilon的衰减率
self.lr = 0.0001 # 学习率
self.memory_capacity = 100000 # 经验回放的容量
self.batch_size = 64 # mini-batch SGD中的批量大小
self.target_update = 4 # 目标网络的更新频率
self.hidden_dim = 256 # 网络隐藏层
################################################################################
################################# 保存结果相关参数 ################################
self.result_path = curr_path + "/outputs/" + self.env_name + \
'/' + curr_time + '/results/' # 保存结果的路径
self.model_path = curr_path + "/outputs/" + self.env_name + \
'/' + curr_time + '/models/' # 保存模型的路径
self.save = True # 是否保存图片
################################################################################
def env_agent_config(cfg):
''' 创建环境和智能体
'''
env = gym.make(cfg.env_name) # 创建环境
n_states = env.observation_space.shape[0] # 状态维度
n_actions = env.action_space.n # 动作维度
print(f"n states: {n_states}, n actions: {n_actions}")
model = MLP(n_states,n_actions)
agent = DQN(n_actions, model, cfg) # 创建智能体
if cfg.seed !=0: # 设置随机种子
torch.manual_seed(cfg.seed)
env.seed(cfg.seed)
np.random.seed(cfg.seed)
return env, agent
def train(cfg, env, agent):
''' 训练
'''
print('开始训练!')
print(f'环境:{cfg.env_name}, 算法:{cfg.algo_name}, 设备:{cfg.device}')
rewards = [] # 记录所有回合的奖励
ma_rewards = [] # 记录所有回合的滑动平均奖励
steps = []
for i_ep in range(cfg.train_eps):
ep_reward = 0 # 记录一回合内的奖励
ep_step = 0
state = env.reset() # 重置环境,返回初始状态
while True:
ep_step += 1
action = agent.choose_action(state) # 选择动作
next_state, reward, done, _ = env.step(action) # 更新环境返回transition
agent.memory.push(state, action, reward,
next_state, done) # 保存transition
state = next_state # 更新下一个状态
agent.update() # 更新智能体
ep_reward += reward # 累加奖励
if done:
break
if (i_ep + 1) % cfg.target_update == 0: # 智能体目标网络更新
agent.target_net.load_state_dict(agent.policy_net.state_dict())
steps.append(ep_step)
rewards.append(ep_reward)
if ma_rewards:
ma_rewards.append(0.9 * ma_rewards[-1] + 0.1 * ep_reward)
else:
ma_rewards.append(ep_reward)
if (i_ep + 1) % 1 == 0:
print(f'Episode{i_ep+1}/{cfg.test_eps}, Reward:{ep_reward:.2f}, Step:{ep_step:.2f} Epislon:{agent.epsilon(agent.frame_idx):.3f}')
print('Finish training!')
env.close()
res_dic = {'rewards':rewards,'ma_rewards':ma_rewards,'steps':steps}
return res_dic
def test(cfg, env, agent):
print('开始测试!')
print(f'环境:{cfg.env_name}, 算法:{cfg.algo_name}, 设备:{cfg.device}')
############# 由于测试不需要使用epsilon-greedy策略所以相应的值设置为0 ###############
cfg.epsilon_start = 0.0 # e-greedy策略中初始epsilon
cfg.epsilon_end = 0.0 # e-greedy策略中的终止epsilon
################################################################################
rewards = [] # 记录所有回合的奖励
ma_rewards = [] # 记录所有回合的滑动平均奖励
steps = []
for i_ep in range(cfg.test_eps):
ep_reward = 0 # 记录一回合内的奖励
ep_step = 0
state = env.reset() # 重置环境,返回初始状态
while True:
ep_step+=1
action = agent.choose_action(state) # 选择动作
next_state, reward, done, _ = env.step(action) # 更新环境返回transition
state = next_state # 更新下一个状态
ep_reward += reward # 累加奖励
if done:
break
steps.append(ep_step)
rewards.append(ep_reward)
if ma_rewards:
ma_rewards.append(ma_rewards[-1] * 0.9 + ep_reward * 0.1)
else:
ma_rewards.append(ep_reward)
print(f'Episode{i_ep+1}/{cfg.train_eps}, Reward:{ep_reward:.2f}, Step:{ep_step:.2f}')
print('完成测试!')
env.close()
return {'rewards':rewards,'ma_rewards':ma_rewards,'steps':steps}
if __name__ == "__main__":
cfg = Config()
# 训练
env, agent = env_agent_config(cfg)
res_dic = train(cfg, env, agent)
make_dir(cfg.result_path, cfg.model_path) # 创建保存结果和模型路径的文件夹
agent.save(path=cfg.model_path) # 保存模型
save_results_1(res_dic, tag='train',
path=cfg.result_path) # 保存结果
plot_rewards(res_dic['rewards'], res_dic['ma_rewards'], cfg, tag="train") # 画出结果
# 测试
env, agent = env_agent_config(cfg)
agent.load(path=cfg.model_path) # 导入模型
res_dic = test(cfg, env, agent)
save_results_1(res_dic, tag='test',
path=cfg.result_path) # 保存结果
plot_rewards(res_dic['rewards'], res_dic['ma_rewards'],cfg, tag="test") # 画出结果