LazyMemmapStorage¶
- class torchrl.data.replay_buffers.LazyMemmapStorage(max_size: int, *, scratch_dir=None, device: device = 'cpu', ndim: int = 1, existsok: bool = False, compilable: bool = False)[source]¶
用于张量 (tensors) 和 tensordicts 的内存映射存储。
- 参数:
max_size (int) – 存储大小,即缓冲区中存储的最大元素数量。
- 关键字参数:
scratch_dir (str 或 path) – 内存映射张量将写入的目录。
device (torch.device, 可选) – 采样张量将被存储和发送到的设备。默认值为
torch.device("cpu")
。如果提供None
,设备将从传入的第一批数据中自动获取。默认情况下不启用此功能,以避免数据被错误地放置在 GPU 上,导致 OOM 问题。ndim (int, 可选) – 计算存储大小时要考虑的维度数量。例如,形状为
[3, 4]
的存储,如果ndim=1
,容量为3
;如果ndim=2
,容量为12
。默认值为1
。existsok (bool, 可选) – 如果任何张量已存在于磁盘上,是否应该引发错误。默认值为
True
。如果为False
,张量将按原样打开,不会被覆盖。
注意
在检查点
LazyMemmapStorage
时,可以提供与存储已存储位置相同的路径,以避免执行长时间的数据复制,因为数据已存储在磁盘上。这仅在使用默认的TensorStorageCheckpointer
检查点器时有效。示例>>> from tensordict import TensorDict >>> from torchrl.data import TensorStorage, LazyMemmapStorage, ReplayBuffer >>> import tempfile >>> from pathlib import Path >>> import time >>> td = TensorDict(a=0, b=1).expand(1000).clone() >>> # We pass a path that is <main_ckpt_dir>/storage to LazyMemmapStorage >>> rb_memmap = ReplayBuffer(storage=LazyMemmapStorage(10_000_000, scratch_dir="dump/storage")) >>> rb_memmap.extend(td); >>> # Checkpointing in `dump` is a zero-copy, as the data is already in `dump/storage` >>> rb_memmap.dumps(Path("./dump"))
示例
>>> data = TensorDict({ ... "some data": torch.randn(10, 11), ... ("some", "nested", "data"): torch.randn(10, 11, 12), ... }, batch_size=[10, 11]) >>> storage = LazyMemmapStorage(100) >>> storage.set(range(10), data) >>> len(storage) # only the first dimension is considered as indexable 10 >>> storage.get(0) TensorDict( fields={ some data: MemoryMappedTensor(shape=torch.Size([11]), device=cpu, dtype=torch.float32, is_shared=False), some: TensorDict( fields={ nested: TensorDict( fields={ data: MemoryMappedTensor(shape=torch.Size([11, 12]), device=cpu, dtype=torch.float32, is_shared=False)}, batch_size=torch.Size([11]), device=cpu, is_shared=False)}, batch_size=torch.Size([11]), device=cpu, is_shared=False)}, batch_size=torch.Size([11]), device=cpu, is_shared=False)
此类也支持 tensorclass 数据。
示例
>>> from tensordict import tensorclass >>> @tensorclass ... class MyClass: ... foo: torch.Tensor ... bar: torch.Tensor >>> data = MyClass(foo=torch.randn(10, 11), bar=torch.randn(10, 11, 12), batch_size=[10, 11]) >>> storage = LazyMemmapStorage(10) >>> storage.set(range(10), data) >>> storage.get(0) MyClass( bar=MemoryMappedTensor(shape=torch.Size([11, 12]), device=cpu, dtype=torch.float32, is_shared=False), foo=MemoryMappedTensor(shape=torch.Size([11]), device=cpu, dtype=torch.float32, is_shared=False), batch_size=torch.Size([11]), device=cpu, is_shared=False)
- attach(buffer: Any) None ¶
此函数将采样器附加到此存储。
从该存储读取的缓冲区必须通过调用此方法作为附加实体包含在内。这保证了当存储中的数据发生变化时,即使存储与其他缓冲区(例如,优先级采样器)共享,组件也会意识到这些变化。
- 参数:
buffer – 从此存储读取的对象。
- dump(*args, **kwargs)¶
dumps()
的别名。
- load(*args, **kwargs)¶
loads()
的别名。
- save(*args, **kwargs)¶
dumps()
的别名。