注意
转到末尾 下载完整的示例代码。
开始使用数据收集和存储¶
作者: Vincent Moens
注意
要在笔记本中运行本教程,请在开头添加一个安装单元格,其中包含
!pip install tensordict !pip install torchrl
没有数据就没有学习。在监督学习中,用户习惯使用 DataLoader
等来将数据整合到他们的训练循环中。数据加载器是可迭代对象,它们为您提供用于训练模型的数据。
TorchRL 以类似的方式处理数据加载问题,尽管它在 RL 库的生态系统中非常独特。TorchRL 的数据加载器被称为 DataCollectors
。大多数情况下,数据收集不会止步于原始数据的收集,因为数据需要暂时存储在缓冲区中(或在策略算法的等效结构中),然后再由 损失模块 使用。本教程将探讨这两个类。
数据收集器¶
这里讨论的主要数据收集器是 SyncDataCollector
,它是本文档的重点。在基本层面上,收集器是一个简单的类,负责在环境中执行您的策略,在必要时重置环境,并提供预定义大小的批次。与 rollout()
方法在 环境教程 中演示的不同,收集器在连续数据批次之间不会重置。因此,两个连续的数据批次可能包含来自同一轨迹的元素。
您需要传递给收集器的基本参数是您想要收集的批次的大小 (frames_per_batch
)、迭代器的长度(可能无限)、策略和环境。为简单起见,我们将在本示例中使用一个虚拟的随机策略。
import torch
torch.manual_seed(0)
from torchrl.collectors import SyncDataCollector
from torchrl.envs import GymEnv
from torchrl.envs.utils import RandomPolicy
env = GymEnv("CartPole-v1")
env.set_seed(0)
policy = RandomPolicy(env.action_spec)
collector = SyncDataCollector(env, policy, frames_per_batch=200, total_frames=-1)
现在,无论在收集过程中发生什么,我们都希望我们的收集器提供大小为 200
的批次。换句话说,我们可能在这个批次中有多个轨迹!total_frames
指示收集器应该运行多长时间。值为 -1
将产生一个永不结束的收集器。
让我们迭代收集器,了解这些数据的外观
for data in collector:
print(data)
break
TensorDict(
fields={
action: Tensor(shape=torch.Size([200, 2]), device=cpu, dtype=torch.int64, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([200]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([200]),
device=None,
is_shared=False),
done: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([200, 4]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([200]),
device=None,
is_shared=False),
observation: Tensor(shape=torch.Size([200, 4]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([200]),
device=None,
is_shared=False)
如您所见,我们的数据在 "collector"
子张量字典中添加了一些收集器特定的元数据,这些元数据在 环境回放 中没有看到。这对于跟踪轨迹 ID 很有用。在以下列表中,每个项目都标记了相应转换所属的轨迹编号
print(data["collector", "traj_ids"])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3,
3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5,
5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,
6, 6, 6, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7,
7, 7, 7, 7, 7, 7, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9,
9, 9, 9, 9, 9, 9, 9, 9])
数据收集器在编码最先进的算法时非常有用,因为性能通常通过特定技术在给定次数的环境交互中解决问题的能力来衡量(收集器中的 total_frames
参数)。出于这个原因,我们示例中的大多数训练循环如下所示
>>> for data in collector:
... # your algorithm here
回放缓冲区¶
现在我们已经探讨了如何收集数据,我们想知道如何存储数据。在 RL 中,典型的设置是收集数据、临时存储数据,然后根据一些启发式方法在一段时间后清除数据:先进先出或其他。典型的伪代码如下所示
>>> for data in collector:
... storage.store(data)
... for i in range(n_optim):
... sample = storage.sample()
... loss_val = loss_fn(sample)
... loss_val.backward()
... optim.step() # etc
在 TorchRL 中存储数据的父类被称为 ReplayBuffer
。TorchRL 的回放缓冲区是可组合的:您可以编辑存储类型、它们的采样技术、写入启发式方法或对其应用的变换。我们将把这些高级内容留到专门的深入教程中。通用回放缓冲区只需要知道它要使用什么存储。一般来说,我们推荐一个 TensorStorage
子类,这在大多数情况下都能正常工作。我们将在本教程中使用 LazyMemmapStorage
,它具有两个很好的属性:首先,它“懒惰”,您不需要事先明确告诉它数据的外观。其次,它使用 MemoryMappedTensor
作为后端,以高效的方式将数据保存在磁盘上。您唯一需要知道的是缓冲区需要多大。
from torchrl.data.replay_buffers import LazyMemmapStorage, ReplayBuffer
buffer = ReplayBuffer(storage=LazyMemmapStorage(max_size=1000))
可以通过 add()
(单个元素)或 extend()
(多个元素)方法填充缓冲区。使用我们刚刚收集的数据,我们一步到位地初始化并填充缓冲区
indices = buffer.extend(data)
我们可以检查缓冲区现在是否具有与从收集器获得的元素数量相同的元素
assert len(buffer) == collector.frames_per_batch
最后要了解的是如何从缓冲区中收集数据。当然,这依赖于 sample()
方法。因为我们没有指定采样必须在没有重复的情况下完成,所以不能保证从缓冲区收集的样本将是唯一的
sample = buffer.sample(batch_size=30)
print(sample)
TensorDict(
fields={
action: Tensor(shape=torch.Size([30, 2]), device=cpu, dtype=torch.int64, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([30]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([30]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([30, 4]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([30]),
device=cpu,
is_shared=False),
observation: Tensor(shape=torch.Size([30, 4]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([30]),
device=cpu,
is_shared=False)
同样,我们的样本与我们从收集器收集的数据完全一样!
下一步¶
您可以查看其他多进程收集器,例如
MultiSyncDataCollector
或MultiaSyncDataCollector
。如果您有多个节点可用于推理,TorchRL 还提供分布式收集器。在 API 参考 中查看它们。
查看专门的 回放缓冲区教程,了解有关构建缓冲区时可使用选项的更多信息,或查看 API 参考,它详细介绍了所有功能。回放缓冲区具有无数功能,例如多线程采样、优先体验回放等等……
为了简单起见,我们省略了对重放缓冲区的迭代容量。您可以自己尝试:构建一个缓冲区并在构造函数中指定其批次大小,然后尝试对其进行迭代。这相当于在循环中调用
rb.sample()
!
脚本总运行时间:(0 分钟 20.853 秒)
估计内存使用量:318 MB