强化学习|强化学习笔记(五)Pytorch实现简单DQN


强化学习笔记(五)Pytorch实现简单DQN

  • Q1:值函数近似的形式和意义?
  • Q2:梯度下降法公式中的真值 V π ( s ) V_{\pi}(s) Vπ?(s)和 q π ( S , A ) q_{\pi}(S,A) qπ?(S,A)是如何处理的?
  • Q3:如何理解DQN中的经验回放(Experience Relay)机制?
  • Q4:Pytorch实现一个简单的Q-Network

表格型的近似求解方法只适用于小规模的问题。对于复杂庞大状态-动作空间的问题,我们不可能有足够的内存去存放和维护这样一个Q_Table. 因此强化学习和深度学习的结合是发展历程中必经的道路,而在Atari游戏,围棋比赛等良好的表现让DNN+RL的模式充满了潜力。
UCL本节课的分标题是Incremental Methods和Batch Methods,区别也是一目了然。前者是阶段性(step by step)地优化函数,比如蒙卡走完一个episode就更新所有,TD直接每个状态就可以更新一下。而Batch的方法跟深度学习里的批处理是一样的,取用的是价值误差的最小二乘作为损失函数,批量地去更新参数。实际问题通常还是取后者,因为前者更新完一个数据就丢弃了,后者则是对总体做了一个总结,而且计算效率更高。
Q1:值函数近似的形式和意义? 强化学习|强化学习笔记(五)Pytorch实现简单DQN
文章图片
值函数的表示主要有以上三种,最常用的是最后一种,原因有二:1. 输入简单;2. 输出的是输入状态对应的动作价值函数,可以直接决策。
本质上这就是一个黑箱模型,整个流程就是对价值函数进行再编码的过程。只有w参数的个数远远小于q的个数,这个数据压缩才是有意义的。
上课时还提到了,Q表的方法跟值函数近似二者并不是分离的。Q表可以看成是一种特殊的Value Approximation. 针对第一个模型,我们如果把状态向量S用One-Hot向量表示,权重的维度即为状态维度n,而权重的值也就是各个状态价值。后两种模型其实是同理,相当于一个直接映射,没有做任何处理。
Q2:梯度下降法公式中的真值 V π ( s ) V_{\pi}(s) Vπ?(s)和 q π ( S , A ) q_{\pi}(S,A) qπ?(S,A)是如何处理的? 课上首先以线性函数近似为例,推导了梯度下降的增量公式。我们的目的就是要让黑箱模型尽可能准确地模拟Value Function。因此在优化函数 J ( w ) J(w) J(w)中使用了二范数去逼近真值 V π ( s ) V_{\pi}(s) Vπ?(s)。在监督学习里,我们有原始数据和标签,做分类我们有类别标签,做分割我们有分割的Ground Truth;但是强化学习里我们没有真值,只有Reward. 因此问题来了,我们根本不知道准确值是多少,怎么去逼近它?
强化学习|强化学习笔记(五)Pytorch实现简单DQN
文章图片
答案还是很简单,从经验中选择正确的结果。对于蒙特卡洛学习,使用收获作为返回值;对于TD,我们有TD(0)公式和TD( λ \lambda λ)公式。把这种返回值作为Supervisor信号。
强化学习|强化学习笔记(五)Pytorch实现简单DQN
文章图片
比如对一个蒙卡序列 < S 1 , G 1 > , < S 2 , G 2 > , . . . , < S T , G T > , ,... , ,,...,,我们可以把这些数据作为训练集,尽量让每个Gain都适用这个黑箱模型。
强化学习|强化学习笔记(五)Pytorch实现简单DQN
文章图片
对于TD模型来说,这是个有偏模型。因为我们要用贝尔曼方程,而下一个时刻的状态价值 v ^ ( S t + 1 , w ) \hat{v}(S_{t+1}, \textbf w) v^(St+1?,w)我们得通过黑箱模型去模拟,跟蒙卡相比并不是走一个Episode之后的真实收获。
强化学习|强化学习笔记(五)Pytorch实现简单DQN
文章图片
最后重申一点:一般使用动作价值函数比状态价值函数更好,因为它可以直接反映决策。
Q3:如何理解DQN中的经验回放(Experience Relay)机制? 经验回放机制对应子标题的Batch methods. 在普通的q-learning中,我们更新迭代的是所有的价值函数,而且是不连续地,把每个状态下的价值函数往TD Target处偏移。但是加入Deep Q Network,我们一次更新的参数变成了维数很高的权重 w \textbf w w. 因为要更新的项数很多,所以出现了梯度法。
经验回放即将每次和环境交互得到的奖励与状态更新情况都保存起来(即一个transition,{ ? ( S ) , A , R , ? ( S ′ ) \phi(S), A, R, \phi(S') ?(S),A,R,?(S′)}). 用于后面目标Q值的更新。在每次更新的时候,我们随机挑选经验回放中的m个序列计算当前目标Q值(监督信号),从而更新权重 w \textbf w w.
Q4:Pytorch实现一个简单的Q-Network 我参考了刘老师的代码,地址为:https://github.com/ljpzzz/machinelearning/blob/master/reinforcement-learning/dqn.py
刘老师使用的环境是python3.6 + tensorflow 1.8.0 .按道理说只要tensorflow是1x版本,同时装了gym的游戏库基本都可以顺利地运行。
我在这篇代码的基础上做了些改动,把网络用pytorch重新搭建了,输入的状态向量、动作向量等转化成了torch_tensor;添加了GPU训练方式。
这个游戏名叫CartPole-v0. 在控制领域其实就是一个单极倒立摆控制问题。其实很多经典控制理论已经将其解决了,像PID控制。控制方法解决的思路比较严谨,首先获取摆杆的质量、重心、导轨的摩擦力等等参数,对系统物理建模,然后再分析它的平衡状态,用控制理论去求解。
而强化学习显得简单粗暴,我直接去疯狂地尝试,最后总结经验。二者其实就是分别从正面和反面去求解问题。控制方法不会允许失败,它通过不断去分析问题的本源,以解释性和理论的角度直到问题破解。强化学习方法更像是一种复盘,从失败中学习,总结经验。 二者各有好坏吧。
这里有些标题党了,说是DQN,实际上网络就是个两层的感知机,一个输入层,一个隐藏层和一个输出层。隐藏层的节点有20个。
运行结果如下,首先训练的时候会调出一个游戏交互框,这样可以实时地看到快速的游戏过程。在运行窗口也会显示每100个episode游戏后获得的平均奖励,可以看到前400个epoch奖励是一直上升的,也就是游戏水平在不断进步。
强化学习|强化学习笔记(五)Pytorch实现简单DQN
文章图片
因为训练采样是随机的,所以结果好坏要看运气。有时候很快收敛了,有时候又很慢。另外使用CUDA加速后,如果同时添加了env.render()渲染画面可能出现卡死的情况。我猜测是游戏时间很快,有时候一把很快就死了,训练过程快于画面展示,所以要等待画面渲染的时间。建议训练的时候关掉render. 我这里使用env.unwrapped()解除了游戏最高为200分的限制,可以看到第400轮跑到了223.5分。
强化学习|强化学习笔记(五)Pytorch实现简单DQN
文章图片
【强化学习|强化学习笔记(五)Pytorch实现简单DQN】放一下代码,我使用的Conda环境是Python3.7 + Pytorch1.4.0 + Gym 0.17.1
""" @ Author: Peter Xiao @ Date: 2020.7.16 @ Filename: dqn.py @ Brief: 使用 DQN训练CartPole-v0 """import gym import torch import torch.nn as nn import torch.nn.functional as F import numpy as np import random import time from collections import deque# Hyper Parameters for DQN GAMMA = 0.9# discount factor for target Q INITIAL_EPSILON = 0.5# starting value of epsilon FINAL_EPSILON = 0.01# final value of epsilon REPLAY_SIZE = 10000# experience replay buffer size EXPLORE = 10000 BATCH_SIZE = 32# size of minibatch LR = 0.0001# learning rate# Use GPU device = torch.device("cuda" if torch.cuda.is_available() else "cpu") torch.backends.cudnn.enabled = False# 非确定性算法class QNetwork(nn.Module): def __init__(self, state_dim, action_dim): super(QNetwork, self).__init__() self.fc1 = nn.Linear(state_dim, 20) self.fc2 = nn.Linear(20, action_dim)def forward(self, x): out = F.relu(self.fc1(x)) out = self.fc2(out) return outdef initialize_weights(self): for m in self.modules(): nn.init.normal_(m.weight.data, 0, 0.1) m.bias.data.zero_()class DQN(object): # dqn Agent def __init__(self, env):# 初始化 # 状态空间和动作空间的维度 self.state_dim = env.observation_space.shape[0] self.action_dim = env.action_space.n# init experience replay self.replay_buffer = deque()# 经验回放池# init network parameters self.network = QNetwork(state_dim=self.state_dim, action_dim=self.action_dim).to(device) self.optimizer = torch.optim.Adam(self.network.parameters(), lr=LR) self.loss_func = nn.MSELoss()# init some parameters self.time_step = 0 self.epsilon = INITIAL_EPSILON# epsilon值是随机不断变小的def perceive(self, state, action, reward, next_state, done): one_hot_action = np.zeros(self.action_dim)# 用独热向量保存动作 one_hot_action[action] = 1# 选中的动作为1,其余为0 # 将该Transition保存到经验回放池 self.replay_buffer.append((state, one_hot_action, reward, next_state, done)) if len(self.replay_buffer) > REPLAY_SIZE:# 如果经验回放池溢出,扔掉左边的数据 self.replay_buffer.popleft()if len(self.replay_buffer) > BATCH_SIZE:# 只有经验回放池大于mini_batch数了才能采样训练 self.train_Q_network()def train_Q_network(self): self.time_step += 1# Step 1: obtain random minibatch from replay memory minibatch = random.sample(self.replay_buffer, BATCH_SIZE)# 32的list state_batch = torch.FloatTensor([data[0] for data in minibatch]).to(device)# 32*4 action_batch = torch.LongTensor([data[1] for data in minibatch]).to(device)# 32*2 reward_batch = torch.FloatTensor([data[2] for data in minibatch]).to(device)# 32*1 next_state_batch = torch.FloatTensor([data[3] for data in minibatch]).to(device)# 32*4 done = torch.FloatTensor([data[4] for data in minibatch]).to(device)done = done.unsqueeze(1) reward_batch = reward_batch.unsqueeze(1) # q_val = self.network.forward(state_batch)# 32*2 action_index = action_batch.argmax(dim=1).unsqueeze(1)# 32*1 eval_q = self.network.forward(state_batch).gather(1, action_index)# 32*1# Step 2: calculate y Q_value_batch = self.network.forward(next_state_batch) next_action_batch = torch.unsqueeze(torch.max(Q_value_batch, 1)[1], 1) next_q = self.network.forward(next_state_batch).gather(1, next_action_batch)y_batch = reward_batch + GAMMA * next_q * (1 - done) # y_batch = torch.tensor(y_batch).unsqueeze(1)# 更新网络 loss = self.loss_func(eval_q, y_batch) self.optimizer.zero_grad() loss.backward() self.optimizer.step()def egreedy_action(self, state):# epsilon-greedy策略 state = torch.unsqueeze(torch.FloatTensor(state).to(device), 0)# 给state加一个batch_size的维度,此时batch_size为1 Q_value = https://www.it610.com/article/self.network.forward(state.to(device)) # Q_value = self.Q_value.eval(feed_dict={ #self.state_input: [state] #})[0] if random.random() <= self.epsilon: self.epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / 10000 return random.randint(0, self.action_dim - 1) else: self.epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / 10000 return torch.max(Q_value, 1)[1].data.to('cpu').numpy()[0]def action(self, state):# 贪婪选择 state = torch.unsqueeze(torch.FloatTensor(state).to(device), 0)# 给state加一个batch_size的维度,此时batch_size为1 Q_value = https://www.it610.com/article/self.network.forward(state.to(device)) return torch.max(Q_value, 1)[1].data.to('cpu').numpy()[0]# --------------------------------------------------------- # Hyper Parameters ENV_NAME = 'CartPole-v0' EPISODE = 3000# Episode limitation STEP = 300# Step limitation in an episode TEST = 10# The number of experiment test every 100 episodedef main(): # initialize OpenAI Gym env and dqn agent env = gym.make(ENV_NAME) env = env.unwrapped# 打开限制操作 agent = DQN(env)for episode in range(EPISODE): # initialize task state = env.reset() # Train for step in range(STEP): action = agent.egreedy_action(state)# e-greedy action for train next_state, reward, done, _ = env.step(action) # Define reward for agent reward = -1 if done else 0.1 agent.perceive(state, action, reward, next_state, done) state = next_state if done: break # Test every 100 episodes if episode % 100 == 0: total_reward = 0 for i in range(TEST): state = env.reset() for j in range(STEP): env.render() action = agent.action(state)# direct action for test state, reward, done, _ = env.step(action) total_reward += reward if done: break ave_reward = total_reward/TEST print ('episode: ', episode, 'Evaluation Average Reward:', ave_reward)if __name__ == '__main__': time_start = time.time() main() time_end = time.time() print('The total time is ', time_end - time_start)

    推荐阅读