torchrl.collectors 包¶
数据收集器与 pytorch 的数据加载器(dataloaders)有些类似,不同之处在于 (1) 它们从非静态数据源收集数据,(2) 数据是使用模型(很可能是正在训练的模型的某个版本)收集的。
TorchRL 的数据收集器接受两个主要参数:一个环境(或一个环境构造函数列表)和一个策略(policy)。它们将在预设的步数内迭代执行环境步进和策略查询,然后将收集到的数据堆栈交付给用户。环境在达到完成状态时和/或经过预设的步数后会被重置。
由于数据收集是一个潜在的计算密集型过程,因此适当配置执行超参数至关重要。首先需要考虑的参数是数据收集应该与优化步骤串行发生还是并行发生。SyncDataCollector
类将在训练工作进程上执行数据收集。MultiSyncDataCollector
将工作负载分配到多个工作进程并聚合结果,然后将结果交付给训练工作进程。最后,MultiaSyncDataCollector
将在多个工作进程上执行数据收集,并交付其能收集到的第一个批次结果。这种执行方式将与网络的训练持续并行进行:这意味着用于数据收集的策略权重可能略滞后于训练工作进程上的策略配置。因此,尽管此类可能是收集数据最快的,但其代价是仅适用于可以异步收集数据的场景(例如离策略强化学习或课程强化学习)。对于远程执行的 rollout(MultiSyncDataCollector
或 MultiaSyncDataCollector
),需要使用 collector.update_policy_weights_() 或在构造函数中设置 update_at_each_batch=True 来同步远程策略权重与训练工作进程上的权重。
需要考虑的第二个参数(在远程设置中)是数据收集设备以及环境和策略操作执行设备。例如,在 CPU 上执行的策略可能比在 CUDA 上执行的策略慢。当多个推理工作进程同时运行时,将计算工作负载分配到可用设备上可以加快收集速度或避免 OOM(内存不足)错误。最后,批次大小和传递设备(即数据在等待传递给收集工作进程时存储的设备)的选择也可能影响内存管理。关键控制参数是 devices
,它控制执行设备(即策略设备),以及 storing_device
,它控制 rollout 期间环境和数据存储的设备。一个好的启发式方法通常是为存储和计算使用相同的设备,这是只传递 devices 参数时的默认行为。
除了这些计算参数外,用户还可以选择配置以下参数
max_frames_per_traj: 触发
env.reset()
的帧数阈值frames_per_batch: 收集器每次迭代交付的帧数
init_random_frames: 随机步数(调用
env.rand_step()
的步数)reset_at_each_iter: 如果为
True
,环境将在每次批次收集后重置split_trajs: 如果为
True
,轨迹将被分割并以 padded tensordict 的形式交付,同时包含一个"mask"
键,该键指向一个布尔掩码,表示有效值。exploration_type: 与策略一起使用的探索策略。
reset_when_done: 环境在达到完成状态时是否应该重置。
收集器与批次大小¶
由于每个收集器组织其内部运行环境的方式不同,因此数据将根据收集器的具体特性而具有不同的批次大小。下表总结了数据收集时的预期结果
SyncDataCollector |
MultiSyncDataCollector (n=B) |
MultiaSyncDataCollector (n=B) |
|||
---|---|---|---|---|---|
cat_results |
不适用 |
“stack” |
0 |
-1 |
不适用 |
单个环境 |
[T] |
[B, T] |
[B*(T//B) |
[B*(T//B)] |
[T] |
批量环境 (n=P) |
[P, T] |
[B, P, T] |
[B * P, T] |
[P, T * B] |
[P, T] |
在这些情况下,最后一个维度(T
表示 时间
)会进行调整,使得批次大小等于传递给收集器的 frames_per_batch
参数值。
警告
MultiSyncDataCollector
不应与 cat_results=0
一起使用,因为对于批量环境,数据将沿批次维度堆叠,对于单个环境,数据将沿时间维度堆叠,这在相互切换时可能导致混淆。cat_results="stack"
是与环境交互的一种更好、更一致的方式,因为它会保持每个维度独立,并在配置、收集器类和其他组件之间提供更好的互换性。
考虑到 MultiaSyncDataCollector
以先到先得的方式交付数据批次,而 MultiSyncDataCollector
在交付数据之前会从每个子收集器收集数据,这一点很容易理解。MultiSyncDataCollector
包含一个对应于运行的子收集器数量 (B
) 的维度,而 MultiaSyncDataCollector
则没有。
收集器与策略副本¶
将策略传递给收集器时,我们可以选择运行该策略的设备。这可以用来将策略的训练版本保存在一个设备上,而推理版本保存在另一个设备上。例如,如果您有两个 CUDA 设备,明智的做法可能是在一个设备上进行训练,并在另一个设备上执行策略进行推理。如果是这种情况,可以使用 update_policy_weights_()
将参数从一个设备复制到另一个设备(如果不需要复制,此方法无效)。
由于目标是避免显式调用 policy.to(policy_device),收集器将在必要时对策略结构进行深拷贝,并在实例化期间将参数复制到新设备上。由于并非所有策略都支持深拷贝(例如,使用 CUDA 图或依赖第三方库的策略),我们尝试限制执行深拷贝的情况。下图显示了何时会发生这种情况。

收集器中的策略复制决策树。¶
收集器与经验回放缓冲区互操作性¶
在需要从经验回放缓冲区中采样单个转换(transitions)的最简单场景中,无需过多关注收集器的构建方式。收集后将数据展平是填充存储之前一个足够的预处理步骤。
>>> memory = ReplayBuffer(
... storage=LazyTensorStorage(N),
... transform=lambda data: data.reshape(-1))
>>> for data in collector:
... memory.extend(data)
如果需要收集轨迹切片(slices),推荐的方法是创建一个多维缓冲区并使用 SliceSampler
采样器类进行采样。必须确保传递给缓冲区的数据形状正确,并且 时间
和 批次
维度清晰分离。实践中,以下配置将生效:
>>> # Single environment: no need for a multi-dimensional buffer
>>> memory = ReplayBuffer(
... storage=LazyTensorStorage(N),
... sampler=SliceSampler(num_slices=4, trajectory_key=("collector", "traj_ids"))
... )
>>> collector = SyncDataCollector(env, policy, frames_per_batch=N, total_frames=-1)
>>> for data in collector:
... memory.extend(data)
>>> # Batched environments: a multi-dim buffer is required
>>> memory = ReplayBuffer(
... storage=LazyTensorStorage(N, ndim=2),
... sampler=SliceSampler(num_slices=4, trajectory_key=("collector", "traj_ids"))
... )
>>> env = ParallelEnv(4, make_env)
>>> collector = SyncDataCollector(env, policy, frames_per_batch=N, total_frames=-1)
>>> for data in collector:
... memory.extend(data)
>>> # MultiSyncDataCollector + regular env: behaves like a ParallelEnv if cat_results="stack"
>>> memory = ReplayBuffer(
... storage=LazyTensorStorage(N, ndim=2),
... sampler=SliceSampler(num_slices=4, trajectory_key=("collector", "traj_ids"))
... )
>>> collector = MultiSyncDataCollector([make_env] * 4,
... policy,
... frames_per_batch=N,
... total_frames=-1,
... cat_results="stack")
>>> for data in collector:
... memory.extend(data)
>>> # MultiSyncDataCollector + parallel env: the ndim must be adapted accordingly
>>> memory = ReplayBuffer(
... storage=LazyTensorStorage(N, ndim=3),
... sampler=SliceSampler(num_slices=4, trajectory_key=("collector", "traj_ids"))
... )
>>> collector = MultiSyncDataCollector([ParallelEnv(2, make_env)] * 4,
... policy,
... frames_per_batch=N,
... total_frames=-1,
... cat_results="stack")
>>> for data in collector:
... memory.extend(data)
目前使用 MultiSyncDataCollector
采样轨迹的经验回放缓冲区尚未完全支持,因为数据批次可能来自任何工作进程,并且在大多数情况下,写入缓冲区中的连续批次不会来自同一来源(从而中断轨迹)。
单节点数据收集器¶
数据收集器的基类。 |
|
|
用于强化学习问题的通用数据收集器。 |
|
在独立的进程上同步运行指定数量的数据收集器。 |
|
在独立的进程上异步运行指定数量的数据收集器。 |
|
在独立的进程上运行单个数据收集器。 |
分布式数据收集器¶
TorchRL 提供了一组分布式数据收集器。这些工具支持多种后端(使用 DistributedDataCollector
的 'gloo'
、'nccl'
、'mpi'
,或使用 RPCDataCollector
的 PyTorch RPC)和启动器('ray'
、submitit
或 torch.multiprocessing
)。它们可以在同步或异步模式下高效使用,无论是单节点还是跨多个节点。
资源:在专用文件夹中查找这些收集器的示例。
注意
选择子收集器:所有分布式收集器都支持各种单机收集器。您可能想知道为什么不使用 MultiSyncDataCollector
或 ParallelEnv
。一般来说,多进程收集器比需要每一步通信的并行环境具有更低的 IO 开销。然而,模型规格的作用方向相反,因为使用并行环境会使策略(和/或 transforms)的执行速度更快,因为这些操作是向量化的。
注意
选择收集器(或并行环境)的设备:进程之间的数据共享是通过共享内存缓冲区实现的,对于在 CPU 上执行的并行环境和多进程环境。根据所使用机器的能力,这可能比在 GPU 上共享数据慢得多,GPU 数据共享由 cuda 驱动程序原生支持。实践中,这意味着在构建并行环境或收集器时使用 device="cpu"
关键字参数,可能导致比在可用时使用 device="cuda"
更慢的数据收集。
注意
考虑到库的许多可选依赖项(例如 Gym、Gymnasium 等),在多进程/分布式设置中警告可能会很快变得非常烦人。默认情况下,TorchRL 会在子进程中过滤掉这些警告。如果仍然希望看到这些警告,可以通过设置 torchrl.filter_warnings_subprocess=False
来显示它们。
|
使用 torch.distributed 后端的分布式数据收集器。 |
|
基于 RPC 的分布式数据收集器。 |
|
使用 torch.distributed 后端的分布式同步数据收集器。 |
|
submitit 的延迟启动器。 |
|
使用 Ray 后端的分布式数据收集器。 |
辅助函数¶
|
一个用于轨迹分割的实用函数。 |