PrioritizedSliceSampler¶
- class torchrl.data.replay_buffers.PrioritizedSliceSampler(max_capacity: int, alpha: float, beta: float, eps: float = 1e-08, dtype: torch.dtype = torch.float32, reduction: str = 'max', *, num_slices: int = None, slice_len: int = None, end_key: NestedKey | None = ('next', 'done'), traj_key: NestedKey | None = 'episode', ends: torch.Tensor | None = None, trajectories: torch.Tensor | None = None, cache_values: bool = False, truncated_key: NestedKey | None = ('next', 'truncated'), strict_length: bool = True, compile: bool | dict = False, span: bool | int | Tuple[bool | int, bool | int] = False, max_priority_within_buffer: bool = False)[source]¶
根据开始和停止信号,沿第一维度使用优先采样对数据切片进行采样。
- 此类根据“Schaul, T.; Quan, J.; Antonoglou, I.; and Silver, D. 2015 年论文”中提出的优先级权重,有替换地对子轨迹进行采样。
优先经验回放” (https://arxiv.org/abs/1511.05952)
更多信息请参见
SliceSampler
和PrioritizedSampler
。警告
PrioritizedSliceSampler 将查看单个转换的优先级,并据此对起始点进行采样。这意味着如果低优先级转换紧随更高优先级的转换,它们也可能出现在样本中;而高优先级但更靠近轨迹末尾的转换,如果不能用作起始点,则可能永远不会被采样。目前,用户有责任使用
update_priority()
方法聚合轨迹项的优先级。- 参数:
alpha (
float
) – 指数 α 决定了优先化的程度,α = 0 对应于均匀采样的情况。beta (
float
) – 重要性采样的负指数。eps (
float
, 可选) – 添加到优先级中的 delta 值,以确保缓冲区不包含零优先级。默认为 1e-8。reduction (str, 可选) – 多维 tensordicts(即存储的轨迹)的归约方法。可以是“max”、“min”、“median”或“mean”之一。
- 关键字参数:
num_slices (int) – 要采样的切片数量。批处理大小(batch-size)必须大于或等于
num_slices
参数。与slice_len
互斥。slice_len (int) – 要采样的切片的长度。批处理大小(batch-size)必须大于或等于
slice_len
参数,并且可以被其整除。与num_slices
互斥。end_key (NestedKey, 可选) – 指示轨迹(或 episode)结束的键。默认为
("next", "done")
。traj_key (NestedKey, 可选) – 指示轨迹的键。默认为
"episode"
(TorchRL 数据集中常用)。ends (torch.Tensor, 可选) – 包含运行结束信号的 1d 布尔张量。当获取
end_key
或traj_key
代价较高,或者当此信号已准备好时使用。必须与cache_values=True
一起使用,且不能与end_key
或traj_key
结合使用。trajectories (torch.Tensor, 可选) – 包含运行 ID 的 1d 整型张量。当获取
end_key
或traj_key
代价较高,或者当此信号已准备好时使用。必须与cache_values=True
一起使用,且不能与end_key
或traj_key
结合使用。cache_values (bool, 可选) –
用于静态数据集。将缓存轨迹的开始和结束信号。即使在调用
extend
期间轨迹索引发生变化,也可以安全使用,因为此操作将清除缓存。警告
cache_values=True
在采样器与由另一个缓冲区扩展的存储一起使用时将不起作用。例如>>> buffer0 = ReplayBuffer(storage=storage, ... sampler=SliceSampler(num_slices=8, cache_values=True), ... writer=ImmutableWriter()) >>> buffer1 = ReplayBuffer(storage=storage, ... sampler=other_sampler) >>> # Wrong! Does not erase the buffer from the sampler of buffer0 >>> buffer1.extend(data)
警告
cache_values=True
如果缓冲区在进程之间共享,一个进程负责写入,一个进程负责采样,则将无法按预期工作,因为清除缓存只能在本地完成。truncated_key (NestedKey, 可选) – 如果不是
None
,此参数指示截断信号应写入输出数据中的位置。这用于向值估计器指示提供的轨迹中断的位置。默认为("next", "truncated")
。此功能仅适用于TensorDictReplayBuffer
实例(否则截断键将在sample()
方法返回的信息字典中返回)。strict_length (bool, 可选) – 如果为
False
,长度小于 slice_len(或 batch_size // num_slices)的轨迹将被允许出现在批处理中。如果为True
,则将过滤掉短于要求的轨迹。请注意,这可能导致实际的 batch_size 小于请求的大小!轨迹可以使用split_trajectories()
函数进行分割。默认为True
。compile (bool 或 dict of kwargs, 可选) – 如果为
True
,sample()
方法的瓶颈部分将使用compile()
进行编译。关键字参数也可以通过此参数传递给 torch.compile。默认为False
。span (bool, int, Tuple[bool | int, bool | int], 可选) – 如果提供,采样的轨迹将跨越左侧和/或右侧。这意味着提供的元素数量可能少于所需数量。布尔值表示每个轨迹至少会采样一个元素。整数 i 表示每个采样的轨迹至少会收集 slice_len - i 个样本。使用元组可以精细控制左侧(存储轨迹的开始)和右侧(存储轨迹的结束)的跨度。
max_priority_within_buffer (bool, 可选) – 如果为
True
,则在缓冲区内跟踪最大优先级。如果为False
,则最大优先级跟踪自采样器实例化以来的最大值。默认为False
。
示例
>>> import torch >>> from torchrl.data.replay_buffers import TensorDictReplayBuffer, LazyMemmapStorage, PrioritizedSliceSampler >>> from tensordict import TensorDict >>> sampler = PrioritizedSliceSampler(max_capacity=9, num_slices=3, alpha=0.7, beta=0.9) >>> rb = TensorDictReplayBuffer(storage=LazyMemmapStorage(9), sampler=sampler, batch_size=6) >>> data = TensorDict( ... { ... "observation": torch.randn(9,16), ... "action": torch.randn(9, 1), ... "episode": torch.tensor([0,0,0,1,1,1,2,2,2], dtype=torch.long), ... "steps": torch.tensor([0,1,2,0,1,2,0,1,2], dtype=torch.long), ... ("next", "observation"): torch.randn(9,16), ... ("next", "reward"): torch.randn(9,1), ... ("next", "done"): torch.tensor([0,0,1,0,0,1,0,0,1], dtype=torch.bool).unsqueeze(1), ... }, ... batch_size=[9], ... ) >>> rb.extend(data) >>> sample, info = rb.sample(return_info=True) >>> print("episode", sample["episode"].tolist()) episode [2, 2, 2, 2, 1, 1] >>> print("steps", sample["steps"].tolist()) steps [1, 2, 0, 1, 1, 2] >>> print("weight", info["_weight"].tolist()) weight [1.0, 1.0, 1.0, 1.0, 1.0, 1.0] >>> priority = torch.tensor([0,3,3,0,0,0,1,1,1]) >>> rb.update_priority(torch.arange(0,9,1), priority=priority) >>> sample, info = rb.sample(return_info=True) >>> print("episode", sample["episode"].tolist()) episode [2, 2, 2, 2, 2, 2] >>> print("steps", sample["steps"].tolist()) steps [1, 2, 0, 1, 0, 1] >>> print("weight", info["_weight"].tolist()) weight [9.120110917137936e-06, 9.120110917137936e-06, 9.120110917137936e-06, 9.120110917137936e-06, 9.120110917137936e-06, 9.120110917137936e-06]
- update_priority(index: Union[int, torch.Tensor], priority: Union[float, torch.Tensor], *, storage: TensorStorage | None = None) None ¶
更新索引指向的数据的优先级。
- 参数:
index (int 或 torch.Tensor) – 要更新优先级的索引。
priority (Number 或 torch.Tensor) – 索引元素的新的优先级。
- 关键字参数:
storage (Storage, 可选) – 用于将 Nd 索引大小映射到 sum_tree 和 min_tree 的 1d 大小的存储。仅在
index.ndim > 2
时需要。