快捷方式

torchrl.data 包

回放缓冲区

回放缓冲区是离线策略 RL 算法的核心部分。TorchRL 提供了几种广泛使用的回放缓冲区的有效实现

ReplayBuffer(*[, storage, sampler, writer, ...])

一个通用的、可组合的回放缓冲区类。

PrioritizedReplayBuffer(*, alpha, beta[, ...])

优先回放缓冲区。

TensorDictReplayBuffer(*[, priority_key])

围绕 ReplayBuffer 类的 TensorDict 特定包装器。

TensorDictPrioritizedReplayBuffer(*, alpha, beta)

围绕 PrioritizedReplayBuffer 类的 TensorDict 特定包装器。

可组合的回放缓冲区

我们还为用户提供了组合回放缓冲区的功能。我们为回放缓冲区的使用提供了广泛的解决方案,包括支持几乎任何数据类型;内存、设备或物理内存中的存储;多种采样策略;转换的使用等等。

支持的数据类型和选择存储

理论上,回放缓冲区支持任何数据类型,但我们不能保证每个组件都支持任何数据类型。最原始的回放缓冲区实现是由 ReplayBuffer 基础类和 ListStorage 存储组成的。这非常低效,但它允许您存储具有非张量数据的复杂数据结构。连续内存中的存储包括 TensorStorageLazyTensorStorageLazyMemmapStorage。这些类支持 TensorDict 数据作为一等公民,但也支持任何 PyTree 数据结构(例如,元组、列表、字典以及这些的嵌套版本)。TensorStorage 存储要求您在构造时提供存储,而 TensorStorage (RAM, CUDA) 和 LazyMemmapStorage (物理内存) 将在首次扩展后为您预分配存储。

这里有一些示例,从通用的 ListStorage 开始

>>> from torchrl.data.replay_buffers import ReplayBuffer, ListStorage
>>> rb = ReplayBuffer(storage=ListStorage(10))
>>> rb.add("a string!") # first element will be a string
>>> rb.extend([30, None])  # element [1] is an int, [2] is None

写入缓冲区的主要入口点是 add()extend()__setitem__() 也可以使用,在这种情况下,数据将被写入指示的位置,而无需更新缓冲区的长度或游标。当从缓冲区中采样项目并在之后就地更新它们的值时,这可能很有用。

使用 TensorStorage 我们告诉我们的 RB,我们希望存储是连续的,这效率更高,但也更具限制性

>>> import torch
>>> from torchrl.data.replay_buffers import ReplayBuffer, TensorStorage
>>> container = torch.empty(10, 3, 64, 64, dtype=torch.unit8)
>>> rb = ReplayBuffer(storage=TensorStorage(container))
>>> img = torch.randint(255, (3, 64, 64), dtype=torch.uint8)
>>> rb.add(img)

接下来,我们可以避免创建容器,并要求存储自动执行此操作。当使用 PyTrees 和 tensordicts 时,这非常有用!对于 PyTrees 和其他数据结构,add() 将传递给它的采样视为该类型的单个实例。extend() 另一方面,将认为数据是可迭代的。对于张量、tensordicts 和列表(见下文),将在根级别查找可迭代对象。对于 PyTrees,我们假设树中所有叶子(张量)的前导维度匹配。如果它们不匹配,extend 将抛出异常。

>>> import torch
>>> from tensordict import TensorDict
>>> from torchrl.data.replay_buffers import ReplayBuffer, LazyMemmapStorage
>>> rb_td = ReplayBuffer(storage=LazyMemmapStorage(10), batch_size=1)  # max 10 elements stored
>>> rb_td.add(TensorDict({"img": torch.randint(255, (3, 64, 64), dtype=torch.unit8),
...     "labels": torch.randint(100, ())}, batch_size=[]))
>>> rb_pytree = ReplayBuffer(storage=LazyMemmapStorage(10))  # max 10 elements stored
>>> # extend with a PyTree where all tensors have the same leading dim (3)
>>> rb_pytree.extend({"a": {"b": torch.randn(3), "c": [torch.zeros(3, 2), (torch.ones(3, 10),)]}})
>>> assert len(rb_pytree) == 3  # the replay buffer has 3 elements!

注意

当处理值列表时,extend() 可能具有模糊的签名,这应该被解释为 PyTree(在这种情况下,列表中的所有元素都将放在存储在存储中的 PyTree 中的切片中)或一次添加一个值列表。为了解决这个问题,TorchRL 在列表和元组之间进行了明确的区分:元组将被视为 PyTree,列表(在根级别)将被解释为一次添加到缓冲区的值堆栈。

采样和索引

回放缓冲区可以被索引和采样。索引和采样在存储中给定索引处收集数据,然后通过一系列转换和 collate_fn 处理它们,这些转换和 collate_fn 可以传递给回放缓冲区的 __init__ 函数。collate_fn 带有默认值,这些默认值应在大多数情况下符合用户期望,因此您不必在大多数时候担心它。转换通常是 Transform 的实例,即使常规函数也有效(在后一种情况下,inv() 方法显然将被忽略,而在第一种情况下,它可以用于在数据传递到缓冲区之前对其进行预处理)。最后,可以通过将线程数通过 prefetch 关键字参数传递给构造函数来使用多线程实现采样。我们建议用户在采用此技术之前在实际环境中对其进行基准测试,因为不能保证它会在实践中带来更快的吞吐量,具体取决于机器和使用它的环境。

采样时,batch_size 可以是在构造期间传递(例如,如果它在整个训练过程中是恒定的)或传递给 sample() 方法。

为了进一步完善采样策略,我们建议您查看我们的采样器!

这里有一些关于如何从回放缓冲区获取数据的示例

>>> first_elt = rb_td[0]
>>> storage = rb_td[:] # returns all valid elements from the buffer
>>> sample = rb_td.sample(128)
>>> for data in rb_td:  # iterate over the buffer using the sampler -- batch-size was set in the constructor to 1
...     print(data)

使用以下组件

FlatStorageCheckpointer([done_keys, reward_keys])

以紧凑的形式保存存储,节省 TED 格式的空间。

H5StorageCheckpointer(*[, checkpoint_file, ...])

以紧凑的形式保存存储,节省 TED 格式的空间,并使用 H5 格式保存数据。

ImmutableDatasetWriter()

不可变数据集的阻塞写入器。

LazyMemmapStorage(max_size, *[, ...])

张量和 tensordicts 的内存映射存储。

LazyTensorStorage(max_size, *[, device, ndim])

张量和 tensordicts 的预分配张量存储。

ListStorage([max_size])

存储在列表中的存储。

ListStorageCheckpointer()

ListStorage 的存储检查点。

NestedStorageCheckpointer([done_keys, ...])

以紧凑的形式保存存储,节省 TED 格式的空间,并使用内存映射的嵌套张量。

PrioritizedSampler(max_capacity, alpha, beta)

优先级采样器,用于回放缓冲区。

PrioritizedSliceSampler(max_capacity, alpha, ...)

使用优先级采样,沿第一维度采样数据切片,给定起始和停止信号。

RandomSampler()

用于可组合回放缓冲区的均匀随机采样器。

RoundRobinWriter(**kw)

用于可组合回放缓冲区的 RoundRobin 写入器类。

Sampler()

用于可组合回放缓冲区的通用采样器基类。

SamplerWithoutReplacement([drop_last, shuffle])

一种数据消耗型采样器,确保同一样本不会连续出现在批次中。

SliceSampler(*[, num_slices, slice_len, ...])

沿第一维度采样数据切片,给定起始和停止信号。

SliceSamplerWithoutReplacement(*[, ...])

沿第一维度采样数据切片,给定起始和停止信号,不进行替换。

Storage(max_size[, checkpointer])

Storage 是回放缓冲区的容器。

StorageCheckpointerBase()

存储检查点的公共基类。

StorageEnsembleCheckpointer()

用于集成存储的检查点。

TensorDictMaxValueWriter([rank_key, reduction])

用于可组合回放缓冲区的写入器类,根据某些排名键保留顶部元素。

TensorDictRoundRobinWriter(**kw)

用于可组合的、基于 tensordict 的回放缓冲区的 RoundRobin 写入器类。

TensorStorage(storage[, max_size, device, ndim])

用于张量和 tensordict 的存储。

TensorStorageCheckpointer()

用于 TensorStorage 的存储检查点。

Writer()

回放缓冲区基类写入器。

存储选择对回放缓冲区采样延迟影响很大,尤其是在具有较大数据量的分布式强化学习设置中。由于 MemoryMappedTensors 的较低序列化成本以及为改进节点故障恢复而指定文件存储位置的能力,强烈建议在具有共享存储的分布式设置中使用 LazyMemmapStorage 。在 https://github.com/pytorch/rl/tree/main/benchmarks/storage 的粗略基准测试中,发现与使用 ListStorage 相比,以下平均采样延迟有所改进。

存储类型

加速

ListStorage

1 倍

LazyTensorStorage

1.83 倍

LazyMemmapStorage

3.44 倍

跨进程共享回放缓冲区

只要回放缓冲区的组件是可共享的,就可以在进程之间共享回放缓冲区。此功能允许多个进程收集数据并协作填充共享回放缓冲区,而不是将数据集中在主进程上,这可能会产生一些数据传输开销。

可共享存储包括 LazyMemmapStorageTensorStorage 的任何子类,只要它们被实例化并且它们的内容存储为内存映射张量。状态写入器(如 TensorDictRoundRobinWriter )目前不可共享,状态采样器(如 PrioritizedSampler )也是如此。

共享回放缓冲区可以在任何有权访问它的进程上读取和扩展,如下例所示

>>> from torchrl.data import TensorDictReplayBuffer, LazyMemmapStorage
>>> import torch
>>> from torch import multiprocessing as mp
>>> from tensordict import TensorDict
>>>
>>> def worker(rb):
...     # Updates the replay buffer with new data
...     td = TensorDict({"a": torch.ones(10)}, [10])
...     rb.extend(td)
...
>>> if __name__ == "__main__":
...     rb = TensorDictReplayBuffer(storage=LazyMemmapStorage(21))
...     td = TensorDict({"a": torch.zeros(10)}, [10])
...     rb.extend(td)
...
...     proc = mp.Process(target=worker, args=(rb,))
...     proc.start()
...     proc.join()
...     # the replay buffer now has a length of 20, since the worker updated it
...     assert len(rb) == 20
...     assert (rb["_data", "a"][:10] == 0).all()  # data from main process
...     assert (rb["_data", "a"][10:20] == 1).all()  # data from remote process

存储轨迹

在回放缓冲区中存储轨迹并不太困难。需要注意的一个要素是,回放缓冲区的大小默认是存储前导维度的大小:换句话说,当存储多维数据时,创建大小为 1M 的存储的回放缓冲区并不意味着存储 1M 帧,而是 1M 条轨迹。但是,如果在存储之前将轨迹(或 episode/rollout)展平,则容量仍为 1M 步。

有一种方法可以绕过这个问题,即告诉存储在保存数据时应考虑多少个维度。这可以通过所有连续存储(如 TensorStorage 及其类似物)接受的 ndim 关键字参数来完成。当将多维存储传递给缓冲区时,缓冲区将自动将最后一个维度视为“时间”维度,这在 TorchRL 中是惯例。这可以通过 ReplayBuffer 中的 dim_extend 关键字参数来覆盖。这是保存通过 ParallelEnv 或其串行对应物获得的轨迹的推荐方法,我们将在下面看到。

当采样轨迹时,可能希望采样子轨迹以使学习多样化或使采样更有效率。TorchRL 提供了两种不同的方法来实现此目的

  • SliceSampler 允许采样沿 TensorStorage 的前导维度首尾相连存储的轨迹的给定数量的切片。当使用离线数据集(使用该约定存储)时,这是在 TorchRL 中采样子轨迹的 __尤其__ 推荐的方法。此策略需要在扩展回放缓冲区之前展平轨迹,并在采样后重塑它们。SliceSampler 类文档字符串给出了关于此存储和采样策略的详细信息。请注意,SliceSampler 与多维存储兼容。以下示例展示了如何在 tensordict 展平和不展平的情况下使用此功能。在第一种情况下,我们正在从单个环境中收集数据。在这种情况下,我们很乐意使用一个存储,该存储将传入的数据沿第一维度连接起来,因为收集计划不会引入中断

    >>> from torchrl.envs import TransformedEnv, StepCounter, GymEnv
    >>> from torchrl.collectors import SyncDataCollector, RandomPolicy
    >>> from torchrl.data import ReplayBuffer, LazyTensorStorage, SliceSampler
    >>> env = TransformedEnv(GymEnv("CartPole-v1"), StepCounter())
    >>> collector = SyncDataCollector(env,
    ...     RandomPolicy(env.action_spec),
    ...     frames_per_batch=10, total_frames=-1)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(100),
    ...     sampler=SliceSampler(num_slices=8, traj_key=("collector", "traj_ids"),
    ...         truncated_key=None, strict_length=False),
    ...     batch_size=64)
    >>> for i, data in enumerate(collector):
    ...     rb.extend(data)
    ...     if i == 10:
    ...         break
    >>> assert len(rb) == 100, len(rb)
    >>> print(rb[:]["next", "step_count"])
    tensor([[32],
            [33],
            [34],
            [35],
            [36],
            [37],
            [38],
            [39],
            [40],
            [41],
            [11],
            [12],
            [13],
            [14],
            [15],
            [16],
            [17],
            [...
    

    如果在一个批次中运行多个环境,我们仍然可以通过调用 data.reshape(-1) 将数据存储在与之前相同的缓冲区中,这将把 [B, T] 大小展平为 [B * T],但这意味着批次中第一个环境的轨迹将被其他环境的轨迹交错,这是 SliceSampler 无法处理的情况。为了解决这个问题,我们建议在存储构造函数中使用 ndim 参数

    >>> env = TransformedEnv(SerialEnv(2,
    ...     lambda: GymEnv("CartPole-v1")), StepCounter())
    >>> collector = SyncDataCollector(env,
    ...     RandomPolicy(env.action_spec),
    ...     frames_per_batch=1, total_frames=-1)
    >>> rb = ReplayBuffer(
    ...     storage=LazyTensorStorage(100, ndim=2),
    ...     sampler=SliceSampler(num_slices=8, traj_key=("collector", "traj_ids"),
    ...         truncated_key=None, strict_length=False),
    ...     batch_size=64)
    >>> for i, data in enumerate(collector):
    ...     rb.extend(data)
    ...     if i == 100:
    ...         break
    >>> assert len(rb) == 100, len(rb)
    >>> print(rb[:]["next", "step_count"].squeeze())
    tensor([[ 6,  5],
            [ 2,  2],
            [ 3,  3],
            [ 4,  4],
            [ 5,  5],
            [ 6,  6],
            [ 7,  7],
            [ 8,  8],
            [ 9,  9],
            [10, 10],
            [11, 11],
            [12, 12],
            [13, 13],
            [14, 14],
            [15, 15],
            [16, 16],
            [17, 17],
            [18,  1],
            [19,  2],
            [...
    
  • 轨迹也可以独立存储,前导维度的每个元素都指向不同的轨迹。这要求轨迹具有一致的形状(或被填充)。我们提供了一个名为 RandomCropTensorDict 的自定义 Transform 类,允许在缓冲区中采样子轨迹。请注意,与基于 SliceSampler 的策略不同,这里不需要 "episode""done" 键指向起始和停止信号。以下是如何使用此类的一个示例

回放缓冲区检查点

回放缓冲区的每个组件都可能是状态性的,因此需要一种专用的序列化方式。我们的回放缓冲区具有两个独立的 API,用于将其状态保存在磁盘上: dumps()loads() 将使用内存映射张量和 json 文件保存每个组件(存储、写入器、采样器)的数据,但转换除外。

这将适用于所有类,除了 ListStorage ,其内容无法预测(因此不符合内存映射数据结构,例如 tensordict 库中可以找到的数据结构)。

此 API 保证保存然后重新加载的缓冲区将处于完全相同的状态,无论我们查看其采样器(例如,优先级树)、其写入器(例如,最大写入器堆)还是其存储的状态。

在底层,对 dumps() 的简单调用将仅在其每个组件的特定文件夹中调用公共 dumps 方法(转换除外,我们通常不假设可以使用内存映射张量对其进行序列化)。

但是,以 TED 格式 保存数据可能会消耗比所需更多的内存。如果连续轨迹存储在缓冲区中,我们可以通过将所有观察结果保存在根目录中,并仅保存 “next” 子 tensordict 的观察结果的最后一个元素来避免保存重复的观察结果,这可以将存储消耗减少多达两倍。为了启用此功能,提供了三个检查点类: FlatStorageCheckpointer 将丢弃重复的观察结果以压缩 TED 格式。在加载时,此类将以正确的格式重新写入观察结果。如果缓冲区保存在磁盘上,则此检查点程序执行的操作将不需要任何额外的 RAM。 NestedStorageCheckpointer 将使用嵌套张量保存轨迹,以使数据表示更明显(沿第一维度的每个项目代表一个不同的轨迹)。最后, H5StorageCheckpointer 将以 H5DB 格式保存缓冲区,使用户能够压缩数据并节省更多空间。

警告

检查点程序对回放缓冲区做出了一些限制性假设。首先,假设 done 状态准确地表示轨迹的结束(除了写入的最后一个轨迹,写入器光标指示放置截断信号的位置)。对于 MARL 用法,应注意,只允许具有与根 tensordict 元素数量相同的 done 状态:如果 done 状态具有存储批次大小中未表示的额外元素,则这些检查点程序将失败。例如,在形状为 torch.Size([3, 4]) 的存储中,形状为 torch.Size([3, 4, 5]) 的 done 状态是不允许的。

这是一个如何在实践中使用 H5DB 检查点程序的具体示例

>>> from torchrl.data import ReplayBuffer, H5StorageCheckpointer, LazyMemmapStorage
>>> from torchrl.collectors import SyncDataCollector
>>> from torchrl.envs import GymEnv, SerialEnv
>>> import torch
>>> env = SerialEnv(3, lambda: GymEnv("CartPole-v1", device=None))
>>> env.set_seed(0)
>>> torch.manual_seed(0)
>>> collector = SyncDataCollector(
>>>     env, policy=env.rand_step, total_frames=200, frames_per_batch=22
>>> )
>>> rb = ReplayBuffer(storage=LazyMemmapStorage(100, ndim=2))
>>> rb_test = ReplayBuffer(storage=LazyMemmapStorage(100, ndim=2))
>>> rb.storage.checkpointer = H5StorageCheckpointer()
>>> rb_test.storage.checkpointer = H5StorageCheckpointer()
>>> for i, data in enumerate(collector):
...     rb.extend(data)
...     assert rb._storage.max_size == 102
...     rb.dumps(path_to_save_dir)
...     rb_test.loads(path_to_save_dir)
...     assert_allclose_td(rb_test[:], rb[:])

当使用 dumps() 无法保存数据时,另一种方法是使用 state_dict(),它返回一个可以使用 torch.save() 保存和使用 torch.load() 加载的数据结构,然后再调用 load_state_dict()。这种方法的缺点是它难以保存大型数据结构,这在使用回放缓冲区时是一种常见的情况。

TorchRL Episode Data Format (TED)

在 TorchRL 中,序列数据始终以特定的格式呈现,称为 TorchRL Episode Data Format (TED)。这种格式对于 TorchRL 中各种组件的无缝集成和运行至关重要。

某些组件,例如回放缓冲区,对数据格式有些不敏感。然而,其他组件,特别是环境,在很大程度上依赖于它来实现平稳运行。

因此,理解 TED、其目的以及如何与之交互至关重要。本指南将清晰地解释 TED、其用途以及如何有效地使用它。

TED 背后的原理

格式化序列数据可能是一项复杂的任务,尤其是在强化学习 (RL) 领域。作为从业者,我们经常遇到在重置时(虽然不总是)传递数据的情况,有时数据在轨迹的最后一步提供或丢弃。

这种可变性意味着我们可以在数据集中观察到不同长度的数据,并且并不总是立即清楚如何匹配此数据集中各个元素的每个时间步。考虑以下不明确的数据集结构

>>> observation.shape
[200, 3]
>>> action.shape
[199, 4]
>>> info.shape
[200, 3]

乍一看,信息和观测似乎是一起传递的(在重置时各一个 + 在每次 step 调用时各一个),正如动作比元素少一个所暗示的那样。然而,如果信息比元素少一个,我们必须假设它要么在重置时被省略,要么在轨迹的最后一步没有传递或记录。如果没有关于数据结构的适当文档,就不可能确定哪个信息对应于哪个时间步。

更复杂的是,某些数据集提供不一致的数据格式,其中 observationsinfos 在 rollout 的开始或结束时丢失,并且这种行为通常没有文档记录。TED 的主要目的是通过提供清晰且一致的数据表示来消除这些歧义。

TED 的结构

TED 构建于 RL 上下文中马尔可夫决策过程 (MDP) 的规范定义之上。在每个步骤中,观测会条件化一个动作,该动作导致 (1) 一个新的观测,(2) 任务完成的指示器(terminated、truncated、done),以及 (3) 一个奖励信号。

某些元素可能缺失(例如,在模仿学习上下文中奖励是可选的),或者额外的信息可以通过状态或信息容器传递。在某些情况下,需要额外的信息才能在调用 step 期间获得观测(例如,在无状态环境模拟器中)。此外,在某些情况下,“动作”(或任何其他数据)不能表示为单个张量,需要以不同的方式组织。例如,在多智能体 RL 设置中,动作、观测、奖励和完成信号可能是复合的。

TED 以单一、统一且明确的格式适应所有这些场景。我们通过在执行动作时设置一个限制来区分时间步 tt+1 之间发生的事情。换句话说,在调用 env.step 之前存在的所有内容都属于 t,之后出现的所有内容都属于 t+1

一般规则是,属于时间步 t 的所有内容都存储在 tensordict 的根目录中,而属于 t+1 的所有内容都存储在 tensordict 的 "next" 条目中。这是一个例子

>>> data = env.reset()
>>> data = policy(data)
>>> print(env.step(data))
TensorDict(
    fields={
        action: Tensor(...),  # The action taken at time t
        done: Tensor(...),  # The done state when the action was taken (at reset)
        next: TensorDict(  # all of this content comes from the call to `step`
            fields={
                done: Tensor(...),  # The done state after the action has been taken
                observation: Tensor(...),  # The observation resulting from the action
                reward: Tensor(...),  # The reward resulting from the action
                terminated: Tensor(...),  # The terminated state after the action has been taken
                truncated: Tensor(...),  # The truncated state after the action has been taken
            batch_size=torch.Size([]),
            device=cpu,
            is_shared=False),
        observation: Tensor(...),  # the observation at reset
        terminated: Tensor(...),  # the terminated at reset
        truncated: Tensor(...),  # the truncated at reset
    batch_size=torch.Size([]),
    device=cpu,
    is_shared=False)

在 rollout 期间(使用 EnvBaseSyncDataCollector),当 agent 重置其步数计数时,"next" tensordict 的内容通过 step_mdp() 函数带到根目录:t <- t+1。您可以在 此处 阅读更多关于环境 API 的信息。

在大多数情况下,根目录下没有 True 值的 "done" 状态,因为任何 done 状态都会触发(部分)重置,这将把 "done" 变为 False。然而,这仅在自动执行重置时才成立。在某些情况下,部分重置不会触发重置,因此我们保留这些数据,与观测相比,这些数据应该具有明显更低的内存占用。

这种格式消除了关于观测与其动作、信息或 done 状态匹配的任何歧义。

展平 TED 以减少内存消耗

TED 在内存中复制观测两次,这可能会影响在实践中使用此格式的可行性。由于它主要用于方便表示,因此可以将数据以扁平方式存储,但在训练期间将其表示为 TED。

这在序列化回放缓冲区时特别有用:例如,TED2Flat 类确保在将 TED 格式的数据结构写入磁盘之前将其展平,而 Flat2TED 加载钩子将在反序列化期间取消展平此结构。

Tensordict 的维度

在 rollout 期间,所有收集的 tensordict 将沿末尾的新维度堆叠。收集器和环境都将使用 "time" 名称标记此维度。这是一个例子

>>> rollout = env.rollout(10, policy)
>>> assert rollout.shape[-1] == 10
>>> assert rollout.names[-1] == "time"

这确保了时间维度被清晰地标记并在数据结构中易于识别。

特殊情况和脚注

多智能体数据表示

多智能体数据格式化文档可以在 MARL 环境 API 部分访问。

基于内存的策略(RNN 和 Transformer)

在上面提供的示例中,只有 env.step(data) 生成需要在下一步读取的数据。然而,在某些情况下,策略也会输出下一步需要的信息。对于基于 RNN 的策略来说,通常是这种情况,它输出一个动作以及一个需要在下一步中使用的循环状态。为了适应这一点,我们建议用户调整其 RNN 策略,将此数据写入 tensordict 的 "next" 条目下。这确保了此内容将在下一步中被带到根目录。更多信息可以在 GRUModuleLSTMModule 中找到。

多步

收集器允许用户在读取数据时跳过步骤,累积即将到来的 n 步的奖励。这项技术在像 Rainbow 这样的 DQN 类算法中很流行。MultiStep 类对来自收集器的批次执行此数据转换。在这些情况下,以下检查将失败,因为下一个观测值偏移了 n 步

>>> assert (data[..., 1:]["observation"] == data[..., :-1]["next", "observation"]).all()

内存需求如何?

以朴素的方式实现,这种数据格式消耗的内存大约是扁平表示的两倍。在某些内存密集型设置中(例如,在 AtariDQNExperienceReplay 数据集中),我们仅在磁盘上存储 T+1 观测值,并在获取时在线执行格式化。在其他情况下,我们假设 2 倍的内存成本是为了更清晰的表示而付出的少量代价。然而,推广离线数据集的惰性表示肯定会是一个有益的功能,我们欢迎在这方面的贡献!

数据集

TorchRL 提供了离线 RL 数据集的包装器。这些数据以 ReplayBuffer 实例的形式呈现,这意味着它们可以随意使用 transforms、samplers 和 storages 进行自定义。例如,可以使用 SelectTransformExcludeTransform 将条目过滤到数据集内或外。

默认情况下,数据集存储为内存映射张量,使其能够以几乎零内存占用快速采样。

这是一个例子

注意

安装依赖项是用户的责任。对于 D4RL,需要克隆 该存储库,因为最新的 wheels 没有发布在 PyPI 上。对于 OpenML,需要 scikit-learnpandas

转换数据集

在许多情况下,原始数据不会按原样使用。自然的解决方案可能是将 Transform 实例传递给数据集构造函数并在运行时修改样本。这将有效,但会为转换增加额外的运行时。如果转换可以(至少部分)预先应用于数据集,则可以节省相当大的磁盘空间和一些采样时产生的开销。为此,可以使用 preprocess()。此方法将对数据集的每个元素运行每个样本的预处理管道,并将现有数据集替换为其转换后的版本。

一旦转换,重新创建相同的数据集将产生另一个具有相同转换后存储的对象(除非使用 download="force"

>>> dataset = RobosetExperienceReplay(
...     "FK1-v4(expert)/FK1_MicroOpenRandom_v2d-v4", batch_size=32, download="force"
... )
>>>
>>> def func(data):
...     return data.set("obs_norm", data.get("observation").norm(dim=-1))
...
>>> dataset.preprocess(
...     func,
...     num_workers=max(1, os.cpu_count() - 2),
...     num_chunks=1000,
...     mp_start_method="fork",
... )
>>> sample = dataset.sample()
>>> assert "obs_norm" in sample.keys()
>>> # re-recreating the dataset gives us the transformed version back.
>>> dataset = RobosetExperienceReplay(
...     "FK1-v4(expert)/FK1_MicroOpenRandom_v2d-v4", batch_size=32
... )
>>> sample = dataset.sample()
>>> assert "obs_norm" in sample.keys()

BaseDatasetExperienceReplay(*[, priority_key])

离线数据集的父类。

AtariDQNExperienceReplay(dataset_id[, ...])

Atari DQN 经验回放类。

D4RLExperienceReplay(dataset_id, batch_size)

D4RL 的经验回放类。

GenDGRLExperienceReplay(dataset_id[, ...])

Gen-DGRL 经验回放数据集。

MinariExperienceReplay(dataset_id, batch_size, *)

Minari 经验回放数据集。

OpenMLExperienceReplay(name, batch_size[, ...])

OpenML 数据的经验回放。

OpenXExperienceReplay(dataset_id[, ...])

Open X-Embodiment 数据集经验回放。

RobosetExperienceReplay(dataset_id, ...[, ...])

Roboset 经验回放数据集。

VD4RLExperienceReplay(dataset_id, batch_size, *)

V-D4RL 经验回放数据集。

组合数据集

在离线 RL 中,通常同时处理多个数据集。此外,TorchRL 通常具有细粒度的数据集命名法,其中每个任务都单独表示,而其他库将以更紧凑的方式表示这些数据集。为了允许用户将多个数据集组合在一起,我们提出了一个 ReplayBufferEnsemble 原语,允许用户一次从多个数据集中采样。

如果各个数据集格式不同,则可以使用 Transform 实例。在以下示例中,我们创建了两个虚拟数据集,它们具有语义上相同的条目,但在名称(("some", "key")"another_key")上有所不同,并展示了如何重命名它们以使其具有匹配的名称。我们还调整了图像大小,以便在采样期间可以将它们堆叠在一起。

>>> from torchrl.envs import Comopse, ToTensorImage, Resize, RenameTransform
>>> from torchrl.data import TensorDictReplayBuffer, ReplayBufferEnsemble, LazyMemmapStorage
>>> from tensordict import TensorDict
>>> import torch
>>> rb0 = TensorDictReplayBuffer(
...     storage=LazyMemmapStorage(10),
...     transform=Compose(
...         ToTensorImage(in_keys=["pixels", ("next", "pixels")]),
...         Resize(32, in_keys=["pixels", ("next", "pixels")]),
...         RenameTransform([("some", "key")], ["renamed"]),
...     ),
... )
>>> rb1 = TensorDictReplayBuffer(
...     storage=LazyMemmapStorage(10),
...     transform=Compose(
...         ToTensorImage(in_keys=["pixels", ("next", "pixels")]),
...         Resize(32, in_keys=["pixels", ("next", "pixels")]),
...         RenameTransform(["another_key"], ["renamed"]),
...     ),
... )
>>> rb = ReplayBufferEnsemble(
...     rb0,
...     rb1,
...     p=[0.5, 0.5],
...     transform=Resize(33, in_keys=["pixels"], out_keys=["pixels33"]),
... )
>>> data0 = TensorDict(
...     {
...         "pixels": torch.randint(255, (10, 244, 244, 3)),
...         ("next", "pixels"): torch.randint(255, (10, 244, 244, 3)),
...         ("some", "key"): torch.randn(10),
...     },
...     batch_size=[10],
... )
>>> data1 = TensorDict(
...     {
...         "pixels": torch.randint(255, (10, 64, 64, 3)),
...         ("next", "pixels"): torch.randint(255, (10, 64, 64, 3)),
...         "another_key": torch.randn(10),
...     },
...     batch_size=[10],
... )
>>> rb[0].extend(data0)
>>> rb[1].extend(data1)
>>> for _ in range(2):
...     sample = rb.sample(10)
...     assert sample["next", "pixels"].shape == torch.Size([2, 5, 3, 32, 32])
...     assert sample["pixels"].shape == torch.Size([2, 5, 3, 32, 32])
...     assert sample["pixels33"].shape == torch.Size([2, 5, 3, 33, 33])
...     assert sample["renamed"].shape == torch.Size([2, 5])

ReplayBufferEnsemble(*rbs[, storages, ...])

回放缓冲区的集成。

SamplerEnsemble(*samplers[, p, ...])

采样器的集成。

StorageEnsemble(*storages[, transforms])

存储器的集成。

WriterEnsemble(*writers)

写入器的集成。

TensorSpec

TensorSpec 父类和子类定义了 TorchRL 中状态、观测、动作、奖励和完成状态的基本属性,例如它们的形状、设备、dtype 和域。

重要的是,您的环境规范与它发送和接收的输入和输出相匹配,因为 ParallelEnv 将从这些规范创建缓冲区,以便与 spawn 进程通信。请查看 torchrl.envs.utils.check_env_specs() 方法进行健全性检查。

如果需要,可以使用 make_composite_from_td() 函数从数据中自动生成规范。

规范分为两个主要类别:数值型和类别型。

数值型 TensorSpec 子类。

数值型

有界

无界

有界离散

有界连续

无界离散

无界连续

每当创建 Bounded 实例时,它的域(由其 dtype 隐式定义或由 “domain” 关键字参数显式定义)将确定实例化的类是 BoundedContinuous 类型还是 BoundedDiscrete 类型。这同样适用于 Unbounded 类。有关更多信息,请参阅这些类。

类别型 TensorSpec 子类。

类别型

OneHot

MultiOneHot

类别型

MultiCategorical

Binary

gymnasium 不同,TorchRL 没有任意规范列表的概念。如果必须将多个规范组合在一起,TorchRL 假定数据将以字典形式呈现(更具体地说,以 TensorDict 或相关格式)。在这些情况下,相应的 TensorSpec 类是 Composite 规范。

然而,可以使用 stack() 将规范堆叠在一起:如果它们相同,则它们的形状将相应地扩展。否则,将通过 Stacked 类创建一个惰性堆叠。

类似地,TensorSpecs 具有与 TensorTensorDict 相同的行为:它们可以像常规 Tensor 实例一样,被重塑形状、索引、压缩、解压缩、移动到另一个设备(to)或解绑(unbind)。

某些维度为 -1 的规范被称为“动态”规范,负维度表示相应的数据具有不一致的形状。当优化器或环境(例如,批量环境,如 ParallelEnv)看到这些负形状时,它们会告诉 TorchRL 避免使用缓冲区,因为张量形状是不可预测的。

TensorSpec(shape, space, ...)

张量元数据容器的父类。

Binary([n, shape, device, dtype])

二进制离散张量规范。

Bounded(*args, **kwargs)

有界张量规范。

Categorical(n[, shape, device, dtype, mask])

离散张量规范。

Composite(*args, **kwargs)

TensorSpec 的组合。

MultiCategorical(nvec[, shape, device, ...])

离散张量规范的串联。

MultiOneHot(nvec[, shape, device, dtype, ...])

one-hot 离散张量规范的串联。

NonTensor([shape, device, dtype])

用于非张量数据的规范。

Stacked(*specs, dim)

张量规范堆叠的惰性表示。

StackedComposite(*args, **kwargs)

复合规范堆叠的惰性表示。

Unbounded(*args, **kwargs)

无界张量规范。

UnboundedContinuous(*args, **kwargs)

torchrl.data.Unbounded 的专门版本,具有连续空间。

UnboundedDiscrete(*args, **kwargs)

torchrl.data.Unbounded 的专门版本,具有离散空间。

以下类已弃用,仅指向上述类

BinaryDiscreteTensorSpec(*args, **kwargs)

torchrl.data.Binary 的已弃用版本。

BoundedTensorSpec(*args, **kwargs)

torchrl.data.Bounded 的已弃用版本。

CompositeSpec(*args, **kwargs)

torchrl.data.Composite 的已弃用版本。

DiscreteTensorSpec(*args, **kwargs)

torchrl.data.Categorical 的已弃用版本。

LazyStackedCompositeSpec(*args, **kwargs)

torchrl.data.StackedComposite 的已弃用版本。

LazyStackedTensorSpec(*args, **kwargs)

torchrl.data.Stacked 的已弃用版本。

MultiDiscreteTensorSpec(*args, **kwargs)

torchrl.data.MultiCategorical 的已弃用版本。

MultiOneHotDiscreteTensorSpec(*args, **kwargs)

torchrl.data.MultiOneHot 的已弃用版本。

NonTensorSpec(*args, **kwargs)

torchrl.data.NonTensor 的已弃用版本。

OneHotDiscreteTensorSpec(*args, **kwargs)

torchrl.data.OneHot 的已弃用版本。

UnboundedContinuousTensorSpec(*args, **kwargs)

torchrl.data.Unbounded 的已弃用版本,具有连续空间。

UnboundedDiscreteTensorSpec(*args, **kwargs)

已弃用的 torchrl.data.Unbounded 版本,带有离散空间。

树和森林

TorchRL 提供了一组类和函数,可以用于高效地表示树和森林。

BinaryToDecimal(num_bits, device, dtype[, ...])

一个将二进制编码的张量转换为十进制的模块。

HashToInt()

将哈希值转换为可用于索引连续存储的整数。

QueryModule(*args, **kwargs)

一个用于为存储生成兼容索引的模块。

RandomProjectionHash(*[, n_components, ...])

一个模块,它结合了随机投影和 SipHash,以获得一个低维张量,更容易通过 SipHash 进行嵌入。

SipHash([as_tensor])

一个用于计算给定张量的 SipHash 值的模块。

TensorDictMap(*args, **kwargs)

TensorDict 的 Map-Storage。

TensorMap()

用于实现不同存储的抽象。

从人类反馈中进行强化学习 (RLHF)

数据在从人类反馈中进行强化学习 (RLHF) 中至关重要。鉴于这些技术通常应用于语言领域,而该领域在库中其他 RL 子域中很少涉及,我们提供了特定的实用程序来促进与外部库(如数据集)的交互。这些实用程序包括用于标记化数据、以适合 TorchRL 模块的方式格式化数据以及优化存储以实现高效采样的工具。

PairwiseDataset(chosen_data, rejected_data, ...)

PromptData(input_ids, attention_mask, ...[, ...])

PromptTensorDictTokenizer(tokenizer, max_length)

提示数据集的标记化配方。

RewardData(input_ids, attention_mask[, ...])

RolloutFromModel(model, ref_model, reward_model)

一个使用因果语言模型执行 rollout 的类。

TensorDictTokenizer(tokenizer, max_length[, ...])

用于应用 tokenizer 到文本示例的过程函数的工厂。

TokenizedDatasetLoader(split, max_length, ...)

加载标记化的数据集,并缓存其内存映射副本。

create_infinite_iterator(iterator)

无限期地迭代一个迭代器。

get_dataloader(batch_size, block_size, ...)

创建一个数据集并从中返回一个 dataloader。

ConstantKLController(*[, kl_coef, model])

恒定 KL 控制器。

AdaptiveKLController(*, init_kl_coef, ...[, ...])

自适应 KL 控制器,如 Ziegler 等人在“从人类偏好微调语言模型”中所述。

实用工具

MultiStep(gamma, n_steps)

多步奖励变换。

consolidate_spec(spec[, ...])

给定 TensorSpec,通过添加 0 形 Spec 来移除独占键。

check_no_exclusive_keys(spec[, recurse])

给定 TensorSpec,如果不存在独占键,则返回 true。

contains_lazy_spec(spec)

如果 spec 包含延迟堆叠的 spec,则返回 true。

Nested2TED([done_key, shift_key, ...])

将嵌套的 tensordict(其中每一行都是一个轨迹)转换为 TED 格式。

Flat2TED([done_key, shift_key, is_full_key, ...])

一个存储加载 hook,用于将扁平化的 TED 数据反序列化为 TED 格式。

H5Combine()

将持久性 tensordict 中的轨迹组合成存储在文件系统中的单个常驻 tensordict。

H5Split([done_key, shift_key, is_full_key, ...])

将使用 TED2Nested 准备的数据集拆分为 TensorDict,其中每个轨迹都存储为对其父嵌套张量的视图。

TED2Flat([done_key, shift_key, is_full_key, ...])

一个存储保存 hook,用于以紧凑格式序列化 TED 数据。

TED2Nested(*args, **kwargs)

将 TED 格式的数据集转换为填充了嵌套张量的 tensordict,其中每一行都是一个轨迹。

MultiStepTransform(n_steps, gamma, *[, ...])

ReplayBuffers 的 MultiStep 变换。

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并获得您的问题解答

查看资源