注意
单击此处下载完整的示例代码
强化学习 (DQN) 教程¶
创建于:2017 年 3 月 24 日 | 最后更新:2024 年 6 月 18 日 | 最后验证:2024 年 11 月 05 日
本教程展示了如何使用 PyTorch 在来自 Gymnasium 的 CartPole-v1 任务上训练深度 Q 学习 (DQN) 智能体。
您可能会发现阅读原始的 深度 Q 学习 (DQN) 论文很有帮助
任务
智能体必须在两个动作之间做出决定 - 向左或向右移动小车 - 以使连接在其上的杆保持直立。您可以在 Gymnasium 的网站上找到有关环境和其他更具挑战性的环境的更多信息。

CartPole¶
当智能体观察环境的当前状态并选择一个动作时,环境会转换到新状态,并返回一个奖励,指示动作的后果。在此任务中,每个增量时间步的奖励为 +1,如果杆子倒下太远或小车从中心移动超过 2.4 个单位,则环境终止。这意味着性能更好的场景将运行更长时间,累积更大的回报。
CartPole 任务的设计使得智能体的输入是 4 个实数值,代表环境状态(位置、速度等)。我们采用这 4 个输入,不进行任何缩放,并将它们传递到一个小型全连接网络,该网络具有 2 个输出,每个动作一个输出。该网络经过训练,可以预测给定输入状态下每个动作的预期值。然后选择具有最高预期值的动作。
软件包
首先,让我们导入所需的软件包。首先,我们需要用于环境的 gymnasium,通过使用 pip 安装。这是原始 OpenAI Gym 项目的一个分支,自 Gym v0.19 以来由同一团队维护。如果您在 Google Colab 中运行此代码,请运行
%%bash
pip3 install gymnasium[classic_control]
我们还将使用 PyTorch 中的以下内容
神经网络 (
torch.nn
)优化 (
torch.optim
)自动微分 (
torch.autograd
)
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
env = gym.make("CartPole-v1")
# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
from IPython import display
plt.ion()
# if GPU is to be used
device = torch.device(
"cuda" if torch.cuda.is_available() else
"mps" if torch.backends.mps.is_available() else
"cpu"
)
回放记忆¶
我们将使用经验回放记忆来训练我们的 DQN。它存储智能体观察到的转换,允许我们稍后重用此数据。通过从中随机抽样,构建批次的转换是不相关的。已经证明,这大大稳定和改进了 DQN 训练过程。
为此,我们将需要两个类
Transition
- 一个命名元组,表示我们环境中的单个转换。它本质上将(状态,动作)对映射到它们的(next_state,奖励)结果,状态是稍后描述的屏幕差异图像。ReplayMemory
- 一个有界大小的循环缓冲区,用于保存最近观察到的转换。它还实现了.sample()
方法,用于选择随机批次的转换以进行训练。
Transition = namedtuple('Transition',
('state', 'action', 'next_state', 'reward'))
class ReplayMemory(object):
def __init__(self, capacity):
self.memory = deque([], maxlen=capacity)
def push(self, *args):
"""Save a transition"""
self.memory.append(Transition(*args))
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
def __len__(self):
return len(self.memory)
现在,让我们定义我们的模型。但首先,让我们快速回顾一下什么是 DQN。
DQN 算法¶
我们的环境是确定性的,因此为了简单起见,此处呈现的所有方程式也以确定性方式制定。在强化学习文献中,它们还将包含对环境中随机转换的期望。
我们的目标是训练一个策略,该策略试图最大化折扣累积奖励 \(R_{t_0} = \sum_{t=t_0}^{\infty} \gamma^{t - t_0} r_t\),其中 \(R_{t_0}\) 也称为回报。折扣 \(\gamma\) 应为 \(0\) 到 \(1\) 之间的常数,以确保总和收敛。较低的 \(\gamma\) 使来自不确定的遥远未来的奖励对于我们的智能体来说不如近未来的奖励重要,因为智能体对近未来的奖励相当有信心。它还鼓励智能体收集时间上更接近的奖励,而不是在时间上遥远未来的等效奖励。
Q 学习背后的主要思想是,如果我们有一个函数 \(Q^*: State \times Action \rightarrow \mathbb{R}\),它可以告诉我们如果在给定状态下采取一个动作,我们的回报会是多少,那么我们可以轻松地构建一个策略来最大化我们的奖励
但是,我们并不了解关于世界的一切,因此我们无法访问 \(Q^*\)。但是,由于神经网络是通用函数逼近器,我们可以简单地创建一个神经网络并训练它以类似于 \(Q^*\)。
对于我们的训练更新规则,我们将使用一个事实,即某些策略的每个 \(Q\) 函数都遵守贝尔曼方程
等式两边之间的差异称为时间差误差 \(\delta\)
为了最小化此误差,我们将使用 Huber 损失。当误差较小时,Huber 损失的行为类似于均方误差,但当误差较大时,Huber 损失的行为类似于平均绝对误差 - 这使得它在 \(Q\) 的估计非常嘈杂时,对异常值更加鲁棒。我们对从回放记忆中采样的批量转换 \(B\) 计算此值
Q 网络¶
我们的模型将是一个前馈神经网络,它接收当前屏幕补丁和先前屏幕补丁之间的差异。它有两个输出,分别代表 \(Q(s, \mathrm{left})\) 和 \(Q(s, \mathrm{right})\)(其中 \(s\) 是网络的输入)。实际上,该网络正在尝试预测给定当前输入的情况下采取每个动作的预期回报。
class DQN(nn.Module):
def __init__(self, n_observations, n_actions):
super(DQN, self).__init__()
self.layer1 = nn.Linear(n_observations, 128)
self.layer2 = nn.Linear(128, 128)
self.layer3 = nn.Linear(128, n_actions)
# Called with either one element to determine next action, or a batch
# during optimization. Returns tensor([[left0exp,right0exp]...]).
def forward(self, x):
x = F.relu(self.layer1(x))
x = F.relu(self.layer2(x))
return self.layer3(x)
训练¶
超参数和实用程序¶
此单元实例化我们的模型及其优化器,并定义一些实用程序
select_action
- 将根据 epsilon 贪婪策略选择一个动作。简而言之,我们有时会使用我们的模型来选择动作,有时我们只会均匀地采样一个动作。选择随机动作的概率将从EPS_START
开始,并将向EPS_END
指数衰减。EPS_DECAY
控制衰减率。plot_durations
- 一个用于绘制情节持续时间的助手,以及最后 100 个情节的平均值(官方评估中使用的度量)。该图将位于包含主训练循环的单元格下方,并将在每个情节后更新。
# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor as mentioned in the previous section
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4
# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)
policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())
optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)
steps_done = 0
def select_action(state):
global steps_done
sample = random.random()
eps_threshold = EPS_END + (EPS_START - EPS_END) * \
math.exp(-1. * steps_done / EPS_DECAY)
steps_done += 1
if sample > eps_threshold:
with torch.no_grad():
# t.max(1) will return the largest column value of each row.
# second column on max result is index of where max element was
# found, so we pick action with the larger expected reward.
return policy_net(state).max(1).indices.view(1, 1)
else:
return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)
episode_durations = []
def plot_durations(show_result=False):
plt.figure(1)
durations_t = torch.tensor(episode_durations, dtype=torch.float)
if show_result:
plt.title('Result')
else:
plt.clf()
plt.title('Training...')
plt.xlabel('Episode')
plt.ylabel('Duration')
plt.plot(durations_t.numpy())
# Take 100 episode averages and plot them too
if len(durations_t) >= 100:
means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
means = torch.cat((torch.zeros(99), means))
plt.plot(means.numpy())
plt.pause(0.001) # pause a bit so that plots are updated
if is_ipython:
if not show_result:
display.display(plt.gcf())
display.clear_output(wait=True)
else:
display.display(plt.gcf())
训练循环¶
最后,用于训练我们模型的代码。
在这里,您可以找到一个 optimize_model
函数,该函数执行优化的单个步骤。它首先对批次进行采样,将所有张量连接成一个张量,计算 \(Q(s_t, a_t)\) 和 \(V(s_{t+1}) = \max_a Q(s_{t+1}, a)\),并将它们组合到我们的损失中。根据定义,如果 \(s\) 是终端状态,我们将 \(V(s) = 0\) 设置为 0。我们还使用目标网络来计算 \(V(s_{t+1})\) 以增加稳定性。目标网络在每个步骤都使用由超参数 TAU
控制的 软更新进行更新,该超参数先前已定义。
def optimize_model():
if len(memory) < BATCH_SIZE:
return
transitions = memory.sample(BATCH_SIZE)
# Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
# detailed explanation). This converts batch-array of Transitions
# to Transition of batch-arrays.
batch = Transition(*zip(*transitions))
# Compute a mask of non-final states and concatenate the batch elements
# (a final state would've been the one after which simulation ended)
non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
batch.next_state)), device=device, dtype=torch.bool)
non_final_next_states = torch.cat([s for s in batch.next_state
if s is not None])
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
# Compute Q(s_t, a) - the model computes Q(s_t), then we select the
# columns of actions taken. These are the actions which would've been taken
# for each batch state according to policy_net
state_action_values = policy_net(state_batch).gather(1, action_batch)
# Compute V(s_{t+1}) for all next states.
# Expected values of actions for non_final_next_states are computed based
# on the "older" target_net; selecting their best reward with max(1).values
# This is merged based on the mask, such that we'll have either the expected
# state value or 0 in case the state was final.
next_state_values = torch.zeros(BATCH_SIZE, device=device)
with torch.no_grad():
next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values
# Compute the expected Q values
expected_state_action_values = (next_state_values * GAMMA) + reward_batch
# Compute Huber loss
criterion = nn.SmoothL1Loss()
loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))
# Optimize the model
optimizer.zero_grad()
loss.backward()
# In-place gradient clipping
torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
optimizer.step()
下面,您可以找到主训练循环。在开始时,我们重置环境并获得初始 state
张量。然后,我们采样一个动作,执行它,观察下一个状态和奖励(始终为 1),并优化我们的模型一次。当情节结束(我们的模型失败)时,我们重新启动循环。
下面,如果 GPU 可用,num_episodes 设置为 600,否则计划 50 个情节,以便训练不会花费太长时间。但是,50 个情节不足以观察到 CartPole 的良好性能。您应该看到模型在 600 个训练情节内不断实现 500 步。训练 RL 智能体可能是一个嘈杂的过程,因此如果未观察到收敛,则重新启动训练可以产生更好的结果。
if torch.cuda.is_available() or torch.backends.mps.is_available():
num_episodes = 600
else:
num_episodes = 50
for i_episode in range(num_episodes):
# Initialize the environment and get its state
state, info = env.reset()
state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
for t in count():
action = select_action(state)
observation, reward, terminated, truncated, _ = env.step(action.item())
reward = torch.tensor([reward], device=device)
done = terminated or truncated
if terminated:
next_state = None
else:
next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)
# Store the transition in memory
memory.push(state, action, next_state, reward)
# Move to the next state
state = next_state
# Perform one step of the optimization (on the policy network)
optimize_model()
# Soft update of the target network's weights
# θ′ ← τ θ + (1 −τ )θ′
target_net_state_dict = target_net.state_dict()
policy_net_state_dict = policy_net.state_dict()
for key in policy_net_state_dict:
target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
target_net.load_state_dict(target_net_state_dict)
if done:
episode_durations.append(t + 1)
plot_durations()
break
print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()

/usr/local/lib/python3.10/dist-packages/gymnasium/utils/passive_env_checker.py:249: DeprecationWarning:
`np.bool8` is a deprecated alias for `np.bool_`. (Deprecated NumPy 1.24)
Complete
下图说明了整体结果数据流。

动作是随机选择的,或者基于策略选择的,从 gym 环境中获取下一步样本。我们将结果记录在回放记忆中,并在每次迭代中运行优化步骤。优化从回放记忆中选择一个随机批次来训练新策略。“较旧”的 target_net 也用于优化以计算预期的 Q 值。其权重的软更新在每个步骤执行。
脚本总运行时间:(4 分 54.566 秒)