快捷方式

SliceSampler

class torchrl.data.replay_buffers.SliceSampler(*, num_slices: int = None, slice_len: int = None, end_key: NestedKey | None = None, traj_key: NestedKey | None = None, ends: torch.Tensor | None = None, trajectories: torch.Tensor | None = None, cache_values: bool = False, truncated_key: NestedKey | None =('next', 'truncated'), strict_length: bool = True, compile: bool | dict = False, span: bool | int | Tuple[bool | int, bool | int] = False, use_gpu: torch.device | bool = False)[source]

根据起始和停止信号,沿着第一个维度采样数据切片。

此类有放回地采样子轨迹。对于无放回采样版本,请参见 SliceSamplerWithoutReplacement

注意

SliceSampler 检索轨迹索引可能很慢。为了加速其执行,优先使用 end_key 而非 traj_key,并考虑以下关键字参数:compilecache_valuesuse_gpu

关键字参数:
  • num_slices (int) – 要采样的切片数量。批次大小必须大于或等于 num_slices 参数。与 slice_len 互斥。

  • slice_len (int) – 要采样的切片长度。批次大小必须大于或等于 slice_len 参数,并且必须能被其整除。与 num_slices 互斥。

  • end_key (NestedKey, 可选) – 指示轨迹(或片段)结束的键。默认为 ("next", "done")

  • traj_key (NestedKey, 可选) – 指示轨迹的键。默认为 "episode"(在 TorchRL 的数据集中常用)。

  • ends (torch.Tensor, 可选) – 包含运行结束信号的一维布尔张量。在获取 end_keytraj_key 成本较高或此信号易于获得时使用。必须与 cache_values=True 一起使用,且不能与 end_keytraj_key 同时使用。如果提供此参数,则假定存储已满,并且如果 ends 张量的最后一个元素为 False,则同一轨迹跨越结束和开始。

  • trajectories (torch.Tensor, 可选) – 包含运行 ID 的一维整数张量。在获取 end_keytraj_key 成本较高或此信号易于获得时使用。必须与 cache_values=True 一起使用,且不能与 end_keytraj_key 同时使用。如果提供此参数,则假定存储已满,并且如果轨迹张量的最后一个元素与第一个元素相同,则同一轨迹跨越结束和开始。

  • cache_values (bool, 可选) –

    与静态数据集一起使用。将缓存轨迹的起始和结束信号。即使在调用 extend 期间轨迹索引发生变化,此操作也会擦除缓存,因此可以安全使用。

    警告

    cache_values=True 在采样器与由另一个缓冲区扩展的存储一起使用时将无法正常工作。例如:

    >>> buffer0 = ReplayBuffer(storage=storage,
    ...     sampler=SliceSampler(num_slices=8, cache_values=True),
    ...     writer=ImmutableWriter())
    >>> buffer1 = ReplayBuffer(storage=storage,
    ...     sampler=other_sampler)
    >>> # Wrong! Does not erase the buffer from the sampler of buffer0
    >>> buffer1.extend(data)
    

    警告

    如果缓冲区在进程之间共享,且一个进程负责写入,一个进程负责采样,则 cache_values=True 将无法按预期工作,因为擦除缓存只能在本地完成。

  • truncated_key (NestedKey, 可选) – 如果不是 None,此参数指示应在输出数据中写入截断信号的位置。这用于向价值估计器指示提供的轨迹在哪里中断。默认为 ("next", "truncated")。此功能仅适用于 TensorDictReplayBuffer 实例(否则截断键在 sample() 方法返回的信息字典中返回)。

  • strict_length (bool, 可选) – 如果为 False,则允许长度短于 slice_len(或 batch_size // num_slices)的轨迹出现在批次中。如果为 True,则会过滤掉短于所需长度的轨迹。请注意,这可能导致实际 batch_size 小于请求的大小!可以使用 split_trajectories() 分割轨迹。默认为 True

  • compile (bool or dict of kwargs, 可选) – 如果为 True,则 sample() 方法的瓶颈将使用 compile() 进行编译。关键字参数也可以通过此参数传递给 torch.compile。默认为 False

  • span (bool, int, Tuple[bool | int, bool | int], 可选) – 如果提供此参数,采样的轨迹将跨越左侧和/或右侧。这意味着提供的元素可能少于所需数量。布尔值表示每个轨迹至少会采样一个元素。整数 i 表示每个采样轨迹至少会收集 slice_len - i 个样本。使用元组可以对左侧(存储轨迹的开头)和右侧(存储轨迹的末尾)的跨越进行细粒度控制。

  • use_gpu (bool or torch.device) – 如果为 True(或传递了设备),则将使用加速器来检索轨迹起始的索引。当缓冲区内容较大时,这可以显著加速采样。默认为 False

注意

为了恢复存储中的轨迹分割,SliceSampler 将首先尝试在存储中找到 traj_key 条目。如果找不到,将使用 end_key 重建片段。

注意

当使用 strict_length=False 时,建议使用 split_trajectories() 来分割采样到的轨迹。然而,如果来自同一片段的两个样本相邻放置,这可能会产生错误的结果。为了避免这个问题,请考虑以下解决方案之一:

  • TensorDictReplayBuffer 实例与切片采样器一起使用

    >>> import torch
    >>> from tensordict import TensorDict
    >>> from torchrl.collectors.utils import split_trajectories
    >>> from torchrl.data import TensorDictReplayBuffer, ReplayBuffer, LazyTensorStorage, SliceSampler, SliceSamplerWithoutReplacement
    >>>
    >>> rb = TensorDictReplayBuffer(storage=LazyTensorStorage(max_size=1000),
    ...                   sampler=SliceSampler(
    ...                       slice_len=5, traj_key="episode",strict_length=False,
    ...                   ))
    ...
    >>> ep_1 = TensorDict(
    ...     {"obs": torch.arange(100),
    ...     "episode": torch.zeros(100),},
    ...     batch_size=[100]
    ... )
    >>> ep_2 = TensorDict(
    ...     {"obs": torch.arange(4),
    ...     "episode": torch.ones(4),},
    ...     batch_size=[4]
    ... )
    >>> rb.extend(ep_1)
    >>> rb.extend(ep_2)
    >>>
    >>> s = rb.sample(50)
    >>> print(s)
    TensorDict(
        fields={
            episode: Tensor(shape=torch.Size([46]), device=cpu, dtype=torch.float32, is_shared=False),
            index: Tensor(shape=torch.Size([46, 1]), device=cpu, dtype=torch.int64, is_shared=False),
            next: TensorDict(
                fields={
                    done: Tensor(shape=torch.Size([46, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                    terminated: Tensor(shape=torch.Size([46, 1]), device=cpu, dtype=torch.bool, is_shared=False),
                    truncated: Tensor(shape=torch.Size([46, 1]), device=cpu, dtype=torch.bool, is_shared=False)},
                batch_size=torch.Size([46]),
                device=cpu,
                is_shared=False),
            obs: Tensor(shape=torch.Size([46]), device=cpu, dtype=torch.int64, is_shared=False)},
        batch_size=torch.Size([46]),
        device=cpu,
        is_shared=False)
    >>> t = split_trajectories(s, done_key="truncated")
    >>> print(t["obs"])
    tensor([[73, 74, 75, 76, 77],
            [ 0,  1,  2,  3,  0],
            [ 0,  1,  2,  3,  0],
            [41, 42, 43, 44, 45],
            [ 0,  1,  2,  3,  0],
            [67, 68, 69, 70, 71],
            [27, 28, 29, 30, 31],
            [80, 81, 82, 83, 84],
            [17, 18, 19, 20, 21],
            [ 0,  1,  2,  3,  0]])
    >>> print(t["episode"])
    tensor([[0., 0., 0., 0., 0.],
            [1., 1., 1., 1., 0.],
            [1., 1., 1., 1., 0.],
            [0., 0., 0., 0., 0.],
            [1., 1., 1., 1., 0.],
            [0., 0., 0., 0., 0.],
            [0., 0., 0., 0., 0.],
            [0., 0., 0., 0., 0.],
            [0., 0., 0., 0., 0.],
            [1., 1., 1., 1., 0.]])
    
  • 使用 SliceSamplerWithoutReplacement

    >>> import torch
    >>> from tensordict import TensorDict
    >>> from torchrl.collectors.utils import split_trajectories
    >>> from torchrl.data import ReplayBuffer, LazyTensorStorage, SliceSampler, SliceSamplerWithoutReplacement
    >>>
    >>> rb = ReplayBuffer(storage=LazyTensorStorage(max_size=1000),
    ...                   sampler=SliceSamplerWithoutReplacement(
    ...                       slice_len=5, traj_key="episode",strict_length=False
    ...                   ))
    ...
    >>> ep_1 = TensorDict(
    ...     {"obs": torch.arange(100),
    ...     "episode": torch.zeros(100),},
    ...     batch_size=[100]
    ... )
    >>> ep_2 = TensorDict(
    ...     {"obs": torch.arange(4),
    ...     "episode": torch.ones(4),},
    ...     batch_size=[4]
    ... )
    >>> rb.extend(ep_1)
    >>> rb.extend(ep_2)
    >>>
    >>> s = rb.sample(50)
    >>> t = split_trajectories(s, trajectory_key="episode")
    >>> print(t["obs"])
    tensor([[75, 76, 77, 78, 79],
            [ 0,  1,  2,  3,  0]])
    >>> print(t["episode"])
    tensor([[0., 0., 0., 0., 0.],
            [1., 1., 1., 1., 0.]])
    

示例

>>> import torch
>>> from tensordict import TensorDict
>>> from torchrl.data.replay_buffers import LazyMemmapStorage, TensorDictReplayBuffer
>>> from torchrl.data.replay_buffers.samplers import SliceSampler
>>> torch.manual_seed(0)
>>> rb = TensorDictReplayBuffer(
...     storage=LazyMemmapStorage(1_000_000),
...     sampler=SliceSampler(cache_values=True, num_slices=10),
...     batch_size=320,
... )
>>> episode = torch.zeros(1000, dtype=torch.int)
>>> episode[:300] = 1
>>> episode[300:550] = 2
>>> episode[550:700] = 3
>>> episode[700:] = 4
>>> data = TensorDict(
...     {
...         "episode": episode,
...         "obs": torch.randn((3, 4, 5)).expand(1000, 3, 4, 5),
...         "act": torch.randn((20,)).expand(1000, 20),
...         "other": torch.randn((20, 50)).expand(1000, 20, 50),
...     }, [1000]
... )
>>> rb.extend(data)
>>> sample = rb.sample()
>>> print("sample:", sample)
>>> print("episodes", sample.get("episode").unique())
episodes tensor([1, 2, 3, 4], dtype=torch.int32)

SliceSampler 与大多数 TorchRL 数据集默认兼容

示例

>>> import torch
>>>
>>> from torchrl.data.datasets import RobosetExperienceReplay
>>> from torchrl.data import SliceSampler
>>>
>>> torch.manual_seed(0)
>>> num_slices = 10
>>> dataid = list(RobosetExperienceReplay.available_datasets)[0]
>>> data = RobosetExperienceReplay(dataid, batch_size=320, sampler=SliceSampler(num_slices=num_slices))
>>> for batch in data:
...     batch = batch.reshape(num_slices, -1)
...     break
>>> print("check that each batch only has one episode:", batch["episode"].unique(dim=1))
check that each batch only has one episode: tensor([[19],
        [14],
        [ 8],
        [10],
        [13],
        [ 4],
        [ 2],
        [ 3],
        [22],
        [ 8]])

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发者的深度教程

查看教程

资源

查找开发资源并解答您的疑问

查看资源