torchrl.collectors 包¶
数据收集器在某种程度上等同于 pytorch 数据加载器,但 (1) 它们收集非静态数据源的数据,并且 (2) 数据是使用模型(可能是正在训练的模型版本)收集的。
TorchRL 的数据收集器接受两个主要参数:环境(或环境构造器列表)和策略。它们将迭代执行环境步骤和策略查询,执行定义的步数,然后将收集的数据堆栈传递给用户。环境将在达到完成状态和/或在预定义的步数后重置。
由于数据收集可能是一个计算密集型过程,因此配置适当的执行超参数至关重要。首先要考虑的参数是数据收集应该与优化步骤串行还是并行发生。SyncDataCollector
类将在训练工作进程上执行数据收集。MultiSyncDataCollector
将在多个工作进程之间拆分工作负载,并聚合将传递给训练工作进程的结果。最后,MultiaSyncDataCollector
将在多个工作进程上执行数据收集,并传递它可以收集的第一批结果。此执行将持续且与网络训练同时发生:这意味着用于数据收集的策略权重可能略微滞后于训练工作进程上的策略配置。因此,尽管此类可能是收集数据最快的方式,但其代价是仅适用于可以接受异步收集数据的设置(例如,离策略 RL 或课程 RL)。对于远程执行的 rollout (MultiSyncDataCollector
或 MultiaSyncDataCollector
),必须使用 collector.update_policy_weights_() 或在构造函数中设置 update_at_each_batch=True 来同步远程策略的权重与训练工作进程的权重。
要考虑的第二个参数(在远程设置中)是数据将被收集的设备以及环境和策略操作将被执行的设备。例如,在 CPU 上执行的策略可能比在 CUDA 上执行的策略慢。当多个推理工作进程同时运行时,在可用设备之间分配计算工作负载可能会加快收集速度或避免 OOM 错误。最后,批大小的选择和传递设备(即数据在等待传递给收集工作进程时存储的设备)也可能影响内存管理。要控制的关键参数是 devices
,它控制执行设备(即策略的设备),以及 storing_device
,它控制在 rollout 期间环境和数据存储的设备。一个好的启发式方法通常是对存储和计算使用相同的设备,这是仅传递 devices 参数时的默认行为。
除了这些计算参数外,用户还可以选择配置以下参数
max_frames_per_traj:调用
env.reset()
后的帧数frames_per_batch:每次在收集器上迭代时传递的帧数
init_random_frames:随机步数(调用
env.rand_step()
的步数)reset_at_each_iter:如果为
True
,则在每次批收集后重置环境split_trajs:如果为
True
,则轨迹将被拆分并以填充的 tensordict 形式传递,并带有一个指向表示有效值的布尔掩码的"mask"
键。exploration_type:与策略一起使用的探索策略。
reset_when_done:是否在达到完成状态时重置环境。
收集器和批大小¶
由于每个收集器都有其自己的组织运行在其中的环境的方式,因此数据将带有不同的批大小,具体取决于收集器的具体特性。下表总结了收集数据时的预期情况
SyncDataCollector |
MultiSyncDataCollector (n=B) |
MultiaSyncDataCollector (n=B) |
|||
---|---|---|---|---|---|
cat_results |
NA |
“stack” |
0 |
-1 |
NA |
单环境 |
[T] |
[B, T] |
[B*(T//B) |
[B*(T//B)] |
[T] |
批量环境 (n=P) |
[P, T] |
[B, P, T] |
[B * P, T] |
[P, T * B] |
[P, T] |
在所有这些情况下,最后一个维度(T
代表 时间
)都经过调整,使批大小等于传递给收集器的 frames_per_batch
参数。
警告
MultiSyncDataCollector
不应与 cat_results=0
一起使用,因为对于批量环境,数据将沿批维度堆叠,或者对于单环境,数据将沿时间维度堆叠,这在两者之间交换时可能会引起一些混淆。cat_results="stack"
是一种更好且更一致的与环境交互的方式,因为它将保持每个维度分开,并在配置、收集器类和其他组件之间提供更好的互换性。
虽然 MultiSyncDataCollector
具有与正在运行的子收集器数量 (B
) 相对应的维度,但 MultiaSyncDataCollector
没有。当考虑到 MultiaSyncDataCollector
以先到先得的方式传递数据批次时,而 MultiSyncDataCollector
在传递数据之前从每个子收集器收集数据时,这一点很容易理解。
收集器和策略副本¶
当将策略传递给收集器时,我们可以选择运行此策略的设备。这可用于将策略的训练版本保留在一个设备上,而将推理版本保留在另一个设备上。例如,如果您有两个 CUDA 设备,则在其中一个设备上进行训练,而在另一个设备上执行策略进行推理可能是明智的。如果是这种情况,则可以使用 update_policy_weights_()
将参数从一个设备复制到另一个设备(如果不需要复制,则此方法是空操作)。
由于目标是避免显式调用 policy.to(policy_device),因此收集器将对策略结构进行深度复制,并在必要时在实例化期间复制放置在新设备上的参数。由于并非所有策略都支持深度复制(例如,使用 CUDA 图或依赖第三方库的策略),我们尝试限制将执行深度复制的情况。下表显示了何时会发生这种情况。

收集器中的策略复制决策树。¶
收集器和回放缓冲区互操作性¶
在必须从回放缓冲区中采样单个转换的最简单情况下,无需过多关注收集器的构建方式。在填充存储之前,展平收集后的数据将是足够的预处理步骤
>>> memory = ReplayBuffer(
... storage=LazyTensorStorage(N),
... transform=lambda data: data.reshape(-1))
>>> for data in collector:
... memory.extend(data)
如果必须收集轨迹切片,则实现此目的的推荐方法是创建一个多维缓冲区,并使用 SliceSampler
采样器类进行采样。必须确保传递给缓冲区的数据形状正确,time
和 batch
维度明确分开。在实践中,以下配置将起作用
>>> # Single environment: no need for a multi-dimensional buffer
>>> memory = ReplayBuffer(
... storage=LazyTensorStorage(N),
... sampler=SliceSampler(num_slices=4, trajectory_key=("collector", "traj_ids"))
... )
>>> collector = SyncDataCollector(env, policy, frames_per_batch=N, total_frames=-1)
>>> for data in collector:
... memory.extend(data)
>>> # Batched environments: a multi-dim buffer is required
>>> memory = ReplayBuffer(
... storage=LazyTensorStorage(N, ndim=2),
... sampler=SliceSampler(num_slices=4, trajectory_key=("collector", "traj_ids"))
... )
>>> env = ParallelEnv(4, make_env)
>>> collector = SyncDataCollector(env, policy, frames_per_batch=N, total_frames=-1)
>>> for data in collector:
... memory.extend(data)
>>> # MultiSyncDataCollector + regular env: behaves like a ParallelEnv iif cat_results="stack"
>>> memory = ReplayBuffer(
... storage=LazyTensorStorage(N, ndim=2),
... sampler=SliceSampler(num_slices=4, trajectory_key=("collector", "traj_ids"))
... )
>>> collector = MultiSyncDataCollector([make_env] * 4,
... policy,
... frames_per_batch=N,
... total_frames=-1,
... cat_results="stack")
>>> for data in collector:
... memory.extend(data)
>>> # MultiSyncDataCollector + parallel env: the ndim must be adapted accordingly
>>> memory = ReplayBuffer(
... storage=LazyTensorStorage(N, ndim=3),
... sampler=SliceSampler(num_slices=4, trajectory_key=("collector", "traj_ids"))
... )
>>> collector = MultiSyncDataCollector([ParallelEnv(2, make_env)] * 4,
... policy,
... frames_per_batch=N,
... total_frames=-1,
... cat_results="stack")
>>> for data in collector:
... memory.extend(data)
使用回放缓冲区采样轨迹与 MultiSyncDataCollector
当前未完全支持,因为数据批次可能来自任何工作进程,并且在大多数情况下,写入缓冲区的连续批次将不会来自同一来源(从而中断轨迹)。
单节点数据收集器¶
数据收集器的基类。 |
|
|
用于 RL 问题的通用数据收集器。 |
|
在单独的进程上同步运行给定数量的 DataCollector。 |
|
在单独的进程上异步运行给定数量的 DataCollector。 |
|
在单独的进程上运行单个 DataCollector。 |
分布式数据收集器¶
TorchRL 提供了一组分布式数据收集器。这些工具支持多种后端('gloo'
、 'nccl'
、 'mpi'
与 DistributedDataCollector
或 PyTorch RPC 与 RPCDataCollector
)和启动器('ray'
、 submitit
或 torch.multiprocessing
)。它们可以有效地用于同步或异步模式,在单节点或跨多节点上。
资源:在 专用文件夹 中查找这些收集器的示例。
注意
选择子收集器:所有分布式收集器都支持各种单机收集器。人们可能想知道为什么使用 MultiSyncDataCollector
或 ParallelEnv
代替。一般来说,多进程收集器的 IO 占用空间比并行环境低,并行环境需要在每一步进行通信。然而,模型规范在相反的方向上起作用,因为使用并行环境将导致策略(和/或转换)的执行速度更快,因为这些操作将被向量化。
注意
选择收集器(或并行环境)的设备:进程之间的数据共享是通过与并行环境和在 CPU 上执行的多进程环境共享内存缓冲区来实现的。根据所用机器的功能,与在 GPU 上共享数据(cuda 驱动程序原生支持)相比,这可能慢得令人难以接受。实际上,这意味着当构建并行环境或收集器时,使用 device="cpu"
关键字参数可能会导致比在可用时使用 device="cuda"
更慢的收集速度。
注意
鉴于库的许多可选依赖项(例如,Gym、Gymnasium 和许多其他项),警告在多进程/分布式设置中可能很快变得非常烦人。默认情况下,TorchRL 会过滤掉子进程中的这些警告。如果仍然希望看到这些警告,可以通过设置 torchrl.filter_warnings_subprocess=False
来显示它们。
|
具有 torch.distributed 后端的分布式数据收集器。 |
|
基于 RPC 的分布式数据收集器。 |
|
具有 torch.distributed 后端的分布式同步数据收集器。 |
|
submitit 的延迟启动器。 |
|
具有 Ray 后端的分布式数据收集器。 |
辅助函数¶
|
用于轨迹分离的实用函数。 |