SliceSampler¶
- class torchrl.data.replay_buffers.SliceSampler(*, num_slices: int = None, slice_len: int = None, end_key: NestedKey | None = None, traj_key: NestedKey | None = None, ends: torch.Tensor | None = None, trajectories: torch.Tensor | None = None, cache_values: bool = False, truncated_key: NestedKey | None =('next', 'truncated'), strict_length: bool = True, compile: bool | dict = False, span: bool | int | Tuple[bool | int, bool | int] = False, use_gpu: torch.device | bool = False)[source]¶
根据起始和停止信号,沿着第一个维度采样数据切片。
此类有放回地采样子轨迹。对于无放回采样版本,请参见
SliceSamplerWithoutReplacement
。注意
SliceSampler 检索轨迹索引可能很慢。为了加速其执行,优先使用 end_key 而非 traj_key,并考虑以下关键字参数:
compile
、cache_values
和use_gpu
。- 关键字参数:
num_slices (int) – 要采样的切片数量。批次大小必须大于或等于
num_slices
参数。与slice_len
互斥。slice_len (int) – 要采样的切片长度。批次大小必须大于或等于
slice_len
参数,并且必须能被其整除。与num_slices
互斥。end_key (NestedKey, 可选) – 指示轨迹(或片段)结束的键。默认为
("next", "done")
。traj_key (NestedKey, 可选) – 指示轨迹的键。默认为
"episode"
(在 TorchRL 的数据集中常用)。ends (torch.Tensor, 可选) – 包含运行结束信号的一维布尔张量。在获取
end_key
或traj_key
成本较高或此信号易于获得时使用。必须与cache_values=True
一起使用,且不能与end_key
或traj_key
同时使用。如果提供此参数,则假定存储已满,并且如果ends
张量的最后一个元素为False
,则同一轨迹跨越结束和开始。trajectories (torch.Tensor, 可选) – 包含运行 ID 的一维整数张量。在获取
end_key
或traj_key
成本较高或此信号易于获得时使用。必须与cache_values=True
一起使用,且不能与end_key
或traj_key
同时使用。如果提供此参数,则假定存储已满,并且如果轨迹张量的最后一个元素与第一个元素相同,则同一轨迹跨越结束和开始。cache_values (bool, 可选) –
与静态数据集一起使用。将缓存轨迹的起始和结束信号。即使在调用
extend
期间轨迹索引发生变化,此操作也会擦除缓存,因此可以安全使用。警告
cache_values=True
在采样器与由另一个缓冲区扩展的存储一起使用时将无法正常工作。例如:>>> buffer0 = ReplayBuffer(storage=storage, ... sampler=SliceSampler(num_slices=8, cache_values=True), ... writer=ImmutableWriter()) >>> buffer1 = ReplayBuffer(storage=storage, ... sampler=other_sampler) >>> # Wrong! Does not erase the buffer from the sampler of buffer0 >>> buffer1.extend(data)
警告
如果缓冲区在进程之间共享,且一个进程负责写入,一个进程负责采样,则
cache_values=True
将无法按预期工作,因为擦除缓存只能在本地完成。truncated_key (NestedKey, 可选) – 如果不是
None
,此参数指示应在输出数据中写入截断信号的位置。这用于向价值估计器指示提供的轨迹在哪里中断。默认为("next", "truncated")
。此功能仅适用于TensorDictReplayBuffer
实例(否则截断键在sample()
方法返回的信息字典中返回)。strict_length (bool, 可选) – 如果为
False
,则允许长度短于 slice_len(或 batch_size // num_slices)的轨迹出现在批次中。如果为True
,则会过滤掉短于所需长度的轨迹。请注意,这可能导致实际 batch_size 小于请求的大小!可以使用split_trajectories()
分割轨迹。默认为True
。compile (bool or dict of kwargs, 可选) – 如果为
True
,则sample()
方法的瓶颈将使用compile()
进行编译。关键字参数也可以通过此参数传递给 torch.compile。默认为False
。span (bool, int, Tuple[bool | int, bool | int], 可选) – 如果提供此参数,采样的轨迹将跨越左侧和/或右侧。这意味着提供的元素可能少于所需数量。布尔值表示每个轨迹至少会采样一个元素。整数 i 表示每个采样轨迹至少会收集 slice_len - i 个样本。使用元组可以对左侧(存储轨迹的开头)和右侧(存储轨迹的末尾)的跨越进行细粒度控制。
use_gpu (bool or torch.device) – 如果为
True
(或传递了设备),则将使用加速器来检索轨迹起始的索引。当缓冲区内容较大时,这可以显著加速采样。默认为False
。
注意
为了恢复存储中的轨迹分割,
SliceSampler
将首先尝试在存储中找到traj_key
条目。如果找不到,将使用end_key
重建片段。注意
当使用 strict_length=False 时,建议使用
split_trajectories()
来分割采样到的轨迹。然而,如果来自同一片段的两个样本相邻放置,这可能会产生错误的结果。为了避免这个问题,请考虑以下解决方案之一:将
TensorDictReplayBuffer
实例与切片采样器一起使用>>> import torch >>> from tensordict import TensorDict >>> from torchrl.collectors.utils import split_trajectories >>> from torchrl.data import TensorDictReplayBuffer, ReplayBuffer, LazyTensorStorage, SliceSampler, SliceSamplerWithoutReplacement >>> >>> rb = TensorDictReplayBuffer(storage=LazyTensorStorage(max_size=1000), ... sampler=SliceSampler( ... slice_len=5, traj_key="episode",strict_length=False, ... )) ... >>> ep_1 = TensorDict( ... {"obs": torch.arange(100), ... "episode": torch.zeros(100),}, ... batch_size=[100] ... ) >>> ep_2 = TensorDict( ... {"obs": torch.arange(4), ... "episode": torch.ones(4),}, ... batch_size=[4] ... ) >>> rb.extend(ep_1) >>> rb.extend(ep_2) >>> >>> s = rb.sample(50) >>> print(s) TensorDict( fields={ episode: Tensor(shape=torch.Size([46]), device=cpu, dtype=torch.float32, is_shared=False), index: Tensor(shape=torch.Size([46, 1]), device=cpu, dtype=torch.int64, is_shared=False), next: TensorDict( fields={ done: Tensor(shape=torch.Size([46, 1]), device=cpu, dtype=torch.bool, is_shared=False), terminated: Tensor(shape=torch.Size([46, 1]), device=cpu, dtype=torch.bool, is_shared=False), truncated: Tensor(shape=torch.Size([46, 1]), device=cpu, dtype=torch.bool, is_shared=False)}, batch_size=torch.Size([46]), device=cpu, is_shared=False), obs: Tensor(shape=torch.Size([46]), device=cpu, dtype=torch.int64, is_shared=False)}, batch_size=torch.Size([46]), device=cpu, is_shared=False) >>> t = split_trajectories(s, done_key="truncated") >>> print(t["obs"]) tensor([[73, 74, 75, 76, 77], [ 0, 1, 2, 3, 0], [ 0, 1, 2, 3, 0], [41, 42, 43, 44, 45], [ 0, 1, 2, 3, 0], [67, 68, 69, 70, 71], [27, 28, 29, 30, 31], [80, 81, 82, 83, 84], [17, 18, 19, 20, 21], [ 0, 1, 2, 3, 0]]) >>> print(t["episode"]) tensor([[0., 0., 0., 0., 0.], [1., 1., 1., 1., 0.], [1., 1., 1., 1., 0.], [0., 0., 0., 0., 0.], [1., 1., 1., 1., 0.], [0., 0., 0., 0., 0.], [0., 0., 0., 0., 0.], [0., 0., 0., 0., 0.], [0., 0., 0., 0., 0.], [1., 1., 1., 1., 0.]])
使用
SliceSamplerWithoutReplacement
>>> import torch >>> from tensordict import TensorDict >>> from torchrl.collectors.utils import split_trajectories >>> from torchrl.data import ReplayBuffer, LazyTensorStorage, SliceSampler, SliceSamplerWithoutReplacement >>> >>> rb = ReplayBuffer(storage=LazyTensorStorage(max_size=1000), ... sampler=SliceSamplerWithoutReplacement( ... slice_len=5, traj_key="episode",strict_length=False ... )) ... >>> ep_1 = TensorDict( ... {"obs": torch.arange(100), ... "episode": torch.zeros(100),}, ... batch_size=[100] ... ) >>> ep_2 = TensorDict( ... {"obs": torch.arange(4), ... "episode": torch.ones(4),}, ... batch_size=[4] ... ) >>> rb.extend(ep_1) >>> rb.extend(ep_2) >>> >>> s = rb.sample(50) >>> t = split_trajectories(s, trajectory_key="episode") >>> print(t["obs"]) tensor([[75, 76, 77, 78, 79], [ 0, 1, 2, 3, 0]]) >>> print(t["episode"]) tensor([[0., 0., 0., 0., 0.], [1., 1., 1., 1., 0.]])
示例
>>> import torch >>> from tensordict import TensorDict >>> from torchrl.data.replay_buffers import LazyMemmapStorage, TensorDictReplayBuffer >>> from torchrl.data.replay_buffers.samplers import SliceSampler >>> torch.manual_seed(0) >>> rb = TensorDictReplayBuffer( ... storage=LazyMemmapStorage(1_000_000), ... sampler=SliceSampler(cache_values=True, num_slices=10), ... batch_size=320, ... ) >>> episode = torch.zeros(1000, dtype=torch.int) >>> episode[:300] = 1 >>> episode[300:550] = 2 >>> episode[550:700] = 3 >>> episode[700:] = 4 >>> data = TensorDict( ... { ... "episode": episode, ... "obs": torch.randn((3, 4, 5)).expand(1000, 3, 4, 5), ... "act": torch.randn((20,)).expand(1000, 20), ... "other": torch.randn((20, 50)).expand(1000, 20, 50), ... }, [1000] ... ) >>> rb.extend(data) >>> sample = rb.sample() >>> print("sample:", sample) >>> print("episodes", sample.get("episode").unique()) episodes tensor([1, 2, 3, 4], dtype=torch.int32)
SliceSampler
与大多数 TorchRL 数据集默认兼容示例
>>> import torch >>> >>> from torchrl.data.datasets import RobosetExperienceReplay >>> from torchrl.data import SliceSampler >>> >>> torch.manual_seed(0) >>> num_slices = 10 >>> dataid = list(RobosetExperienceReplay.available_datasets)[0] >>> data = RobosetExperienceReplay(dataid, batch_size=320, sampler=SliceSampler(num_slices=num_slices)) >>> for batch in data: ... batch = batch.reshape(num_slices, -1) ... break >>> print("check that each batch only has one episode:", batch["episode"].unique(dim=1)) check that each batch only has one episode: tensor([[19], [14], [ 8], [10], [13], [ 4], [ 2], [ 3], [22], [ 8]])