• 文档 >
  • TorchRL 训练器:DQN 示例
快捷方式

TorchRL 训练器:DQN 示例

作者Vincent Moens

TorchRL 提供了一个通用的 Trainer 类来处理你的训练循环。训练器执行一个嵌套循环,其中外层循环是数据收集,内层循环消耗这些数据或从回放缓冲区中检索到的数据来训练模型。在此训练循环的各个点,可以附加钩子(hook)并在给定的间隔执行。

在本教程中,我们将使用训练器类从头开始训练一个 DQN 算法来解决 CartPole(倒立摆)任务。

主要收获

  • 构建训练器及其基本组件:数据收集器、损失模块、回放缓冲区和优化器。

  • 向训练器添加钩子(hook),例如日志记录器、目标网络更新器等。

训练器是完全可定制的,并提供大量功能。本教程围绕其构建展开。我们将首先详细介绍如何构建库的每个组件,然后使用 Trainer 类将这些部分组合起来。

在此过程中,我们还将关注库的其他一些方面

  • 如何在 TorchRL 中构建环境,包括转换(例如数据归一化、帧拼接、调整大小和转换为灰度图)和并行执行。与我们在 DDPG 教程中所做的不同,我们将对像素而不是状态向量进行归一化。

  • 如何设计 QValueActor 对象,即一个估算动作值并选取估算回报最高动作的 actor;

  • 如何高效地从环境中收集数据并将其存储在回放缓冲区中;

  • 如何使用多步(multi-step),这是一种用于离策略算法的简单预处理步骤;

  • 最后是如何评估你的模型。

先决条件:我们建议你先通过 PPO 教程熟悉 torchrl。

DQN

DQN (深度 Q-学习) 是深度强化学习的开创性工作。

从高层次上看,该算法相当简单:Q-学习包括学习一个状态-动作值表,这样当遇到任何特定状态时,我们只需查找值最高的动作就知道应该选择哪个动作。这种简单的设置要求动作和状态是离散的,否则无法构建查找表。

DQN 使用一个神经网络,它将状态-动作空间映射到一个值(标量)空间,这摊销了存储和探索所有可能状态-动作组合的成本:如果过去没有见过某个状态,我们仍然可以通过我们的神经网络将它与各种可用动作一起传入,并获得每个可用动作的插值值。

我们将解决经典的控制问题——倒立摆(CartPole)。根据检索该环境的 Gymnasium 文档所述

一根杆子通过一个未驱动的关节连接到一个小车上,小车沿着一条
无摩擦轨道移动。摆锤垂直放置在小车上,目标是
通过向左和向右施加力来平衡杆子
作用在小车上。
Cart Pole

我们的目标不是提供该算法的 SOTA(最先进)实现,而是为了在该算法的背景下提供 TorchRL 功能的高级说明。

import os
import uuid

import torch
from torch import nn
from torchrl.collectors import MultiaSyncDataCollector, SyncDataCollector
from torchrl.data import LazyMemmapStorage, MultiStep, TensorDictReplayBuffer
from torchrl.envs import (
    EnvCreator,
    ExplorationType,
    ParallelEnv,
    RewardScaling,
    StepCounter,
)
from torchrl.envs.libs.gym import GymEnv
from torchrl.envs.transforms import (
    CatFrames,
    Compose,
    GrayScale,
    ObservationNorm,
    Resize,
    ToTensorImage,
    TransformedEnv,
)
from torchrl.modules import DuelingCnnDQNet, EGreedyModule, QValueActor

from torchrl.objectives import DQNLoss, SoftUpdate
from torchrl.record.loggers.csv import CSVLogger
from torchrl.trainers import (
    LogScalar,
    LogValidationReward,
    ReplayBufferTrainer,
    Trainer,
    UpdateWeights,
)


def is_notebook() -> bool:
    try:
        shell = get_ipython().__class__.__name__
        if shell == "ZMQInteractiveShell":
            return True  # Jupyter notebook or qtconsole
        elif shell == "TerminalInteractiveShell":
            return False  # Terminal running IPython
        else:
            return False  # Other type (?)
    except NameError:
        return False  # Probably standard Python interpreter

让我们开始了解我们的算法所需的各种部分

  • 环境;

  • 策略(以及我们归类到“模型”下的相关模块);

  • 数据收集器,它使策略在环境中进行互动并提供训练数据;

  • 用于存储训练数据的回放缓冲区;

  • 损失模块,它计算目标函数以训练我们的策略最大化回报;

  • 优化器,它根据我们的损失执行参数更新。

附加模块包括日志记录器、记录器(在“eval”模式下执行策略)和目标网络更新器。有了所有这些组件,很容易看出在训练脚本中可能会放错或误用某个组件。训练器就在那里为你协调一切!

构建环境

首先,让我们编写一个辅助函数来输出一个环境。像往常一样,“原始”环境在实践中可能过于简单,我们需要一些数据转换来将其输出暴露给策略。

我们将使用七种转换

  • StepCounter 用于计算每个轨迹中的步数;

  • ToTensorImage 会将 [W, H, C] uint8 张量转换为形状为 [C, W, H] 且值在 [0, 1] 范围内的浮点张量;

  • RewardScaling 以减小回报的尺度;

  • GrayScale 会将我们的图像转换为灰度图;

  • Resize 会将图像大小调整为 64x64 格式;

  • CatFrames 会将任意数量的连续帧(N=4)沿通道维度拼接成单个张量。这很有用,因为单张图像不包含关于倒立摆运动的信息。需要通过循环神经网络或使用帧堆栈来记录过去的观察和动作。

  • ObservationNorm 会根据一些自定义的汇总统计信息对我们的观察进行归一化。

实际上,我们的环境构建器有两个参数

  • parallel:决定是否必须并行运行多个环境。我们在 ParallelEnv 之后堆叠转换,以利用设备上操作的向量化,尽管这在技术上也可以用于每个环境连接到其自己的转换集的情况。

  • obs_norm_sd 将包含用于 ObservationNorm 转换的归一化常数。

def make_env(
    parallel=False,
    obs_norm_sd=None,
    num_workers=1,
):
    if obs_norm_sd is None:
        obs_norm_sd = {"standard_normal": True}
    if parallel:

        def maker():
            return GymEnv(
                "CartPole-v1",
                from_pixels=True,
                pixels_only=True,
                device=device,
            )

        base_env = ParallelEnv(
            num_workers,
            EnvCreator(maker),
            # Don't create a sub-process if we have only one worker
            serial_for_single=True,
            mp_start_method=mp_context,
        )
    else:
        base_env = GymEnv(
            "CartPole-v1",
            from_pixels=True,
            pixels_only=True,
            device=device,
        )

    env = TransformedEnv(
        base_env,
        Compose(
            StepCounter(),  # to count the steps of each trajectory
            ToTensorImage(),
            RewardScaling(loc=0.0, scale=0.1),
            GrayScale(),
            Resize(64, 64),
            CatFrames(4, in_keys=["pixels"], dim=-3),
            ObservationNorm(in_keys=["pixels"], **obs_norm_sd),
        ),
    )
    return env

计算归一化常数

为了归一化图像,我们不希望使用完整的 [C, W, H] 归一化掩码独立归一化每个像素,而是使用形状更简单的 [C, 1, 1] 归一化常数集(位置和尺度参数)。我们将使用 init_stats()reduce_dim 参数来指示哪些维度必须被约简,并使用 keep_dims 参数来确保不是所有维度都在过程中消失。

def get_norm_stats():
    test_env = make_env()
    test_env.transform[-1].init_stats(
        num_iter=1000, cat_dim=0, reduce_dim=[-1, -2, -4], keep_dims=(-1, -2)
    )
    obs_norm_sd = test_env.transform[-1].state_dict()
    # let's check that normalizing constants have a size of ``[C, 1, 1]`` where
    # ``C=4`` (because of :class:`~torchrl.envs.CatFrames`).
    print("state dict of the observation norm:", obs_norm_sd)
    test_env.close()
    del test_env
    return obs_norm_sd

构建模型(深度 Q-网络)

以下函数构建一个 DuelingCnnDQNet 对象,它是一个简单的 CNN,后接一个两层 MLP。这里使用的唯一技巧是动作值(即向左和向右动作值)是使用以下公式计算的

\[\mathbb{v} = b(obs) + v(obs) - \mathbb{E}[v(obs)]\]

其中 \(\mathbb{v}\) 是我们的动作值向量,\(b\) 是一个 \(\mathbb{R}^n \rightarrow 1\) 函数,\(v\) 是一个 \(\mathbb{R}^n \rightarrow \mathbb{R}^m\) 函数,其中 \(n = \# obs\)(观测数)且 \(m = \# actions\)(动作数)。

我们的网络被包装在 QValueActor 中,它将读取状态-动作值,选取具有最大值的那个,并将所有这些结果写入输入的 tensordict.TensorDict 中。

def make_model(dummy_env):
    cnn_kwargs = {
        "num_cells": [32, 64, 64],
        "kernel_sizes": [6, 4, 3],
        "strides": [2, 2, 1],
        "activation_class": nn.ELU,
        # This can be used to reduce the size of the last layer of the CNN
        # "squeeze_output": True,
        # "aggregator_class": nn.AdaptiveAvgPool2d,
        # "aggregator_kwargs": {"output_size": (1, 1)},
    }
    mlp_kwargs = {
        "depth": 2,
        "num_cells": [
            64,
            64,
        ],
        "activation_class": nn.ELU,
    }
    net = DuelingCnnDQNet(
        dummy_env.action_spec.shape[-1], 1, cnn_kwargs, mlp_kwargs
    ).to(device)
    net.value[-1].bias.data.fill_(init_bias)

    actor = QValueActor(net, in_keys=["pixels"], spec=dummy_env.action_spec).to(device)
    # init actor: because the model is composed of lazy conv/linear layers,
    # we must pass a fake batch of data through it to instantiate them.
    tensordict = dummy_env.fake_tensordict()
    actor(tensordict)

    # we join our actor with an EGreedyModule for data collection
    exploration_module = EGreedyModule(
        spec=dummy_env.action_spec,
        annealing_num_steps=total_frames,
        eps_init=eps_greedy_val,
        eps_end=eps_greedy_val_env,
    )
    actor_explore = TensorDictSequential(actor, exploration_module)

    return actor, actor_explore

数据收集与存储

回放缓冲区

回放缓冲区在离策略强化学习算法(如 DQN)中扮演着核心角色。它们构成了我们在训练期间进行采样的数据集。

在这里,我们将使用常规采样策略,尽管优先级回放缓冲区 (RB) 可以显著提高性能。

我们使用 LazyMemmapStorage 类将存储放在磁盘上。这种存储以惰性方式创建:只有当第一个数据批次传递给它时,它才会被实例化。

这种存储的唯一要求是,写入时传递给它的数据必须始终具有相同的形状。

buffer_scratch_dir = tempfile.TemporaryDirectory().name


def get_replay_buffer(buffer_size, n_optim, batch_size, device):
    replay_buffer = TensorDictReplayBuffer(
        batch_size=batch_size,
        storage=LazyMemmapStorage(buffer_size, scratch_dir=buffer_scratch_dir),
        prefetch=n_optim,
        transform=lambda td: td.to(device),
    )
    return replay_buffer

数据收集器

PPODDPG 中一样,我们将使用数据收集器作为外层循环中的数据加载器。

我们选择以下配置:我们将在一系列不同的收集器中并行同步运行多个并行环境,这些收集器本身并行但异步运行。

注意

此功能仅在使用 Python 多进程库的“spawn”启动方法运行代码时可用。如果本教程直接作为脚本运行(因此使用“fork”方法),我们将使用常规的 SyncDataCollector

这种配置的优势在于,我们可以平衡批量执行的计算量与我们希望异步执行的计算量。我们鼓励读者通过修改收集器的数量(即传递给收集器的环境构造器的数量)以及每个收集器中并行执行的环境数量(由 num_workers 超参数控制)来试验收集速度受到的影响。

收集器的设备可以通过 device(通用)、policy_deviceenv_devicestoring_device 参数完全参数化。 storing_device 参数将修改收集数据的存储位置:如果我们收集的批次数据量相当大,我们可能希望将它们存储在与计算发生设备不同的位置。对于像我们这样的异步数据收集器,不同的存储设备意味着我们收集的数据每次不会驻留在同一设备上,这是我们的训练循环必须考虑的问题。为简单起见,我们将所有子收集器的设备设置为相同的值。

def get_collector(
    stats,
    num_collectors,
    actor_explore,
    frames_per_batch,
    total_frames,
    device,
):
    # We can't use nested child processes with mp_start_method="fork"
    if is_fork:
        cls = SyncDataCollector
        env_arg = make_env(parallel=True, obs_norm_sd=stats, num_workers=num_workers)
    else:
        cls = MultiaSyncDataCollector
        env_arg = [
            make_env(parallel=True, obs_norm_sd=stats, num_workers=num_workers)
        ] * num_collectors
    data_collector = cls(
        env_arg,
        policy=actor_explore,
        frames_per_batch=frames_per_batch,
        total_frames=total_frames,
        # this is the default behavior: the collector runs in ``"random"`` (or explorative) mode
        exploration_type=ExplorationType.RANDOM,
        # We set the all the devices to be identical. Below is an example of
        # heterogeneous devices
        device=device,
        storing_device=device,
        split_trajs=False,
        postproc=MultiStep(gamma=gamma, n_steps=5),
    )
    return data_collector

损失函数

构建我们的损失函数很简单:我们只需要向 DQNLoss 类提供模型和一些超参数即可。

目标参数

许多离策略强化学习算法在估计下一状态或状态-动作对的值时都使用了“目标参数”的概念。目标参数是模型参数的延迟副本。由于它们的预测与当前模型配置的预测不匹配,它们通过对估计值设置悲观界限来帮助学习。这是一种强大的技巧(称为“双 Q-学习”),在类似算法中无处不在。

def get_loss_module(actor, gamma):
    loss_module = DQNLoss(actor, delay_value=True)
    loss_module.make_value_estimator(gamma=gamma)
    target_updater = SoftUpdate(loss_module, eps=0.995)
    return loss_module, target_updater

超参数

让我们从超参数开始。以下设置在实践中应该能很好地工作,并且算法的性能希望不会对这些参数的微小变化过于敏感。

is_fork = multiprocessing.get_start_method() == "fork"
device = (
    torch.device(0)
    if torch.cuda.is_available() and not is_fork
    else torch.device("cpu")
)

优化器

# the learning rate of the optimizer
lr = 2e-3
# weight decay
wd = 1e-5
# the beta parameters of Adam
betas = (0.9, 0.999)
# Optimization steps per batch collected (aka UPD or updates per data)
n_optim = 8

DQN 参数

gamma 衰减因子

gamma = 0.99

平滑目标网络更新衰减参数。这大致对应于硬目标网络更新的 1/tau 间隔

tau = 0.02

数据收集和回放缓冲区

注意

已注释掉用于适当训练的值。

在环境中收集的总帧数。在其他实现中,用户定义最大episode数。对于我们的数据收集器来说,这更难做到,因为它们返回 N 帧收集数据的批次,其中 N 是常数。然而,当收集到一定数量的 episode 时,可以通过中断训练循环来轻松实现相同的 episode 数限制。

total_frames = 5_000  # 500000

用于初始化回放缓冲区的随机帧数。

init_random_frames = 100  # 1000

每个收集批次中的帧数。

frames_per_batch = 32  # 128

每个优化步骤从回放缓冲区采样的帧数

batch_size = 32  # 256

回放缓冲区的帧数大小

buffer_size = min(total_frames, 100000)

每个数据收集器中并行运行的环境数量

num_workers = 2  # 8
num_collectors = 2  # 4

环境和探索

我们设置了 Epsilon-greedy 探索中 epsilon 因子的初始值和最终值。由于我们的策略是确定性的,探索至关重要:没有它,唯一的随机性来源将是环境重置。

eps_greedy_val = 0.1
eps_greedy_val_env = 0.005

为了加快学习速度,我们将价值网络的最后一层偏差设置为预定义的值(这不是强制的)

init_bias = 2.0

注意

为了加快教程的渲染速度,total_frames 超参数被设置为一个非常小的值。要获得合理的性能,请使用更大的值,例如 500000

构建训练器

TorchRL 的 Trainer 类构造函数接受以下仅关键字参数

  • collector

  • loss_module

  • optimizer

  • logger:日志记录器可以是

  • total_frames:此参数定义了训练器的生命周期。

  • frame_skip:使用 frame-skip 时,必须告知收集器,以便准确计算收集的帧数等。告知训练器此参数并非强制,但有助于在总帧数(预算)固定但 frame-skip 可变的情况下进行更公平的比较。

stats = get_norm_stats()
test_env = make_env(parallel=False, obs_norm_sd=stats)
# Get model
actor, actor_explore = make_model(test_env)
loss_module, target_net_updater = get_loss_module(actor, gamma)

collector = get_collector(
    stats=stats,
    num_collectors=num_collectors,
    actor_explore=actor_explore,
    frames_per_batch=frames_per_batch,
    total_frames=total_frames,
    device=device,
)
optimizer = torch.optim.Adam(
    loss_module.parameters(), lr=lr, weight_decay=wd, betas=betas
)
exp_name = f"dqn_exp_{uuid.uuid1()}"
tmpdir = tempfile.TemporaryDirectory()
logger = CSVLogger(exp_name=exp_name, log_dir=tmpdir.name)
warnings.warn(f"log dir: {logger.experiment.log_dir}")

我们可以控制标量应该多久记录一次。在这里,我们将其设置为一个低值,因为我们的训练循环很短

log_interval = 500

trainer = Trainer(
    collector=collector,
    total_frames=total_frames,
    frame_skip=1,
    loss_module=loss_module,
    optimizer=optimizer,
    logger=logger,
    optim_steps_per_batch=n_optim,
    log_interval=log_interval,
)

注册钩子

注册钩子可以通过两种不同的方式实现

  • 如果钩子有,register() 方法是首选。只需将训练器作为输入提供,钩子将以默认名称注册到默认位置。对于某些钩子,注册可能相当复杂:ReplayBufferTrainer 需要 3 个钩子(extendsampleupdate_priority),实现起来可能很麻烦。

buffer_hook = ReplayBufferTrainer(
    get_replay_buffer(buffer_size, n_optim, batch_size=batch_size, device=device),
    flatten_tensordicts=True,
)
buffer_hook.register(trainer)
weight_updater = UpdateWeights(collector, update_weights_interval=1)
weight_updater.register(trainer)
recorder = LogValidationReward(
    record_interval=100,  # log every 100 optimization steps
    record_frames=1000,  # maximum number of frames in the record
    frame_skip=1,
    policy_exploration=actor_explore,
    environment=test_env,
    exploration_type=ExplorationType.DETERMINISTIC,
    log_keys=[("next", "reward")],
    out_keys={("next", "reward"): "rewards"},
    log_pbar=True,
)
recorder.register(trainer)

探索模块的 epsilon 因子也会进行退火

trainer.register_op("post_steps", actor_explore[1].step, frames=frames_per_batch)
  • 任何可调用对象(包括 TrainerHookBase 的子类)都可以使用 register_op() 进行注册。在这种情况下,必须显式传递一个位置 ()。此方法提供了对钩子位置的更多控制,但也需要对训练器机制有更深入的了解。请查阅训练器文档以获取训练器钩子的详细描述。

trainer.register_op("post_optim", target_net_updater.step)

我们也可以记录训练奖励。请注意,对于 CartPole 来说,这只有有限的意义,因为奖励总是 1。折扣奖励总和的最大化不是通过获得更高的奖励,而是通过让倒立摆存活更长时间。这将在进度条中显示的 total_rewards 值中反映出来。

log_reward = LogScalar(log_pbar=True)
log_reward.register(trainer)

注意

如果需要,可以将多个优化器链接到训练器。在这种情况下,每个优化器将与损失字典中的一个字段绑定。请查阅 OptimizerHook 以了解更多信息。

现在我们准备好训练我们的算法了!只需简单调用 trainer.train(),我们将获得日志记录的结果。

trainer.train()

现在我们可以快速检查包含结果的 CSV 文件。

def print_csv_files_in_folder(folder_path):
    """
    Find all CSV files in a folder and prints the first 10 lines of each file.

    Args:
        folder_path (str): The relative path to the folder.

    """
    csv_files = []
    output_str = ""
    for dirpath, _, filenames in os.walk(folder_path):
        for file in filenames:
            if file.endswith(".csv"):
                csv_files.append(os.path.join(dirpath, file))
    for csv_file in csv_files:
        output_str += f"File: {csv_file}\n"
        with open(csv_file, "r") as f:
            for i, line in enumerate(f):
                if i == 10:
                    break
                output_str += line.strip() + "\n"
        output_str += "\n"
    print(output_str)


print_csv_files_in_folder(logger.experiment.log_dir)

trainer.shutdown()
del trainer

结论与潜在改进

在本教程中,我们学习了

  • 如何编写训练器,包括构建其组件并在训练器中注册它们;

  • 如何编写 DQN 算法,包括如何使用 QValueNetwork 创建一个选取价值最高动作的策略;

  • 如何构建多进程数据收集器;

本教程可能的改进包括

  • 还可以使用优先级回放缓冲区。这将为价值准确性最差的样本赋予更高的优先级。在文档的回放缓冲区部分了解更多信息。

  • 分布式损失(更多信息请参阅 DistributionalDQNLoss)。

  • 更高级的探索技术,例如 NoisyLinear 层等。

Gallery 由 Sphinx-Gallery 生成

文档

查阅 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发者的深度教程

查看教程

资源

查找开发资源并获得答疑

查看资源