• 教程 >
  • 强化学习 (DQN) 教程
快捷方式

强化学习 (DQN) 教程

作者: Adam Paszke

Mark Towers

本教程演示了如何使用 PyTorch 在来自 Gymnasium 的 CartPole-v1 任务上训练一个深度 Q 学习 (DQN) 智能体。

你可能会发现阅读原始的 深度 Q 学习 (DQN) 论文很有帮助

任务

智能体必须在两个动作之间做出选择 - 将小车向左或向右移动 - 以使连接到它上面的杆保持直立。你可以在 Gymnasium 的网站 上找到有关环境和其他更具挑战性环境的更多信息。

CartPole

CartPole

当智能体观察环境的当前状态并选择一个动作时,环境会转换到一个新的状态,并且还会返回一个奖励,该奖励指示动作的结果。在此任务中,每个增量时间步长奖励为 +1,如果杆倾斜过大或小车距离中心超过 2.4 个单位,则环境终止。这意味着性能更好的场景将运行更长时间,累积更大的回报。

CartPole 任务的设计使得智能体的输入为 4 个表示环境状态的实数值(位置、速度等)。我们采用这 4 个输入,不进行任何缩放,并将其通过一个具有 2 个输出的小型全连接网络,每个动作对应一个输出。该网络经过训练以预测给定输入状态下每个动作的期望值。然后选择期望值最高的动作。

首先,让我们导入所需的包。首先,我们需要使用 gymnasium 来创建环境,可以通过 pip 安装。这是原始 OpenAI Gym 项目的一个分支,自 Gym v0.19 版本以来一直由同一个团队维护。如果您在 Google Colab 中运行此代码,请运行

%%bash
pip3 install gymnasium[classic_control]

我们还将使用 PyTorch 中的以下内容:

  • 神经网络 (torch.nn)

  • 优化器 (torch.optim)

  • 自动微分 (torch.autograd)

import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

env = gym.make("CartPole-v1")

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if GPU is to be used
device = torch.device(
    "cuda" if torch.cuda.is_available() else
    "mps" if torch.backends.mps.is_available() else
    "cpu"
)

经验回放内存

我们将使用经验回放内存来训练我们的 DQN。它存储代理观察到的转换,允许我们稍后重用这些数据。通过随机从中采样,构建批次的转换是去相关的。研究表明,这极大地稳定并改进了 DQN 训练过程。

为此,我们需要两个类:

  • Transition - 一个命名元组,表示我们环境中的单个转换。它本质上将 (状态,动作) 对映射到其 (下一个状态,奖励) 结果,其中状态是稍后描述的屏幕差异图像。

  • ReplayMemory - 一个有界大小的循环缓冲区,用于保存最近观察到的转换。它还实现了 .sample() 方法,用于选择用于训练的转换的随机批次。

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

现在,让我们定义我们的模型。但首先,让我们快速回顾一下 DQN 是什么。

DQN 算法

我们的环境是确定性的,因此出于简单起见,此处提供的公式也都是确定性公式。在强化学习文献中,它们还包含对环境中随机转换的期望。

我们的目标是训练一个策略,该策略试图最大化折扣累积奖励 \(R_{t_0} = \sum_{t=t_0}^{\infty} \gamma^{t - t_0} r_t\),其中 \(R_{t_0}\) 也称为 回报。折扣因子 \(\gamma\) 应为 \(0\)\(1\) 之间的常数,以确保总和收敛。较低的 \(\gamma\) 使我们代理对来自不确定的遥远未来的奖励的重视程度低于对近期奖励的重视程度,因为代理可以对近期奖励有相当的把握。它还鼓励代理收集时间上更接近的奖励,而不是在未来时间上相隔较远的等效奖励。

Q 学习背后的主要思想是,如果我们有一个函数 \(Q^*: State \times Action \rightarrow \mathbb{R}\),它可以告诉我们如果我们在给定状态下采取某个动作,我们的回报会是多少,那么我们就可以轻松地构建一个最大化我们奖励的策略

\[\pi^*(s) = \arg\!\max_a \ Q^*(s, a) \]

但是,我们并不了解世界的一切,因此我们无法访问 \(Q^*\)。但是,由于神经网络是通用的函数逼近器,因此我们可以简单地创建一个并训练它来模拟 \(Q^*\)

对于我们的训练更新规则,我们将使用一个事实,即每个 \(Q\) 函数对于某个策略都满足贝尔曼方程

\[Q^{\pi}(s, a) = r + \gamma Q^{\pi}(s', \pi(s')) \]

等式两侧的差异称为时间差分误差 \(\delta\)

\[\delta = Q(s, a) - (r + \gamma \max_a' Q(s', a)) \]

为了最小化这个误差,我们将使用 Huber 损失。当误差较小时,Huber 损失的作用类似于均方误差,但当误差较大时,它类似于平均绝对误差 - 这使得它在 \(Q\) 的估计非常嘈杂时,对异常值更稳健。我们计算从回放内存中采样的转换批次 \(B\) 的此损失

\[\mathcal{L} = \frac{1}{|B|}\sum_{(s, a, s', r) \ \in \ B} \mathcal{L}(\delta)\]
\[\text{其中} \quad \mathcal{L}(\delta) = \begin{cases} \frac{1}{2}{\delta^2} & \text{for } |\delta| \le 1, \\ |\delta| - \frac{1}{2} & \text{otherwise.} \end{cases}\]

Q 网络

我们的模型将是一个前馈神经网络,它接收当前和先前屏幕补丁之间的差异作为输入。它有两个输出,分别表示 \(Q(s, \mathrm{left})\)\(Q(s, \mathrm{right})\)(其中 \(s\) 是网络的输入)。实际上,网络试图预测给定当前输入时采取每个动作的 预期回报

class DQN(nn.Module):

    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

训练

超参数和实用程序

此单元格实例化我们的模型及其优化器,并定义一些实用程序

  • select_action - 将根据 epsilon 贪婪策略选择一个动作。简单来说,我们有时会使用我们的模型来选择动作,有时我们会随机均匀地采样一个动作。选择随机动作的概率将从 EPS_START 开始,并呈指数衰减到 EPS_ENDEPS_DECAY 控制衰减速率。

  • plot_durations - 用于绘制情节持续时间的辅助函数,以及过去 100 个情节的平均值(在官方评估中使用的度量)。该图将位于包含主要训练循环的单元格下方,并且将在每个情节结束后更新。

# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor as mentioned in the previous section
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4

# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)

policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)


steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return the largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1).indices.view(1, 1)
    else:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)


episode_durations = []


def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

训练循环

最后,用于训练我们模型的代码。

在这里,您可以找到一个 optimize_model 函数,该函数执行优化的一个步骤。它首先采样一个批次,将所有张量连接到一个张量中,计算 \(Q(s_t, a_t)\)\(V(s_{t+1}) = \max_a Q(s_{t+1}, a)\),并将它们组合到我们的损失中。根据定义,如果 \(s\) 是终止状态,则我们将其设置为 \(V(s) = 0\)。我们还使用目标网络来计算 \(V(s_{t+1})\) 以提高稳定性。目标网络在每一步都使用由超参数 TAU 控制的 软更新 进行更新,该超参数先前已定义。

def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1).values
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

下面,您可以找到主要的训练循环。在开始时,我们重置环境并获取初始 state 张量。然后,我们采样一个动作,执行它,观察下一个状态和奖励(始终为 1),并优化我们的模型一次。当情节结束(我们的模型失败)时,我们重新启动循环。

下面,如果 GPU 可用,则 num_episodes 设置为 600,否则计划 50 个情节,以便训练不会花费太长时间。但是,50 个情节不足以观察到 CartPole 上的良好性能。您应该看到模型在 600 个训练情节内始终达到 500 步。训练 RL 代理可能是一个嘈杂的过程,因此如果未观察到收敛,则重新启动训练可能会产生更好的结果。

if torch.cuda.is_available() or torch.backends.mps.is_available():
    num_episodes = 600
else:
    num_episodes = 50

for i_episode in range(num_episodes):
    # Initialize the environment and get its state
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()

        # Soft update of the target network's weights
        # θ′ ← τ θ + (1 −τ )θ′
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break

print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()
Result
/opt/conda/envs/py_3.10/lib/python3.10/site-packages/gymnasium/utils/passive_env_checker.py:249: DeprecationWarning:

`np.bool8` is a deprecated alias for `np.bool_`.  (Deprecated NumPy 1.24)

Complete

以下是说明整体结果数据流的图表。

../_images/reinforcement_learning_diagram.jpg

动作是随机选择或基于策略选择,从 gym 环境中获取下一步样本。我们将结果记录在回放内存中,并在每次迭代时运行优化步骤。优化从回放内存中随机选择一个批次来训练新的策略。“旧的”target_net 也用于优化以计算预期的 Q 值。在其权重上每一步都执行软更新。

脚本的总运行时间:(4 分 31.080 秒)

由 Sphinx-Gallery 生成的图库

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源