PromptData¶
- class torchrl.data.PromptData(input_ids: 'torch.Tensor', attention_mask: 'torch.Tensor', prompt_rindex: 'torch.Tensor', labels: 'Optional[torch.Tensor]' = None, logits: 'Optional[torch.Tensor]' = None, loss: 'Optional[torch.Tensor]' = None, *, batch_size, device=None, names=None)[source]¶
-
- dumps(prefix: str | None = None, copy_existing: bool =False, *, num_threads: int =0, return_early: bool =False, share_non_tensor: bool =False) T ¶
将 tensordict 保存到磁盘。
此函数是
memmap()
的代理。
- classmethod fields()¶
返回描述此 dataclass 字段的元组。
接受 dataclass 或其实例。元组成员类型为 Field。
- classmethod from_dataset(split, dataset_name=None, max_length=550, root_dir=None, from_disk=False, num_workers: int | None = None)[source]¶
从数据集名称返回一个
PromptData
实例。- 参数:
split (str) – 根据需要的数据拆分,可以是
"train"
或"valid"
。dataset_name (str, optional) – 要处理的数据集名称。默认为
"CarperAI/openai_summarize_comparisons"
。max_length (int, optional) – 数据集序列的最大长度。默认为 550。
root_dir (path, optional) – 数据集存储的路径。默认为
"$HOME/.cache/torchrl/data"
from_disk (bool, optional) – 如果为
True
,则使用datasets.load_from_disk()
。否则,使用datasets.load_dataset()
。默认为False
。num_workers (int, optional) – 在分词期间调用
datasets.dataset.map()
的工作进程数。默认为max(os.cpu_count() // 2, 1)
。
- 返回: 一个
PromptData
实例,包含所需数据集的内存映射版本。
示例
>>> data = PromptData.from_dataset("train") >>> print(data) PromptDataTLDR( attention_mask=MemoryMappedTensor(shape=torch.Size([116722, 550]), device=cpu, dtype=torch.int64, is_shared=False), input_ids=MemoryMappedTensor(shape=torch.Size([116722, 550]), device=cpu, dtype=torch.int64, is_shared=False), prompt_rindex=MemoryMappedTensor(shape=torch.Size([116722]), device=cpu, dtype=torch.int64, is_shared=False), labels=MemoryMappedTensor(shape=torch.Size([116722, 550]), device=cpu, dtype=torch.int64, is_shared=False), logits=None, loss=None, batch_size=torch.Size([116722]), device=None, is_shared=False) >>> # data can be sampled from using regular indexing >>> sub_data = data[:3]
- classmethod from_tensordict(tensordict, non_tensordict=None, safe=True)¶
张量类包装器,用于实例化新的张量类对象。
- 参数:
tensordict (TensorDict) – 张量类型字典
non_tensordict (dict) – 包含非张量和嵌套张量类对象的字典
- get(key: NestedKey, *args, **kwargs)¶
获取与输入键关联存储的值。
- 参数:
key (str, tuple of str) – 要查询的键。如果是字符串元组,则相当于链式调用 getattr。
default – 如果在 tensorclass 中找不到键,则返回的默认值。
- 返回:
与输入键关联存储的值
- classmethod load(prefix: str | Path, *args, **kwargs) T ¶
从磁盘加载 tensordict。
此类方法是
load_memmap()
的代理。
- load_(prefix: str | Path, *args, **kwargs)¶
在当前 tensordict 中从磁盘加载 tensordict。
此方法是
load_memmap_()
的代理。
- classmethod load_memmap(prefix: str | Path, device: torch.device | None = None, non_blocking: bool =False, *, out: TensorDictBase | None = None) T ¶
从磁盘加载内存映射的 tensordict。
- 参数:
prefix (str 或 文件夹路径) – 应该从中获取已保存 tensordict 的文件夹路径。
device (torch.device 或 等效类型, 可选) – 如果提供,数据将异步转换为该设备。支持 “meta” 设备,在这种情况下,数据不会被加载,而是创建一组空的“meta”张量。这有助于在不实际打开任何文件的情况下了解模型总大小和结构。
non_blocking (bool, optional) – 如果为
True
,则在将张量加载到设备后不会调用同步操作。默认为False
。out (TensorDictBase, optional) – 可选的 tensordict,数据将写入其中。
示例
>>> from tensordict import TensorDict >>> td = TensorDict.fromkeys(["a", "b", "c", ("nested", "e")], 0) >>> td.memmap("./saved_td") >>> td_load = TensorDict.load_memmap("./saved_td") >>> assert (td == td_load).all()
此方法也允许加载嵌套的 tensordicts。
示例
>>> nested = TensorDict.load_memmap("./saved_td/nested") >>> assert nested["e"] == 0
tensordict 也可以加载到“meta”设备上,或者作为假张量加载。
示例
>>> import tempfile >>> td = TensorDict({"a": torch.zeros(()), "b": {"c": torch.zeros(())}}) >>> with tempfile.TemporaryDirectory() as path: ... td.save(path) ... td_load = TensorDict.load_memmap(path, device="meta") ... print("meta:", td_load) ... from torch._subclasses import FakeTensorMode ... with FakeTensorMode(): ... td_load = TensorDict.load_memmap(path) ... print("fake:", td_load) meta: TensorDict( fields={ a: Tensor(shape=torch.Size([]), device=meta, dtype=torch.float32, is_shared=False), b: TensorDict( fields={ c: Tensor(shape=torch.Size([]), device=meta, dtype=torch.float32, is_shared=False)}, batch_size=torch.Size([]), device=meta, is_shared=False)}, batch_size=torch.Size([]), device=meta, is_shared=False) fake: TensorDict( fields={ a: FakeTensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False), b: TensorDict( fields={ c: FakeTensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False)}, batch_size=torch.Size([]), device=cpu, is_shared=False)}, batch_size=torch.Size([]), device=cpu, is_shared=False)
- load_state_dict(state_dict: dict[str, Any], strict=True, assign=False, from_flatten=False)¶
尝试将 state_dict 原地加载到目标 tensorclass 上。
- memmap(prefix: str | None = None, copy_existing: bool =False, *, num_threads: int =0, return_early: bool =False, share_non_tensor: bool =False, existsok: bool =True) T ¶
将所有张量写入新 tensordict 中相应的内存映射张量。
- 参数:
prefix (str) – 存储内存映射张量的目录前缀。目录树结构将模拟 tensordict 的结构。
copy_existing (bool) – 如果为 False(默认),则如果 tensordict 中的条目已经是存储在磁盘上的张量且关联有文件,但未按照 prefix 保存到正确位置,将引发异常。如果为
True
,任何现有张量将被复制到新位置。
- 关键字参数:
num_threads (int, optional) – 用于写入内存映射张量的线程数。默认为 0。
return_early (bool, optional) – 如果为
True
且num_threads>0
,此方法将返回 tensordict 的 future 对象。share_non_tensor (bool, optional) – 如果为
True
,非张量数据将在进程之间共享,并且在单个节点内任何工作进程上的写入操作(例如原地更新或设置)将更新所有其他工作进程上的值。如果非张量叶节点的数量很高(例如,共享大量非张量数据堆栈),这可能会导致 OOM 或类似错误。默认为False
。existsok (bool, optional) – 如果为
False
,则如果同一路径中已存在张量,将引发异常。默认为True
。
然后 TensorDict 将被锁定,这意味着任何非原地写入操作(例如,重命名、设置或删除条目)都将抛出异常。一旦 tensordict 解锁,内存映射属性将变为
False
,因为不再保证跨进程标识。- 返回:
如果
return_early=False
,则返回一个新的 tensordict,其中张量存储在磁盘上;否则返回一个TensorDictFuture
实例。
注意
以这种方式序列化对于深度嵌套的 tensordicts 可能会很慢,因此不建议在训练循环内部调用此方法。
- memmap_(prefix: str | None = None, copy_existing: bool =False, *, num_threads: int =0, return_early: bool =False, share_non_tensor: bool =False, existsok: bool =True) T ¶
将所有张量原地写入相应的内存映射张量。
- 参数:
prefix (str) – 存储内存映射张量的目录前缀。目录树结构将模拟 tensordict 的结构。
copy_existing (bool) – 如果为 False(默认),则如果 tensordict 中的条目已经是存储在磁盘上的张量且关联有文件,但未按照 prefix 保存到正确位置,将引发异常。如果为
True
,任何现有张量将被复制到新位置。
- 关键字参数:
num_threads (int, optional) – 用于写入内存映射张量的线程数。默认为 0。
return_early (bool, optional) – 如果为
True
且num_threads>0
,此方法将返回 tensordict 的 future 对象。可以使用 future.result() 查询结果 tensordict。share_non_tensor (bool, optional) – 如果为
True
,非张量数据将在进程之间共享,并且在单个节点内任何工作进程上的写入操作(例如原地更新或设置)将更新所有其他工作进程上的值。如果非张量叶节点的数量很高(例如,共享大量非张量数据堆栈),这可能会导致 OOM 或类似错误。默认为False
。existsok (bool, optional) – 如果为
False
,则如果同一路径中已存在张量,将引发异常。默认为True
。
然后 TensorDict 将被锁定,这意味着任何非原地写入操作(例如,重命名、设置或删除条目)都将抛出异常。一旦 tensordict 解锁,内存映射属性将变为
False
,因为不再保证跨进程标识。- 返回:
如果
return_early=False
,则返回自身;否则返回一个TensorDictFuture
实例。
注意
以这种方式序列化对于深度嵌套的 tensordicts 可能会很慢,因此不建议在训练循环内部调用此方法。
- memmap_like(prefix: str | None = None, copy_existing: bool =False, *, existsok: bool =True, num_threads: int =0, return_early: bool =False, share_non_tensor: bool =False) T ¶
创建一个内容为空的内存映射 tensordict,其形状与原始 tensordict 相同。
- 参数:
prefix (str) – 存储内存映射张量的目录前缀。目录树结构将模拟 tensordict 的结构。
copy_existing (bool) – 如果为 False(默认),则如果 tensordict 中的条目已经是存储在磁盘上的张量且关联有文件,但未按照 prefix 保存到正确位置,将引发异常。如果为
True
,任何现有张量将被复制到新位置。
- 关键字参数:
num_threads (int, optional) – 用于写入内存映射张量的线程数。默认为 0。
return_early (bool, optional) – 如果为
True
且num_threads>0
,此方法将返回 tensordict 的 future 对象。share_non_tensor (bool, optional) – 如果为
True
,非张量数据将在进程之间共享,并且在单个节点内任何工作进程上的写入操作(例如原地更新或设置)将更新所有其他工作进程上的值。如果非张量叶节点的数量很高(例如,共享大量非张量数据堆栈),这可能会导致 OOM 或类似错误。默认为False
。existsok (bool, optional) – 如果为
False
,则如果同一路径中已存在张量,将引发异常。默认为True
。
然后 TensorDict 将被锁定,这意味着任何非原地写入操作(例如,重命名、设置或删除条目)都将抛出异常。一旦 tensordict 解锁,内存映射属性将变为
False
,因为不再保证跨进程标识。- 返回:
如果
return_early=False
,则返回一个新的TensorDict
实例,其中数据存储为内存映射张量;否则返回一个TensorDictFuture
实例。
注意
这是将一组大型缓冲区写入磁盘的推荐方法,因为
memmap_()
会复制信息,对于大型内容可能很慢。示例
>>> td = TensorDict({ ... "a": torch.zeros((3, 64, 64), dtype=torch.uint8), ... "b": torch.zeros(1, dtype=torch.int64), ... }, batch_size=[]).expand(1_000_000) # expand does not allocate new memory >>> buffer = td.memmap_like("/path/to/dataset")
- memmap_refresh_()¶
如果内存映射的 tensordict 具有
saved_path
,则刷新其内容。如果未关联路径,此方法将引发异常。
- save(prefix: str | None = None, copy_existing: bool =False, *, num_threads: int =0, return_early: bool =False, share_non_tensor: bool =False) T ¶
将 tensordict 保存到磁盘。
此函数是
memmap()
的代理。
- set(key: NestedKey, value: Any, inplace: bool =False, non_blocking: bool =False)¶
设置新的键值对。
- 参数:
key (str, tuple of str) – 要设置的键名。如果是字符串元组,则相当于链式调用 getattr 后跟最终的 setattr。
value (Any) – 要存储在 tensorclass 中的值
inplace (bool, optional) – 如果为
True
,set 将尝试原地更新值。如果为False
或键不存在,则值将直接写入其目标位置。
- 返回:
自身
- state_dict(destination=None, prefix='', keep_vars=False, flatten=False) dict[str, Any] ¶
返回一个 state_dict 字典,可用于保存和从 tensorclass 加载数据。
- to_tensordict(*, retain_none: bool | None = None) TensorDict ¶
将 tensorclass 转换为常规的 TensorDict。
复制所有条目。Memmap(内存映射)和共享内存张量将被转换为常规张量。
- 参数:
retain_none (bool) –
如果为
True
,None
值将被写入 tensordict 中。否则,它们将被丢弃。默认值:True
。注意
从 v0.8 版本起,默认值将切换为
False
。- 返回:
一个新的 TensorDict 对象,包含与 tensorclass 相同的值。
- unbind(dim: int)¶
返回一个元组,其中包含沿指定维度解除绑定的索引 tensorclass 实例。
结果 tensorclass 实例将共享初始 tensorclass 实例的存储。