注意
前往末尾以下载完整示例代码。
数据收集和存储入门¶
作者: Vincent Moens
注意
若要在 notebook 中运行本教程,请在开头添加一个包含以下内容的安装单元格:
!pip install tensordict !pip install torchrl
没有数据就没有学习。在监督学习中,用户习惯于使用 DataLoader
等工具将数据集成到他们的训练循环中。Dataloader 是可迭代对象,它们为你提供将用于训练模型的数据。
TorchRL 以类似的方式处理数据加载问题,尽管在 RL 库的生态系统中它非常独特。TorchRL 的数据加载器被称为 DataCollectors
。大多数情况下,数据收集不会止步于原始数据的收集,因为数据需要临时存储在缓冲区(或用于在线策略算法的等效结构)中,然后才能被 损失模块 使用。本教程将探讨这两个类。
数据收集器¶
这里讨论的主要数据收集器是 SyncDataCollector
,这是本文档的重点。从根本上讲,收集器是一个简单的类,负责在环境中执行你的策略,在必要时重置环境,并提供预定义大小的批次。与 rollout()
方法(在 环境教程 中演示)不同,收集器在连续的数据批次之间不会重置。因此,两个连续的数据批次可能包含来自同一轨迹的元素。
你需要传递给收集器的基本参数是你想要收集的批次大小 (frames_per_batch
)、迭代器的长度(可能是无限的)、策略和环境。为了简单起见,在此示例中我们将使用一个虚拟的随机策略。
import torch
torch.manual_seed(0)
from torchrl.collectors import SyncDataCollector
from torchrl.envs import GymEnv
from torchrl.envs.utils import RandomPolicy
env = GymEnv("CartPole-v1")
env.set_seed(0)
policy = RandomPolicy(env.action_spec)
collector = SyncDataCollector(env, policy, frames_per_batch=200, total_frames=-1)
现在我们期望我们的收集器无论收集期间发生什么,都会交付大小为 200
的批次。换句话说,在此批次中我们可能有多个轨迹!total_frames
指示收集器应该运行多长时间。值 -1
将产生一个永不结束的收集器。
让我们迭代收集器,以了解这些数据的样子
for data in collector:
print(data)
break
TensorDict(
fields={
action: Tensor(shape=torch.Size([200, 2]), device=cpu, dtype=torch.int64, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([200]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([200]),
device=None,
is_shared=False),
done: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([200, 4]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([200]),
device=None,
is_shared=False),
observation: Tensor(shape=torch.Size([200, 4]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([200, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([200]),
device=None,
is_shared=False)
如你所见,我们的数据被一些收集器特定的元数据增强了,这些元数据分组在一个 "collector"
子 tensordict 中,我们在 环境 rollout 期间没有看到。这对于跟踪轨迹 ID 非常有用。在以下列表中,每个项目标记了相应转换所属的轨迹编号
print(data["collector", "traj_ids"])
tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2,
2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3,
3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4,
4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5,
5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,
6, 6, 6, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7,
7, 7, 7, 7, 7, 7, 7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 9, 9,
9, 9, 9, 9, 9, 9, 9, 9])
当涉及到编写最先进的算法时,数据收集器非常有用,因为性能通常是通过特定技术在给定数量的环境交互(收集器中的 total_frames
参数)中解决问题的能力来衡量的。因此,我们示例中的大多数训练循环看起来都像这样
>>> for data in collector:
... # your algorithm here
回放缓冲区¶
现在我们已经探讨了如何收集数据,我们想知道如何存储它。在 RL 中,典型的设置是数据被收集、临时存储并在一段时间后根据一些启发式方法清除:先进先出或其他。一个典型的伪代码看起来像这样
>>> for data in collector:
... storage.store(data)
... for i in range(n_optim):
... sample = storage.sample()
... loss_val = loss_fn(sample)
... loss_val.backward()
... optim.step() # etc
在 TorchRL 中存储数据的父类被称为 ReplayBuffer
。TorchRL 的回放缓冲区是可组合的:你可以编辑存储类型、它们的采样技术、写入启发式方法或应用于它们的转换。我们将把花哨的东西留给专门的深入教程。通用回放缓冲区只需要知道它必须使用什么存储。一般来说,我们建议使用 TensorStorage
子类,在大多数情况下它都能正常工作。在本教程中,我们将使用 LazyMemmapStorage
,它具有两个不错的特性:首先,由于是“惰性的”,你不需要提前明确告诉它你的数据是什么样的。其次,它使用 MemoryMappedTensor
作为后端,以有效的方式将你的数据保存在磁盘上。你唯一需要知道的是你希望缓冲区有多大。
from torchrl.data.replay_buffers import LazyMemmapStorage, ReplayBuffer
buffer = ReplayBuffer(storage=LazyMemmapStorage(max_size=1000))
填充缓冲区可以通过 add()
(单个元素)或 extend()
(多个元素)方法完成。使用我们刚刚收集的数据,我们一次性初始化并填充缓冲区
indices = buffer.extend(data)
我们可以检查缓冲区现在拥有的元素数量与我们从收集器中获得的元素数量相同
assert len(buffer) == collector.frames_per_batch
剩下唯一需要知道的是如何从缓冲区收集数据。自然,这依赖于 sample()
方法。因为我们没有指定采样必须在没有重复的情况下进行,所以不能保证从我们的缓冲区收集的样本是唯一的
sample = buffer.sample(batch_size=30)
print(sample)
TensorDict(
fields={
action: Tensor(shape=torch.Size([30, 2]), device=cpu, dtype=torch.int64, is_shared=False),
collector: TensorDict(
fields={
traj_ids: Tensor(shape=torch.Size([30]), device=cpu, dtype=torch.int64, is_shared=False)},
batch_size=torch.Size([30]),
device=cpu,
is_shared=False),
done: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
next: TensorDict(
fields={
done: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
observation: Tensor(shape=torch.Size([30, 4]), device=cpu, dtype=torch.float32, is_shared=False),
reward: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([30]),
device=cpu,
is_shared=False),
observation: Tensor(shape=torch.Size([30, 4]), device=cpu, dtype=torch.float32, is_shared=False),
terminated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False),
truncated: Tensor(shape=torch.Size([30, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
batch_size=torch.Size([30]),
device=cpu,
is_shared=False)
再次,我们的样本看起来与我们从收集器收集的数据完全相同!
下一步¶
你可以看看其他多进程收集器,例如
MultiSyncDataCollector
或MultiaSyncDataCollector
。如果你有多个节点用于推理,TorchRL 还提供分布式收集器。在 API 参考 中查看它们。
查看专门的 回放缓冲区教程,以了解更多关于构建缓冲区时可用的选项,或者查看 API 参考,其中详细介绍了所有功能。回放缓冲区有无数功能,例如多线程采样、优先经验回放等等……
为了简单起见,我们省略了回放缓冲区可迭代的容量。自己尝试一下:构建一个缓冲区并在构造函数中指示其批次大小,然后尝试迭代它。这相当于在循环中调用
rb.sample()
!
脚本总运行时间: (0 分 22.051 秒)
估计内存使用量: 321 MB