SliceSampler¶
- class torchrl.data.replay_buffers.SliceSampler(*, num_slices: Optional[int] = None, slice_len: Optional[int] = None, end_key: Optional[NestedKey] = None, traj_key: Optional[NestedKey] = None, ends: Optional[Tensor] = None, trajectories: Optional[Tensor] = None, cache_values: bool = False, truncated_key: tensordict._nestedkey.NestedKey | None = ('next', 'truncated'), strict_length: bool = True, compile: bool | dict = False, span: Union[bool, int, Tuple[bool | int, bool | int]] = False)[source]¶
沿第一个维度采样数据切片,给定起始和停止信号。
此类使用替换方式采样子轨迹。对于不替换的版本,请参阅
SliceSamplerWithoutReplacement
。- 关键字参数:
num_slices (int) – 要采样的切片数量。批大小必须大于或等于
num_slices
参数。与slice_len
互斥。slice_len (int) – 要采样的切片长度。批大小必须大于或等于
slice_len
参数,并且可以被其整除。与num_slices
互斥。end_key (NestedKey, optional) – 指示轨迹(或 episode)结束的键。默认为
("next", "done")
。traj_key (NestedKey, optional) – 指示轨迹的键。默认为
"episode"
(在 TorchRL 的数据集中常用)。ends (torch.Tensor, optional) – 包含运行结束信号的 1 维布尔张量。当
end_key
或traj_key
获取成本高昂,或者此信号已准备就绪时使用。必须与cache_values=True
一起使用,并且不能与end_key
或traj_key
结合使用。如果提供,则假定存储已满容量,并且如果ends
张量的最后一个元素为False
,则同一轨迹跨越结束和开始。trajectories (torch.Tensor, optional) – 包含运行 ID 的 1 维整数张量。当
end_key
或traj_key
获取成本高昂,或者此信号已准备就绪时使用。必须与cache_values=True
一起使用,并且不能与end_key
或traj_key
结合使用。如果提供,则假定存储已满容量,并且如果轨迹张量的最后一个元素与第一个元素相同,则同一轨迹跨越结束和开始。cache_values (bool, optional) –
与静态数据集一起使用。将缓存轨迹的起始和结束信号。即使在调用
extend
期间轨迹索引发生更改,也可以安全地使用此选项,因为此操作将擦除缓存。警告
如果采样器与由另一个缓冲区扩展的存储一起使用,则
cache_values=True
将不起作用。例如>>> buffer0 = ReplayBuffer(storage=storage, ... sampler=SliceSampler(num_slices=8, cache_values=True), ... writer=ImmutableWriter()) >>> buffer1 = ReplayBuffer(storage=storage, ... sampler=other_sampler) >>> # Wrong! Does not erase the buffer from the sampler of buffer0 >>> buffer1.extend(data)
警告
如果缓冲区在进程之间共享,并且一个进程负责写入,而另一个进程负责采样,则
cache_values=True
将无法按预期工作,因为擦除缓存只能在本地完成。truncated_key (NestedKey, optional) – 如果不是
None
,则此参数指示截断信号应写入输出数据的位置。这用于向值估计器指示提供的轨迹在哪里中断。默认为("next", "truncated")
。此功能仅适用于TensorDictReplayBuffer
实例(否则截断键将返回到sample()
方法返回的 info 字典中)。strict_length (bool, optional) – 如果为
False
,则允许长度短于 slice_len(或 batch_size // num_slices)的轨迹出现在批次中。如果为True
,则将滤除短于要求的轨迹。请注意,这可能会导致有效 batch_size 短于要求的批大小!可以使用split_trajectories()
拆分轨迹。默认为True
。compile (bool or dict of kwargs, optional) – 如果为
True
,则sample()
方法的瓶颈将使用compile()
进行编译。关键字参数也可以通过此参数传递给 torch.compile。默认为False
。span (bool, int, Tuple[bool | int, bool | int], optional) – 如果提供,则采样的轨迹将跨越左侧和/或右侧。这意味着提供的元素可能少于要求的元素。布尔值表示每个轨迹至少采样一个元素。整数 i 表示对于每个采样的轨迹,至少收集 slice_len - i 个样本。使用元组可以精细控制左侧(存储轨迹的开始)和右侧(存储轨迹的结束)的跨度。
注意
为了恢复存储中的轨迹分割,
SliceSampler
将首先尝试在存储中查找traj_key
条目。如果找不到,将使用end_key
重建 episode。示例
>>> import torch >>> from tensordict import TensorDict >>> from torchrl.data.replay_buffers import LazyMemmapStorage, TensorDictReplayBuffer >>> from torchrl.data.replay_buffers.samplers import SliceSampler >>> torch.manual_seed(0) >>> rb = TensorDictReplayBuffer( ... storage=LazyMemmapStorage(1_000_000), ... sampler=SliceSampler(cache_values=True, num_slices=10), ... batch_size=320, ... ) >>> episode = torch.zeros(1000, dtype=torch.int) >>> episode[:300] = 1 >>> episode[300:550] = 2 >>> episode[550:700] = 3 >>> episode[700:] = 4 >>> data = TensorDict( ... { ... "episode": episode, ... "obs": torch.randn((3, 4, 5)).expand(1000, 3, 4, 5), ... "act": torch.randn((20,)).expand(1000, 20), ... "other": torch.randn((20, 50)).expand(1000, 20, 50), ... }, [1000] ... ) >>> rb.extend(data) >>> sample = rb.sample() >>> print("sample:", sample) >>> print("episodes", sample.get("episode").unique()) episodes tensor([1, 2, 3, 4], dtype=torch.int32)
SliceSampler
默认情况下与 TorchRL 的大多数数据集兼容示例
>>> import torch >>> >>> from torchrl.data.datasets import RobosetExperienceReplay >>> from torchrl.data import SliceSampler >>> >>> torch.manual_seed(0) >>> num_slices = 10 >>> dataid = list(RobosetExperienceReplay.available_datasets)[0] >>> data = RobosetExperienceReplay(dataid, batch_size=320, sampler=SliceSampler(num_slices=num_slices)) >>> for batch in data: ... batch = batch.reshape(num_slices, -1) ... break >>> print("check that each batch only has one episode:", batch["episode"].unique(dim=1)) check that each batch only has one episode: tensor([[19], [14], [ 8], [10], [13], [ 4], [ 2], [ 3], [22], [ 8]])