保存 TensorDict 和 tensorclass 对象¶
虽然我们可以使用 save()
保存 tensordict,但这会创建一个包含数据结构全部内容的单个文件。很容易想象在某些情况下,这样做效率不高!
TensorDict 的序列化 API 主要依赖于 MemoryMappedTensor
,它用于将张量独立地写入磁盘,并使用模仿 TensorDict 结构的数据结构。
TensorDict 的序列化速度比 PyTorch 的 save()
(依赖 pickle)快一个数量级。本文档说明了如何使用 TensorDict 创建和交互存储在磁盘上的数据。
保存内存映射的 TensorDict¶
当 tensordict 作为 mmap 数据结构转储时,每个条目对应一个 *.memmap
文件,目录结构由键结构决定:通常,嵌套键对应子目录。
将数据结构保存为结构化的内存映射张量集具有以下优点
可以部分加载保存的数据。如果一个大型模型保存在磁盘上,但只需要将部分权重加载到在单独脚本中创建的模块中,则只会将这些权重加载到内存中。
保存数据是安全的:使用 pickle 库序列化大型数据结构可能是不安全的,因为取消序列化可以执行任何任意代码。TensorDict 的加载 API 仅读取从保存的 json 文件和磁盘上保存的内存缓冲区中预先选择的字段。
保存速度快:因为数据写入多个独立的文件,我们可以通过启动多个并发线程(每个线程访问自己的专用文件)来分摊 IO 开销。
保存数据的结构很明显:目录树指示了数据内容。
但是,这种方法也有一些缺点
并非所有数据类型都可以保存。
tensorclass
允许保存任何非张量数据:如果这些数据可以用 json 文件表示,则将使用 json 格式。否则,非张量数据将使用save()
作为后备独立保存。NonTensorData
类可用于在常规TensorDict
实例中表示非张量数据。
tensordict 的内存映射 API 依赖于四种核心方法: memmap_()
、 memmap()
、 memmap_like()
和 load_memmap()
。
memmap_()
和 memmap()
方法将数据写入磁盘,无论是否修改包含数据的 tensordict 实例。这些方法可用于将模型序列化到磁盘(我们使用多个线程来加速序列化)
>>> model = nn.Transformer()
>>> weights = TensorDict.from_module(model)
>>> weights_disk = weights.memmap("/path/to/saved/dir", num_threads=32)
>>> new_weights = TensorDict.load_memmap("/path/to/saved/dir")
>>> assert (weights_disk == new_weights).all()
memmap_like()
用于在磁盘上预分配数据集时,典型用法为
>>> def make_datum(): # used for illustration purposes
... return TensorDict({"image": torch.randint(255, (3, 64, 64)), "label": 0}, batch_size=[])
>>> dataset_size = 1_000_000
>>> datum = make_datum() # creates a single instance of a TensorDict datapoint
>>> data = datum.expand(dataset_size) # does NOT require more memory usage than datum, since it's only a view on datum!
>>> data_disk = data.memmap_like("/path/to/data") # creates the two memory-mapped tensors on disk
>>> del data # data is not needed anymore
如上所示,当将 TensorDict`
的条目转换为 MemoryMappedTensor
时,可以控制内存映射保存在磁盘上的位置,以便它们持久化并可以在以后加载。另一方面,也可以使用文件系统。要使用它,只需在上述三个序列化方法中丢弃 prefix
参数即可。
当指定 prefix
时,数据结构遵循 TensorDict 的结构
>>> import torch
>>> from tensordict import TensorDict
>>> td = TensorDict({"a": torch.rand(10), "b": {"c": torch.rand(10)}}, [10])
>>> td.memmap_(prefix="tensordict")
生成以下目录结构
tensordict
├── a.memmap
├── b
│ ├── c.memmap
│ └── meta.json
└── meta.json
meta.json
文件包含重建 tensordict 的所有相关信息,例如设备、批次大小,以及 tensordict 子类型。这意味着 load_memmap()
将能够重建复杂的嵌套结构,其中子 tensordict 与父 tensordict 具有不同的类型
>>> from tensordict import TensorDict, tensorclass, TensorDictBase
>>> from tensordict.utils import print_directory_tree
>>> import torch
>>> import tempfile
>>> td_list = [TensorDict({"item": i}, batch_size=[]) for i in range(4)]
>>> @tensorclass
... class MyClass:
... data: torch.Tensor
... metadata: str
>>> tc = MyClass(torch.randn(3), metadata="some text", batch_size=[])
>>> data = TensorDict({"td_list": torch.stack(td_list), "tensorclass": tc}, [])
>>> with tempfile.TemporaryDirectory() as tempdir:
... data.memmap_(tempdir)
...
... loaded_data = TensorDictBase.load_memmap(tempdir)
... assert (loaded_data == data).all()
... print_directory_tree(tempdir)
tmpzy1jcaoq/
tensorclass/
_tensordict/
data.memmap
meta.json
meta.json
td_list/
0/
item.memmap
meta.json
1/
item.memmap
meta.json
3/
item.memmap
meta.json
2/
item.memmap
meta.json
meta.json
meta.json
处理现有的 MemoryMappedTensor
¶
如果 TensorDict`
已经包含 MemoryMappedTensor
条目,则有几种可能的行为。
如果未指定
prefix
并且调用了两次memmap()
,则生成的 TensorDict 将包含与原始数据相同的数据。>>> td = TensorDict({"a": 1}, []) >>> td0 = td.memmap() >>> td1 = td0.memmap() >>> td0["a"] is td1["a"] True
如果指定了
prefix
并且它与现有MemoryMappedTensor
实例的前缀不同,则会引发异常,除非传递了 copy_existing=True>>> with tempfile.TemporaryDirectory() as tmpdir_0: ... td0 = td.memmap(tmpdir_0) ... td0 = td.memmap(tmpdir_0) # works, results are just overwritten ... with tempfile.TemporaryDirectory() as tmpdir_1: ... td1 = td0.memmap(tmpdir_1) ... td_load = TensorDict.load_memmap(tmpdir_1) # works! ... assert (td_load == td).all() ... with tempfile.TemporaryDirectory() as tmpdir_1: ... td_load = TensorDict.load_memmap(tmpdir_1) # breaks!
此功能旨在防止用户无意中将内存映射张量从一个位置复制到另一个位置。
TorchSnapshot 兼容性¶
警告
由于 torchsnapshot 的维护即将停止。因此,我们不会为 tensordict 与此库的兼容性实现新功能。
TensorDict 与 torchsnapshot(一个 PyTorch 检查点库)兼容。TorchSnapshot 将独立保存每个张量,并使用模仿 tensordict 或 tensorclass 结构的数据结构。此外,TensorDict 天然内置了在磁盘上保存和加载大型数据集的必要工具,而无需将完整张量加载到内存中:换句话说,tensordict + torchsnapshot 的组合使得可以将大小为数百 GB 的张量加载到预分配的 MemmapTensor
上,而无需将其以单个块的形式传递到 RAM 中。
主要有两个用例:保存和加载适合内存的 tensordict,以及保存和加载使用 MemmapTensor
存储在磁盘上的 tensordict。
一般用例:内存加载¶
如果目标 tensordict 未预分配,则此方法适用。它提供了灵活性(您可以将任何 tensordict 加载到您的 tensordict 中,无需事先知道其内容),并且此方法比其他方法更容易编码。但是,如果您的张量非常大且不适合内存,则可能会出错。此外,它不允许您直接加载到您选择的设备上。
保存操作需要记住的两个主要命令是
>>> state = {"state": tensordict_source}
>>> snapshot = torchsnapshot.Snapshot.take(app_state=state, path="/path/to/my/snapshot")
要加载到目标 tensordict 上,您可以简单地加载快照并更新 tensordict。在后台,此方法将调用 tensordict_target.load_state_dict(state_dict)
,这意味着 state_dict
将首先完全加载到内存中,然后加载到目标 tensordict 上
>>> snapshot = Snapshot(path="/path/to/my/snapshot")
>>> state_target = {"state": tensordict_target}
>>> snapshot.restore(app_state=state_target)
这是一个完整的示例
>>> import uuid
>>> import torchsnapshot
>>> from tensordict import TensorDict
>>> import torch
>>>
>>> tensordict_source = TensorDict({"a": torch.randn(3), "b": {"c": torch.randn(3)}}, [])
>>> state = {"state": tensordict}
>>> path = f"/tmp/{uuid.uuid4()}"
>>> snapshot = torchsnapshot.Snapshot.take(app_state=state, path=path)
>>> # later
>>> snapshot = torchsnapshot.Snapshot(path=path)
>>> tensordict2 = TensorDict({}, [])
>>> target_state = {
>>> "state": tensordict2
>>> }
>>> snapshot.restore(app_state=target_state)
>>> assert (tensordict == tensordict2).all()
保存和加载大型数据集¶
如果数据集太大而无法放入内存,上述方法很容易出错。我们利用 torchsnapshot 的功能,将张量以小块的形式加载到其预分配的目标上。这需要您知道目标数据的形状、设备等信息,以及它所在的存储位置,但这对于能够检查点您的模型或数据加载来说是微不足道的代价!
与前面的示例相反,我们不会使用 load_state_dict()
方法 TensorDict
,而是使用从目标对象获得的 state_dict
,我们将用保存的数据重新填充它。
同样,两行代码就足以保存数据
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=tensordict_source.state_dict(keep_vars=True))
... }
>>> snapshot = torchsnapshot.Snapshot.take(app_state=app_state, path="/path/to/my/snapshot")
我们一直在使用 torchsnapshot.StateDict
,并且我们显式地调用了 my_tensordict_source.state_dict(keep_vars=True)
,这与前面的示例不同。现在,要将其加载到目标 tensordict 上
>>> snapshot = Snapshot(path="/path/to/my/snapshot")
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=tensordict_target.state_dict(keep_vars=True))
... }
>>> snapshot.restore(app_state=app_state)
在这个示例中,加载完全由 torchsnapshot 处理,即没有调用 TensorDict.load_state_dict()
。
注意
这有两个重要的含义
由于
LazyStackedTensorDict.state_dict()
(以及其他延迟 tensordict 类)在执行某些操作后返回数据的副本,因此加载到 state-dict 上不会更新原始类。但是,由于支持 state_dict() 操作,因此不会引发错误。同样,由于 state-dict 是就地更新的,但 tensordict 没有使用
TensorDict.update()
或TensorDict.set()
更新,因此目标 tensordict 中缺少的键将不会被注意到。
这是一个完整的示例
>>> td = TensorDict({"a": torch.randn(3), "b": TensorDict({"c": torch.randn(3, 1)}, [3, 1])}, [3])
>>> td.memmap_()
>>> assert isinstance(td["b", "c"], MemmapTensor)
>>>
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=td.state_dict(keep_vars=True))
... }
>>> snapshot = torchsnapshot.Snapshot.take(app_state=app_state, path=f"/tmp/{uuid.uuid4()}")
>>>
>>>
>>> td_dest = TensorDict({"a": torch.zeros(3), "b": TensorDict({"c": torch.zeros(3, 1)}, [3, 1])}, [3])
>>> td_dest.memmap_()
>>> assert isinstance(td_dest["b", "c"], MemmapTensor)
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=td_dest.state_dict(keep_vars=True))
... }
>>> snapshot.restore(app_state=app_state)
>>> # sanity check
>>> assert (td_dest == td).all()
>>> assert (td_dest["b"].batch_size == td["b"].batch_size)
>>> assert isinstance(td_dest["b", "c"], MemmapTensor)
最后,tensorclass 也支持此功能。代码与上面的代码非常相似
>>> from __future__ import annotations
>>> import uuid
>>> from typing import Union, Optional
>>>
>>> import torchsnapshot
>>> from tensordict import TensorDict, MemmapTensor
>>> import torch
>>> from tensordict.prototype import tensorclass
>>>
>>> @tensorclass
>>> class MyClass:
... x: torch.Tensor
... y: Optional[MyClass]=None
...
>>> tc = MyClass(x=torch.randn(3), y=MyClass(x=torch.randn(3), batch_size=[]), batch_size=[])
>>> tc.memmap_()
>>> assert isinstance(tc.y.x, MemmapTensor)
>>>
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=tc.state_dict(keep_vars=True))
... }
>>> snapshot = torchsnapshot.Snapshot.take(app_state=app_state, path=f"/tmp/{uuid.uuid4()}")
>>>
>>> tc_dest = MyClass(x=torch.randn(3), y=MyClass(x=torch.randn(3), batch_size=[]), batch_size=[])
>>> tc_dest.memmap_()
>>> assert isinstance(tc_dest.y.x, MemmapTensor)
>>> app_state = {
... "state": torchsnapshot.StateDict(tensordict=tc_dest.state_dict(keep_vars=True))
... }
>>> snapshot.restore(app_state=app_state)
>>>
>>> assert (tc_dest == tc).all()
>>> assert (tc_dest.y.batch_size == tc.y.batch_size)
>>> assert isinstance(tc_dest.y.x, MemmapTensor)