快捷方式

LazyMemmapStorage

class torchrl.data.replay_buffers.LazyMemmapStorage(max_size: int, *, scratch_dir=None, device: device = 'cpu', ndim: int = 1, existsok: bool = False, compilable: bool = False)[source]

用于张量 (tensors) 和 tensordicts 的内存映射存储。

参数:

max_size (int) – 存储大小,即缓冲区中存储的最大元素数量。

关键字参数:
  • scratch_dir (strpath) – 内存映射张量将写入的目录。

  • device (torch.device, 可选) – 采样张量将被存储和发送到的设备。默认值为 torch.device("cpu")。如果提供 None,设备将从传入的第一批数据中自动获取。默认情况下不启用此功能,以避免数据被错误地放置在 GPU 上,导致 OOM 问题。

  • ndim (int, 可选) – 计算存储大小时要考虑的维度数量。例如,形状为 [3, 4] 的存储,如果 ndim=1,容量为 3;如果 ndim=2,容量为 12。默认值为 1

  • existsok (bool, 可选) – 如果任何张量已存在于磁盘上,是否应该引发错误。默认值为 True。如果为 False,张量将按原样打开,不会被覆盖。

注意

在检查点 LazyMemmapStorage 时,可以提供与存储已存储位置相同的路径,以避免执行长时间的数据复制,因为数据已存储在磁盘上。这仅在使用默认的 TensorStorageCheckpointer 检查点器时有效。示例

>>> from tensordict import TensorDict
>>> from torchrl.data import TensorStorage, LazyMemmapStorage, ReplayBuffer
>>> import tempfile
>>> from pathlib import Path
>>> import time
>>> td = TensorDict(a=0, b=1).expand(1000).clone()
>>> # We pass a path that is <main_ckpt_dir>/storage to LazyMemmapStorage
>>> rb_memmap = ReplayBuffer(storage=LazyMemmapStorage(10_000_000, scratch_dir="dump/storage"))
>>> rb_memmap.extend(td);
>>> # Checkpointing in `dump` is a zero-copy, as the data is already in `dump/storage`
>>> rb_memmap.dumps(Path("./dump"))

示例

>>> data = TensorDict({
...     "some data": torch.randn(10, 11),
...     ("some", "nested", "data"): torch.randn(10, 11, 12),
... }, batch_size=[10, 11])
>>> storage = LazyMemmapStorage(100)
>>> storage.set(range(10), data)
>>> len(storage)  # only the first dimension is considered as indexable
10
>>> storage.get(0)
TensorDict(
    fields={
        some data: MemoryMappedTensor(shape=torch.Size([11]), device=cpu, dtype=torch.float32, is_shared=False),
        some: TensorDict(
            fields={
                nested: TensorDict(
                    fields={
                        data: MemoryMappedTensor(shape=torch.Size([11, 12]), device=cpu, dtype=torch.float32, is_shared=False)},
                    batch_size=torch.Size([11]),
                    device=cpu,
                    is_shared=False)},
            batch_size=torch.Size([11]),
            device=cpu,
            is_shared=False)},
    batch_size=torch.Size([11]),
    device=cpu,
    is_shared=False)

此类也支持 tensorclass 数据。

示例

>>> from tensordict import tensorclass
>>> @tensorclass
... class MyClass:
...     foo: torch.Tensor
...     bar: torch.Tensor
>>> data = MyClass(foo=torch.randn(10, 11), bar=torch.randn(10, 11, 12), batch_size=[10, 11])
>>> storage = LazyMemmapStorage(10)
>>> storage.set(range(10), data)
>>> storage.get(0)
MyClass(
    bar=MemoryMappedTensor(shape=torch.Size([11, 12]), device=cpu, dtype=torch.float32, is_shared=False),
    foo=MemoryMappedTensor(shape=torch.Size([11]), device=cpu, dtype=torch.float32, is_shared=False),
    batch_size=torch.Size([11]),
    device=cpu,
    is_shared=False)
attach(buffer: Any) None

此函数将采样器附加到此存储。

从该存储读取的缓冲区必须通过调用此方法作为附加实体包含在内。这保证了当存储中的数据发生变化时,即使存储与其他缓冲区(例如,优先级采样器)共享,组件也会意识到这些变化。

参数:

buffer – 从此存储读取的对象。

dump(*args, **kwargs)

dumps() 的别名。

load(*args, **kwargs)

loads() 的别名。

save(*args, **kwargs)

dumps() 的别名。

文档

访问 PyTorch 的完整开发者文档

查看文档

教程

获取面向初学者和高级开发者的深度教程

查看教程

资源

查找开发资源并获得问题解答

查看资源