快捷方式

torchrl.data 包

回放缓冲区

回放缓冲区是离策略 RL 算法的核心部分。TorchRL 提供了一些广泛使用的回放缓冲区的有效实现。

ReplayBuffer(*[, storage, sampler, writer, ...])

一个通用且可组合的回放缓冲区类。

PrioritizedReplayBuffer(*, alpha, beta[, ...])

优先回放缓冲区。

TensorDictReplayBuffer(*[, priority_key])

ReplayBuffer 类的 TensorDict 特定包装器。

TensorDictPrioritizedReplayBuffer(*, alpha, beta)

PrioritizedReplayBuffer 类的 TensorDict 特定包装器。

可组合的回放缓冲区

我们还允许用户组合回放缓冲区。我们提供了多种回放缓冲区使用方案,包括对几乎所有数据类型的支持;内存、设备或物理内存中的存储;多种采样策略;转换的使用等。

支持的数据类型和存储选择

理论上,回放缓冲区支持任何数据类型,但我们无法保证每个组件都支持任何数据类型。最简单的回放缓冲区实现由一个具有 ListStorage 存储的 ReplayBuffer 基础组成。这非常低效,但它允许您存储具有非张量数据的复杂数据结构。连续内存中的存储包括 TensorStorageLazyTensorStorageLazyMemmapStorage。这些类将 TensorDict 数据作为一等公民支持,但也支持任何 PyTree 数据结构(例如,元组、列表、字典和这些结构的嵌套版本)。TensorStorage 存储需要您在构造时提供存储,而 TensorStorage(RAM、CUDA)和 LazyMemmapStorage(物理内存)将在第一次扩展后为您预分配存储。

以下是一些示例,从通用的 ListStorage 开始

>>> from torchrl.data.replay_buffers import ReplayBuffer, ListStorage
>>> rb = ReplayBuffer(storage=ListStorage(10))
>>> rb.add("a string!") # first element will be a string
>>> rb.extend([30, None])  # element [1] is an int, [2] is None

写入缓冲区的入口点主要是 add()extend()。还可以使用 __setitem__(),在这种情况下,数据将写入指定位置,而不会更新缓冲区的长度或游标。这在从缓冲区采样项目并在之后就地更新其值时非常有用。

使用 TensorStorage,我们告诉我们的 RB 我们希望存储是连续的,这效率要高得多,但也更具限制性。

>>> import torch
>>> from torchrl.data.replay_buffers import ReplayBuffer, TensorStorage
>>> container = torch.empty(10, 3, 64, 64, dtype=torch.unit8)
>>> rb = ReplayBuffer(storage=TensorStorage(container))
>>> img = torch.randint(255, (3, 64, 64), dtype=torch.uint8)
>>> rb.add(img)

接下来,我们可以避免创建容器并要求存储自动执行此操作。在使用 PyTrees 和 tensordicts 时这非常有用!对于 PyTrees 等其他数据结构,add() 将传递给它的采样视为该类型的一个实例。extend() 另一方面会认为数据是可迭代的。对于张量、tensordicts 和列表(见下文),可迭代项在根级别查找。对于 PyTrees,我们假设树中所有叶子(张量)的领先维度都匹配。如果它们不匹配,extend 将抛出异常。

>>> import torch
>>> from tensordict import TensorDict
>>> from torchrl.data.replay_buffers import ReplayBuffer, LazyMemmapStorage
>>> rb_td = ReplayBuffer(storage=LazyMemmapStorage(10), batch_size=1)  # max 10 elements stored
>>> rb_td.add(TensorDict({"img": torch.randint(255, (3, 64, 64), dtype=torch.unit8),
...     "labels": torch.randint(100, ())}, batch_size=[]))
>>> rb_pytree = ReplayBuffer(storage=LazyMemmapStorage(10))  # max 10 elements stored
>>> # extend with a PyTree where all tensors have the same leading dim (3)
>>> rb_pytree.extend({"a": {"b": torch.randn(3), "c": [torch.zeros(3, 2), (torch.ones(3, 10),)]}})
>>> assert len(rb_pytree) == 3  # the replay buffer has 3 elements!

注意

在处理值列表时,extend() 可能具有模棱两可的签名,这些值列表应解释为 PyTree(在这种情况下,列表中的所有元素都将被放入存储的 PyTree 中的切片)或要逐个添加的值列表。为了解决这个问题,TorchRL 对列表和元组做出了明确的区分:元组将被视为 PyTree,列表(在根级别)将被解释为要逐个添加到缓冲区的值的堆栈。

采样和索引

回放缓冲区可以被索引和采样。索引和采样在存储中的给定索引处收集数据,然后通过一系列转换和collate_fn进行处理,这些转换和collate_fn可以传递给回放缓冲区的__init__函数。collate_fn带有默认值,在大多数情况下应该符合用户的预期,因此在大多数情况下您不必担心它。转换通常是Transform的实例,尽管常规函数也可以工作(在后一种情况下,inv()方法显然会被忽略,而在前一种情况下,它可以用来在数据传递到缓冲区之前对其进行预处理)。最后,可以通过将线程数通过prefetch关键字参数传递给构造函数来使用多线程实现采样。我们建议用户在实际应用中对这种技术进行基准测试后再采用,因为不能保证它在实践中会带来更快的吞吐量,这取决于使用的机器和环境。

在采样时,batch_size可以在构造期间传递(例如,如果它在整个训练过程中是恒定的),也可以传递给sample()方法。

为了进一步优化采样策略,我们建议您查看我们的采样器!

以下是一些从回放缓冲区获取数据的示例

>>> first_elt = rb_td[0]
>>> storage = rb_td[:] # returns all valid elements from the buffer
>>> sample = rb_td.sample(128)
>>> for data in rb_td:  # iterate over the buffer using the sampler -- batch-size was set in the constructor to 1
...     print(data)

使用以下组件

FlatStorageCheckpointer([done_keys, reward_keys])

以紧凑的形式保存存储,节省 TED 格式的空间。

H5StorageCheckpointer(*[, checkpoint_file, ...])

以紧凑的形式保存存储,节省 TED 格式的空间,并使用 H5 格式保存数据。

ImmutableDatasetWriter()

用于不可变数据集的阻塞写入器。

LazyMemmapStorage(max_size, *[, ...])

用于张量和张量字典的内存映射存储。

LazyTensorStorage(max_size, *[, device, ndim])

用于张量和张量字典的预分配张量存储。

ListStorage(max_size)

存储在列表中的存储。

ListStorageCheckpointer()

ListStoage 的存储检查点。

NestedStorageCheckpointer([done_keys, ...])

以紧凑的形式保存存储,节省 TED 格式的空间并使用内存映射嵌套张量。

PrioritizedSampler(max_capacity, alpha, beta)

回放缓冲区的优先采样器。

PrioritizedSliceSampler(max_capacity, alpha, ...)

根据开始和停止信号,沿第一个维度对数据进行切片采样,使用优先采样。

RandomSampler()

用于可组合回放缓冲区的均匀随机采样器。

RoundRobinWriter(**kw)

用于可组合回放缓冲区的循环写入器类。

Sampler()

用于可组合回放缓冲区的通用采样器基类。

SamplerWithoutReplacement([drop_last, shuffle])

一个消耗数据的采样器,确保连续批次中不存在相同的样本。

SliceSampler(*[, num_slices, slice_len, ...])

根据开始和停止信号,沿第一个维度对数据进行切片采样。

SliceSamplerWithoutReplacement(*[, ...])

根据开始和停止信号,沿第一个维度对数据进行切片采样,不放回。

Storage(max_size[, checkpointer])

存储是回放缓冲区的容器。

StorageCheckpointerBase()

存储检查点的公共基类。

StorageEnsembleCheckpointer()

集成存储的检查点。

TensorDictMaxValueWriter([rank_key, reduction])

用于可组合回放缓冲区的写入器类,根据某些排序键保留顶部元素。

TensorDictRoundRobinWriter(**kw)

用于可组合、基于张量字典的回放缓冲区的循环写入器类。

TensorStorage(storage[, max_size, device, ndim])

用于张量和张量字典的存储。

TensorStorageCheckpointer()

TensorStorages 的存储检查点。

Writer()

回放缓冲区的基写入器类。

存储的选择对回放缓冲区的采样延迟影响很大,尤其是在具有更大数据量的分布式强化学习环境中。LazyMemmapStorage 在具有共享存储的分布式环境中非常建议使用,因为 MemoryMappedTensors 的序列化成本更低,并且能够指定文件存储位置以改进节点故障恢复。以下是在 https://github.com/pytorch/rl/tree/main/benchmarks/storage 中进行的粗略基准测试中发现的,与使用 ListStorage 相比,平均采样延迟的改进。

存储类型

加速

ListStorage

1x

LazyTensorStorage

1.83x

LazyMemmapStorage

3.44x

跨进程共享回放缓冲区

只要回放缓冲区的组件是可共享的,它们就可以在进程之间共享。此功能允许多个进程协作收集数据并填充共享的回放缓冲区,而不是将数据集中在主进程上,这可能会产生一些数据传输开销。

可共享的存储包括 LazyMemmapStorageTensorStorage 的任何子类,只要它们被实例化并且其内容存储为内存映射张量。有状态的写入器,例如 TensorDictRoundRobinWriter,目前不可共享,有状态的采样器(如 PrioritizedSampler)也是如此。

共享的回放缓冲区可以在任何有权访问它的进程上读取和扩展,如下例所示

>>> from torchrl.data import TensorDictReplayBuffer, LazyMemmapStorage
>>> import torch
>>> from torch import multiprocessing as mp
>>> from tensordict import TensorDict
>>>
>>> def worker(rb):
...     # Updates the replay buffer with new data
...     td = TensorDict({"a": torch.ones(10)}, [10])
...     rb.extend(td)
...
>>> if __name__ == "__main__":
...     rb = TensorDictReplayBuffer(storage=LazyMemmapStorage(21))
...     td = TensorDict({"a": torch.zeros(10)}, [10])
...     rb.extend(td)
...
...     proc = mp.Process(target=worker, args=(rb,))
...     proc.start()
...     proc.join()
...     # the replay buffer now has a length of 20, since the worker updated it
...     assert len(rb) == 20
...     assert (rb["_data", "a"][:10] == 0).all()  # data from main process
...     assert (rb["_data", "a"][10:20] == 1).all()  # data from remote process

存储轨迹

将轨迹存储在回放缓冲区中并不困难。需要注意的一点是,回放缓冲区的大小默认情况下是存储的领先维度的尺寸:换句话说,当存储多维数据时,创建一个存储大小为 1M 的回放缓冲区并不意味着存储 1M 帧,而是存储 1M 条轨迹。但是,如果在存储之前先将轨迹(或剧集/展开)展平,则容量仍然是 1M 步。

有一种方法可以规避这个问题,那就是告诉存储在保存数据时应该考虑多少维度。这可以通过 ndim 关键字参数来实现,所有连续存储(例如 TensorStorage 及类似存储)都接受该参数。当将多维存储传递给缓冲区时,缓冲区会自动将最后一个维度视为“时间”维度,这在 TorchRL 中是惯例。这可以通过 ReplayBuffer 中的 dim_extend 关键字参数来覆盖。这是保存通过 ParallelEnv 或其串行对应版本获得轨迹的推荐方法,我们将在下面看到。

在采样轨迹时,可能希望采样子轨迹以使学习多样化或提高采样效率。TorchRL 提供了两种不同的方法来实现这一点。

  • SliceSampler 允许沿 TensorStorage 的前导维度依次采样给定数量的轨迹片段。这是在 TorchRL 中采样子轨迹的推荐方法,__尤其__是在使用离线数据集(使用该约定进行存储)时。此策略要求在扩展回放缓冲区之前展平轨迹,并在采样后重新整形它们。 SliceSampler 类文档字符串提供了有关此存储和采样策略的详细说明。请注意,SliceSampler 与多维存储兼容。以下示例展示了如何在使用和不使用 tensordict 展平的情况下使用此功能。在第一种情况下,我们正在从单个环境收集数据。在这种情况下,我们对沿第一个维度连接传入数据的存储感到满意,因为收集计划不会引入中断。

    >>> from torchrl.envs import TransformedEnv, StepCounter, GymEnv
    >>> from torchrl.collectors import SyncDataCollector, RandomPolicy
    >>> from torchrl.data import ReplayBuffer, LazyTensorStorage, SliceSampler
    >>> env = TransformedEnv(GymEnv("CartPole-v1"), StepCounter())
    >>> collector = SyncDataCollector(env,
    ...     RandomPolicy(env.action_spec),
    ...     frames_per_batch=10, total_frames=-1)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(100),
    ...     sampler=SliceSampler(num_slices=8, traj_key=("collector", "traj_ids"),
    ...         truncated_key=None, strict_length=False),
    ...     batch_size=64)
    >>> for i, data in enumerate(collector):
    ...     rb.extend(data)
    ...     if i == 10:
    ...         break
    >>> assert len(rb) == 100, len(rb)
    >>> print(rb[:]["next", "step_count"])
    tensor([[32],
            [33],
            [34],
            [35],
            [36],
            [37],
            [38],
            [39],
            [40],
            [41],
            [11],
            [12],
            [13],
            [14],
            [15],
            [16],
            [17],
            [...
    

    如果在一个批次中运行多个环境,我们仍然可以通过调用 data.reshape(-1) 将数据存储在与之前相同的缓冲区中,这会将 [B, T] 大小展平为 [B * T],但这意味着批次中第一个环境的轨迹将与其他环境的轨迹交错,SliceSampler 无法处理这种情况。为了解决这个问题,我们建议在存储构造函数中使用 ndim 参数。

    >>> env = TransformedEnv(SerialEnv(2,
    ...     lambda: GymEnv("CartPole-v1")), StepCounter())
    >>> collector = SyncDataCollector(env,
    ...     RandomPolicy(env.action_spec),
    ...     frames_per_batch=1, total_frames=-1)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(100, ndim=2),
    ...     sampler=SliceSampler(num_slices=8, traj_key=("collector", "traj_ids"),
    ...         truncated_key=None, strict_length=False),
    ...     batch_size=64)
    >>> for i, data in enumerate(collector):
    ...     rb.extend(data)
    ...     if i == 100:
    ...         break
    >>> assert len(rb) == 100, len(rb)
    >>> print(rb[:]["next", "step_count"].squeeze())
    tensor([[ 6,  5],
            [ 2,  2],
            [ 3,  3],
            [ 4,  4],
            [ 5,  5],
            [ 6,  6],
            [ 7,  7],
            [ 8,  8],
            [ 9,  9],
            [10, 10],
            [11, 11],
            [12, 12],
            [13, 13],
            [14, 14],
            [15, 15],
            [16, 16],
            [17, 17],
            [18,  1],
            [19,  2],
            [...
    
  • 轨迹也可以独立存储,其中前导维度的每个元素都指向不同的轨迹。这要求轨迹具有一致的形状(或进行填充)。我们提供了一个名为 RandomCropTensorDict 的自定义 Transform 类,允许在缓冲区中采样子轨迹。请注意,与基于 SliceSampler 的策略不同,这里不需要具有指向开始和停止信号的 "episode""done" 键。以下是如何使用此类的示例。

回放缓冲区的检查点

回放缓冲区的每个组件都可能是有状态的,因此需要一种专门的方法来进行序列化。我们的回放缓冲区提供了两个独立的 API 来将其状态保存到磁盘:dumps()loads() 将使用内存映射张量保存每个组件(除了转换(存储、写入器、采样器))的数据,并使用 JSON 文件保存元数据。

这适用于所有类,除了 ListStorage,因为其内容无法预测(因此不符合内存映射数据结构,例如 tensordict 库中的数据结构)。

此 API 保证保存然后重新加载的缓冲区将处于完全相同的状态,无论我们查看其采样器(例如,优先级树)的状态、写入器(例如,最大写入器堆)还是存储的状态。

在底层,对 dumps() 的简单调用只会为其每个组件(除了我们通常不假设使用内存映射张量进行序列化的转换)在特定文件夹中调用公共的 dumps 方法。

然而,以 TED 格式 保存数据可能比需要消耗更多的内存。如果连续轨迹存储在缓冲区中,我们可以通过在根目录保存所有观察结果,以及仅保存 “next” 子 tensordict 的观察结果的最后一个元素来避免保存重复的观察结果,这可以将存储消耗减少多达两倍。为了启用此功能,提供了三个检查点类:FlatStorageCheckpointer 将丢弃重复的观察结果以压缩 TED 格式。在加载时,此类将以正确的格式重新写入观察结果。如果缓冲区保存在磁盘上,则此检查点执行的操作不需要任何额外的 RAM。 NestedStorageCheckpointer 将使用嵌套张量保存轨迹,使数据表示更清晰(第一个维度上的每个项目都表示一个不同的轨迹)。最后, H5StorageCheckpointer 将以 H5DB 格式保存缓冲区,使用户能够压缩数据并节省更多空间。

警告

检查点对回放缓冲区做了一些限制性假设。首先,假设 done 状态准确地表示轨迹的结束(除了写入的最后一个轨迹,写入器光标指示在何处放置截断信号)。对于 MARL 用法,需要注意的是,只有与根 tensordict 具有相同元素数量的 done 状态才允许:如果 done 状态具有存储的批次大小中未表示的其他元素,则这些检查点将失败。例如,在形状为 torch.Size([3, 4, 5]) 的存储中,形状为 torch.Size([3, 4]) 的 done 状态是不允许的。

以下是如何在实践中使用 H5DB 检查点的具体示例。

>>> from torchrl.data import ReplayBuffer, H5StorageCheckpointer, LazyMemmapStorage
>>> from torchrl.collectors import SyncDataCollector
>>> from torchrl.envs import GymEnv, SerialEnv
>>> import torch
>>> env = SerialEnv(3, lambda: GymEnv("CartPole-v1", device=None))
>>> env.set_seed(0)
>>> torch.manual_seed(0)
>>> collector = SyncDataCollector(
>>>     env, policy=env.rand_step, total_frames=200, frames_per_batch=22
>>> )
>>> rb = ReplayBuffer(storage=LazyMemmapStorage(100, ndim=2))
>>> rb_test = ReplayBuffer(storage=LazyMemmapStorage(100, ndim=2))
>>> rb.storage.checkpointer = H5StorageCheckpointer()
>>> rb_test.storage.checkpointer = H5StorageCheckpointer()
>>> for i, data in enumerate(collector):
...     rb.extend(data)
...     assert rb._storage.max_size == 102
...     rb.dumps(path_to_save_dir)
...     rb_test.loads(path_to_save_dir)
...     assert_allclose_td(rb_test[:], rb[:])

每当无法使用 dumps() 保存数据时,可以使用 state_dict() 作为替代方法,它返回一个可以使用 torch.save() 保存并使用 torch.load() 加载的数据结构,然后再调用 load_state_dict()。此方法的缺点是它难以保存大型数据结构,这在使用回放缓冲区时很常见。

TorchRL 剧集数据格式 (TED)

在 TorchRL 中,顺序数据始终以特定格式呈现,称为 TorchRL 剧集数据格式 (TED)。此格式对于 TorchRL 中各个组件的无缝集成和功能至关重要。

某些组件(例如回放缓冲区)对数据格式有些漠不关心。但是,其他组件,特别是环境,在很大程度上依赖于它才能平稳运行。

因此,了解 TED、其用途以及如何与之交互至关重要。本指南将提供对 TED 的清晰解释,说明其使用原因以及如何有效地使用它。

TED 背后的原理

格式化顺序数据可能是一项复杂的任务,尤其是在强化学习 (RL) 领域。作为从业人员,我们经常遇到在重置时间(尽管并非总是如此)传递数据的情况,有时在轨迹的最后一步提供或丢弃数据。

这种可变性意味着我们可以在数据集中观察到不同长度的数据,并且并不总是立即清楚如何在该数据集的各个元素之间匹配每个时间步长。考虑以下模棱两可的数据集结构。

>>> observation.shape
[200, 3]
>>> action.shape
[199, 4]
>>> info.shape
[200, 3]

乍一看,似乎 info 和 observation 是同时传递的(在重置时各有一个 + 在每次 step 调用时各有一个),如 action 元素少一个所暗示的那样。但是,如果 info 元素少一个,我们必须假设它是在重置时被省略了,或者没有传递或记录轨迹的最后一步。如果没有对数据结构的正确文档,就无法确定哪个 info 对应于哪个时间步长。

更复杂的是,一些数据集提供了不一致的数据格式,其中在 rollouts 的开始或结束时缺少 observationsinfos,这种行为通常没有记录在案。TED 的主要目标是通过提供清晰一致的数据表示来消除这些歧义。

TED 的结构

TED 构建在强化学习 (RL) 上下文中马尔可夫决策过程 (MDP) 的规范定义之上。在每个步骤中,观察结果会决定一个动作,该动作会导致 (1) 新的观察结果,(2) 任务完成的指示器(终止、截断、完成)以及 (3) 奖励信号。

某些元素可能缺失(例如,在模仿学习上下文中,奖励是可选的),或者其他信息可以通过状态或信息容器传递。在某些情况下,需要其他信息才能在调用 step 时获取观察结果(例如,在无状态环境模拟器中)。此外,在某些情况下,“动作”(或任何其他数据)不能表示为单个张量,需要以不同的方式组织。例如,在多智能体 RL 设置中,动作、观察结果、奖励和完成信号可能是复合的。

TED 通过单一、统一且明确的格式来适应所有这些场景。我们通过在执行动作时设置一个限制来区分时间步长 tt+1 发生的情况。换句话说,在调用 env.step 之前存在的所有内容都属于 t,之后出现的所有内容都属于 t+1

一般规则是,属于时间步长 t 的所有内容都存储在 tensordict 的根目录下,而属于 t+1 的所有内容都存储在 tensordict 的 "next" 条目中。以下是一个示例

>>> data = env.reset()
>>> data = policy(data)
>>> print(env.step(data))
TensorDict(
    fields={
        action: Tensor(...),  # The action taken at time t
        done: Tensor(...),  # The done state when the action was taken (at reset)
        next: TensorDict(  # all of this content comes from the call to `step`
            fields={
                done: Tensor(...),  # The done state after the action has been taken
                observation: Tensor(...),  # The observation resulting from the action
                reward: Tensor(...),  # The reward resulting from the action
                terminated: Tensor(...),  # The terminated state after the action has been taken
                truncated: Tensor(...),  # The truncated state after the action has been taken
            batch_size=torch.Size([]),
            device=cpu,
            is_shared=False),
        observation: Tensor(...),  # the observation at reset
        terminated: Tensor(...),  # the terminated at reset
        truncated: Tensor(...),  # the truncated at reset
    batch_size=torch.Size([]),
    device=cpu,
    is_shared=False)

在进行 rollout(使用 EnvBaseSyncDataCollector)时,当智能体重置其步数计数时,"next" tensordict 的内容通过 step_mdp() 函数被带到根目录:t <- t+1。您可以在 此处 了解更多有关环境 API 的信息。

在大多数情况下,根目录中没有 True 值的 "done" 状态,因为任何 done 状态都会触发(部分)重置,这会将 "done" 更改为 False。但是,这仅在自动执行重置时才成立。在某些情况下,部分重置不会触发重置,因此我们保留这些数据,例如,这些数据应该比观察结果具有更低的内存占用。

此格式消除了关于观察结果与其动作、信息或 done 状态匹配的任何歧义。

展平 TED 以减少内存消耗

TED 在内存中复制了两次观察结果,这可能会影响在实践中使用此格式的可行性。由于它主要用于表示的方便性,因此可以以扁平的方式存储数据,但在训练期间将其表示为 TED。

这在序列化回放缓冲区时特别有用:例如,TED2Flat 类确保 TED 格式的数据结构在写入磁盘之前被展平,而 Flat2TED 加载钩子将在反序列化期间展开此结构。

Tensordict 的维度

在 rollout 期间,所有收集到的 tensordict 将沿着一个新的维度堆叠,该维度位于末尾。收集器和环境都将此维度标记为 "time" 名称。以下是一个示例

>>> rollout = env.rollout(10, policy)
>>> assert rollout.shape[-1] == 10
>>> assert rollout.names[-1] == "time"

这确保了时间维度被清晰地标记并在数据结构中易于识别。

特殊情况和脚注

多智能体数据表示

可以在 MARL 环境 API 部分访问多智能体数据格式化文档。

基于内存的策略(RNN 和 Transformer)

在上面提供的示例中,只有 env.step(data) 生成需要在下一步读取的数据。但是,在某些情况下,策略也会输出下一步需要的信息。对于基于 RNN 的策略,这种情况很常见,它们输出动作以及下一步需要使用的循环状态。为了适应这一点,我们建议用户调整其 RNN 策略以在 tensordict 的 "next" 条目下写入此数据。这确保了此内容将在下一步被带到根目录。更多信息可以在 GRUModuleLSTMModule 中找到。

多步

收集器允许用户在读取数据时跳过步骤,累积未来 n 步的奖励。此技术在 DQN 类算法(如 Rainbow)中很流行。 MultiStep 类对来自收集器的批次执行此数据转换。在这些情况下,以下检查将失败,因为下一个观察结果会向后移动 n 步

>>> assert (data[..., 1:]["observation"] == data[..., :-1]["next", "observation"]).all()

内存需求如何?

如果简单地实现,此数据格式消耗的内存大约是扁平表示的两倍。在某些内存密集型设置(例如,在 AtariDQNExperienceReplay 数据集中),我们仅将 T+1 观察结果存储在磁盘上,并在获取时在线执行格式化。在其他情况下,我们假设 2 倍的内存成本是为更清晰的表示而付出的少量代价。但是,将延迟表示推广到离线数据集无疑是一个有益的功能,我们欢迎对此方向的贡献!

数据集

TorchRL 提供了围绕离线 RL 数据集的包装器。这些数据以 ReplayBuffer 实例的形式呈现,这意味着可以使用转换、采样器和存储器对其进行自定义。例如,可以使用 SelectTransformExcludeTransform 将条目过滤进或过滤出数据集。

默认情况下,数据集存储为内存映射张量,允许它们以几乎没有内存占用的方式快速采样。

以下是一个示例

注意

安装依赖项是用户的责任。对于 D4RL,需要克隆 存储库,因为最新的轮子没有发布在 PyPI 上。对于 OpenML,需要 scikit-learnpandas

转换数据集

在许多情况下,原始数据不会按原样使用。自然解决方案可能是将 Transform 实例传递给数据集构造函数并在运行时修改样本。这将起作用,但会产生额外的转换运行时。如果转换可以(至少部分)预先应用于数据集,则可以节省大量的磁盘空间和一些在采样时产生的开销。为此,可以使用 preprocess()。此方法将在数据集的每个元素上运行每个样本的预处理管道,并将其转换后的版本替换现有数据集。

转换后,重新创建相同的数据集将生成另一个具有相同转换存储的对象(除非使用 download="force")。

>>> dataset = RobosetExperienceReplay(
...     "FK1-v4(expert)/FK1_MicroOpenRandom_v2d-v4", batch_size=32, download="force"
... )
>>>
>>> def func(data):
...     return data.set("obs_norm", data.get("observation").norm(dim=-1))
...
>>> dataset.preprocess(
...     func,
...     num_workers=max(1, os.cpu_count() - 2),
...     num_chunks=1000,
...     mp_start_method="fork",
... )
>>> sample = dataset.sample()
>>> assert "obs_norm" in sample.keys()
>>> # re-recreating the dataset gives us the transformed version back.
>>> dataset = RobosetExperienceReplay(
...     "FK1-v4(expert)/FK1_MicroOpenRandom_v2d-v4", batch_size=32
... )
>>> sample = dataset.sample()
>>> assert "obs_norm" in sample.keys()

BaseDatasetExperienceReplay(*[, priority_key])

离线数据集的父类。

AtariDQNExperienceReplay(dataset_id[, ...])

Atari DQN 回放经验类。

D4RLExperienceReplay(dataset_id, batch_size)

D4RL 的回放经验类。

GenDGRLExperienceReplay(dataset_id[, ...])

Gen-DGRL 回放经验数据集。

MinariExperienceReplay(dataset_id, batch_size, *)

Minari 回放经验数据集。

OpenMLExperienceReplay(name, batch_size[, ...])

OpenML 数据的回放经验。

OpenXExperienceReplay(dataset_id[, ...])

Open X-Embodiment 数据集回放经验。

RobosetExperienceReplay(dataset_id, ...[, ...])

Roboset 回放经验数据集。

VD4RLExperienceReplay(dataset_id, batch_size, *)

V-D4RL 回放经验数据集。

组合数据集

在离线强化学习中,同时使用多个数据集是很常见的做法。此外,TorchRL 通常具有细粒度的数据集命名法,其中每个任务在表示时都是分开的,而其他库则以更紧凑的方式表示这些数据集。为了允许用户将多个数据集组合在一起,我们提出了一个ReplayBufferEnsemble 原语,允许用户同时从多个数据集采样。

如果各个数据集的格式不同,则可以使用 Transform 实例。在以下示例中,我们创建了两个具有语义相同但名称不同的虚拟数据集(("some", "key")"another_key"),并展示了如何重命名它们以使其具有匹配的名称。我们还调整图像大小,以便在采样期间可以将其堆叠在一起。

>>> from torchrl.envs import Comopse, ToTensorImage, Resize, RenameTransform
>>> from torchrl.data import TensorDictReplayBuffer, ReplayBufferEnsemble, LazyMemmapStorage
>>> from tensordict import TensorDict
>>> import torch
>>> rb0 = TensorDictReplayBuffer(
...     storage=LazyMemmapStorage(10),
...     transform=Compose(
...         ToTensorImage(in_keys=["pixels", ("next", "pixels")]),
...         Resize(32, in_keys=["pixels", ("next", "pixels")]),
...         RenameTransform([("some", "key")], ["renamed"]),
...     ),
... )
>>> rb1 = TensorDictReplayBuffer(
...     storage=LazyMemmapStorage(10),
...     transform=Compose(
...         ToTensorImage(in_keys=["pixels", ("next", "pixels")]),
...         Resize(32, in_keys=["pixels", ("next", "pixels")]),
...         RenameTransform(["another_key"], ["renamed"]),
...     ),
... )
>>> rb = ReplayBufferEnsemble(
...     rb0,
...     rb1,
...     p=[0.5, 0.5],
...     transform=Resize(33, in_keys=["pixels"], out_keys=["pixels33"]),
... )
>>> data0 = TensorDict(
...     {
...         "pixels": torch.randint(255, (10, 244, 244, 3)),
...         ("next", "pixels"): torch.randint(255, (10, 244, 244, 3)),
...         ("some", "key"): torch.randn(10),
...     },
...     batch_size=[10],
... )
>>> data1 = TensorDict(
...     {
...         "pixels": torch.randint(255, (10, 64, 64, 3)),
...         ("next", "pixels"): torch.randint(255, (10, 64, 64, 3)),
...         "another_key": torch.randn(10),
...     },
...     batch_size=[10],
... )
>>> rb[0].extend(data0)
>>> rb[1].extend(data1)
>>> for _ in range(2):
...     sample = rb.sample(10)
...     assert sample["next", "pixels"].shape == torch.Size([2, 5, 3, 32, 32])
...     assert sample["pixels"].shape == torch.Size([2, 5, 3, 32, 32])
...     assert sample["pixels33"].shape == torch.Size([2, 5, 3, 33, 33])
...     assert sample["renamed"].shape == torch.Size([2, 5])

ReplayBufferEnsemble(*rbs[, storages, ...])

一个回放缓冲区的集合。

SamplerEnsemble(*samplers[, p, ...])

一个采样器的集合。

StorageEnsemble(*storages[, transforms])

一个存储的集合。

WriterEnsemble(*writers)

一个写入器的集合。

TensorSpec

TensorSpec 父类及其子类定义了 TorchRL 中观察和动作的基本属性,例如形状、设备、数据类型和域。确保您的环境规范与它发送和接收的输入和输出匹配非常重要,因为 ParallelEnv 将根据这些规范创建缓冲区来与生成进程通信。检查 torchrl.envs.utils.check_env_specs 方法以进行完整性检查。

TensorSpec(shape, space, Box], device, ...)

观察、动作和奖励的张量元数据容器的父类。

BinaryDiscreteTensorSpec(n[, shape, device, ...])

二进制离散张量规范。

BoundedTensorSpec([low, high, shape, ...])

有界连续张量规范。

CompositeSpec(*args, **kwargs)

TensorSpec 的组合。

DiscreteTensorSpec(n[, shape, device, ...])

离散张量规范。

MultiDiscreteTensorSpec(nvec[, shape, ...])

离散张量规范的连接。

MultiOneHotDiscreteTensorSpec(nvec[, shape, ...])

独热离散张量规范的连接。

NonTensorSpec([shape, device, dtype])

非张量数据的规范。

OneHotDiscreteTensorSpec(n[, shape, device, ...])

一维独热离散张量规范。

UnboundedContinuousTensorSpec([shape, ...])

无界连续张量规范。

UnboundedDiscreteTensorSpec([shape, device, ...])

无界离散张量规范。

LazyStackedTensorSpec(*specs, dim)

张量规范堆栈的延迟表示。

LazyStackedCompositeSpec(*specs, dim)

复合规范堆栈的延迟表示。

NonTensorSpec([shape, device, dtype])

非张量数据的规范。

来自人类反馈的强化学习 (RLHF)

数据在来自人类反馈的强化学习 (RLHF) 中至关重要。鉴于这些技术通常用于语言领域,而语言在库中其他 RL 子领域中很少涉及,因此我们提供了特定的实用程序来促进与外部库(如数据集)的交互。这些实用程序包括用于标记化数据、以适合 TorchRL 模块的方式格式化数据以及优化存储以实现高效采样的工具。

PairwiseDataset(chosen_data, rejected_data, ...)

PromptData(input_ids, attention_mask, ...[, ...])

PromptTensorDictTokenizer(tokenizer, max_length)

提示数据集的标记化方法。

RewardData(input_ids, attention_mask[, ...])

RolloutFromModel(model, ref_model, reward_model)

用于使用因果语言模型执行 rollout 的类。

TensorDictTokenizer(tokenizer, max_length[, ...])

用于在文本示例上应用标记器的过程函数的工厂。

TokenizedDatasetLoader(split, max_length, ...)

加载标记化数据集,并缓存其内存映射副本。

create_infinite_iterator(iterator)

无限迭代迭代器。

get_dataloader(batch_size, block_size, ...)

创建数据集并从中返回数据加载器。

ConstantKLController(*[, kl_coef, model])

恒定 KL 控制器。

AdaptiveKLController(*, init_kl_coef, ...[, ...])

自适应 KL 控制器,如 Ziegler 等人“根据人类偏好微调语言模型”中所述。

实用程序

MultiStep(gamma, n_steps)

多步奖励转换。

consolidate_spec(spec[, ...])

给定一个 TensorSpec,通过添加 0 形状的规范来删除排他性键。

check_no_exclusive_keys(spec[, recurse])

给定一个 TensorSpec,如果不存在排他性键则返回 true。

contains_lazy_spec(spec)

如果规范包含延迟堆叠规范,则返回 true。

Nested2TED([done_key, shift_key, ...])

将每行都是轨迹的嵌套 tensordict 转换为 TED 格式。

Flat2TED([done_key, shift_key, is_full_key, ...])

一个存储加载钩子,用于将扁平化的 TED 数据反序列化为 TED 格式。

H5Combine()

将持久性 tensordict 中的轨迹组合成单个存储在文件系统中的独立 tensordict。

H5Split([done_key, shift_key, is_full_key, ...])

将使用 TED2Nested 准备的数据集拆分为 TensorDict,其中每个轨迹都存储为其父嵌套张量的视图。

TED2Flat([done_key, shift_key, is_full_key, ...])

一个用于将TED数据序列化为紧凑格式的节省存储空间的钩子。

TED2Nested(*args, **kwargs)

将TED格式的数据集转换为一个tensordict,其中包含嵌套张量,每行代表一条轨迹。

MultiStepTransform(n_steps, gamma, *[, ...])

ReplayBuffers的多步转换。

文档

访问PyTorch的全面开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源