DataLoader2¶
引入了一个新的轻量级 DataLoader2
,用于将过载的数据操作功能从 torch.utils.data.DataLoader
解耦到 DataPipe
操作。此外,某些功能只能通过 DataLoader2
实现,例如快照和切换后端服务以执行高性能操作。
DataLoader2¶
- class torchdata.dataloader2.DataLoader2(datapipe: Optional[Union[IterDataPipe, MapDataPipe]], datapipe_adapter_fn: Optional[Union[Iterable[Adapter], Adapter]] = None, reading_service: Optional[ReadingServiceInterface] = None)¶
DataLoader2
用于根据ReadingService
和Adapter
函数优化和执行给定的DataPipe
图,并支持多进程和分布式数据加载的动态分片
多个后端
ReadingServices
DataPipe
图的原地修改,如 shuffle 控制、内存固定等。快照数据预处理管道状态 (WIP)
- 参数:
datapipe (
IterDataPipe
或MapDataPipe
) - 用于加载数据的DataPipe
。在初始化过程中将创建此 datapipe 的深拷贝,允许在不同的DataLoader2
中重复使用输入,而无需共享状态。只有在创建 DataLoader 后立即调用load_state_dict
的情况下,才能输入None
。datapipe_adapter_fn (
Iterable[Adapter]
或Adapter
,可选) - 将应用于 DataPipe 的Adapter
函数(默认值:None
)。reading_service (ReadingServiceInterface,可选) - 定义
DataLoader2
应如何对DataPipe
执行操作,例如多进程/分布式(默认值:None
)。在初始化过程中将创建此服务的深拷贝,允许在不同的DataLoader2
中重复使用 ReadingService,而无需共享状态。
注意
当将
MapDataPipe
传递到DataLoader2
时,为了遍历数据,DataLoader2
将尝试通过iter(datapipe)
创建迭代器。如果对象具有非零索引的索引,则可能会失败。考虑使用.shuffle()
(将MapDataPipe
转换为IterDataPipe
)或datapipe.to_iter_datapipe(custom_indices)
。- __iter__() DataLoader2Iterator[T_co] ¶
从由
ReadingService
调整的DataPipe
图返回一个单例迭代器。如果提供序列化状态来构造DataLoader2
,则将恢复DataPipe
。并且,initialize_iteration
和finalize_iterator
将分别在迭代开始和结束时被调用。
- classmethod from_state(state: Dict[str, Any], reading_service: CheckpointableReadingServiceInterface) DataLoader2[T_co] ¶
使用从序列化状态恢复的
DataPipe
图和ReadingService
创建新的DataLoader2
。
- load_state_dict(state_dict: Dict[str, Any]) None ¶
对于现有的
DataLoader2
,加载序列化状态以恢复DataPipe
图并重置ReadingService
的内部状态。
- seed(seed: int) None ¶
为 DataLoader2 设置随机种子以控制确定性。
- 参数:
seed - 随机 uint64 种子
- shutdown() None ¶
关闭
ReadingService
并清理迭代器。
- state_dict() Dict[str, Any] ¶
返回一个字典,用以下键来表示数据处理管道的状态:
serialized_datapipe
:ReadingService
适配之前的序列化DataPipe
。reading_service_state
:ReadingService
和已适配的DataPipe
的状态。
注意:DataLoader2
不支持 torch.utils.data.Dataset
或 torch.utils.data.IterableDataset
。请使用以下相应的 DataPipe
包装它们:
torchdata.datapipes.map.SequenceWrapper
:torch.utils.data.Dataset
torchdata.datapipes.iter.IterableWrapper
:torch.utils.data.IterableDataset
ReadingService¶
ReadingService
指定数据处理图的执行后端。TorchData 中提供了三种类型的 ReadingServices
:
|
|
默认的 ReadingService,用于在主进程中为 “DataPipe” 图提供服务,并将图设置(如确定性控制)应用于图。 |
|
生成多个工作进程,以从 |
|
每个 ReadingServices
都会获取 DataPipe
图并对其进行重写,以实现一些功能,如动态分片、共享随机种子以及用于多进程/分布式进程的快照。有关这些功能的更多详细信息,请参阅文档。
适配器¶
适配器
用于在 DataLoader2
中配置、修改和扩展 DataPipe
图。它允许对 PyTorch 域提供的预先组装的 DataPipe
图进行就地修改或替换。例如,可以将 Shuffle(False)
提供给 DataLoader2
,这将禁用 DataPipes
图中的任何 shuffle
操作。
- class torchdata.dataloader2.adapter.Adapter¶
遵循 Python 可调用协议的适配器基类。
- abstract __call__(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe] ¶
可调用函数,可以对
DataPipe
图进行就地修改,也可以返回新的DataPipe
图。- 参数:
datapipe – 需要适配的
DataPipe
。- 返回:
已适配的
DataPipe
或新的DataPipe
。
以下是 TorchData 在 torchdata.dataloader2.adapter
中提供的 Adapter
列表:
Shuffle DataPipes 适配器允许控制图中所有现有的 Shuffler ( |
|
CacheTimeout DataPipes 适配器允许控制图中所有现有的 EndOnDiskCacheHolder ( |
此外,我们还将提供更多 Adapters
来涵盖数据处理选项:
PinMemory
:在数据处理图的末尾附加一个DataPipe
,将输出数据转换为固定内存中的torch.Tensor
。FullSync
:附加一个DataPipe
,以确保数据处理图在分布式进程之间同步,防止挂起。ShardingPolicy
:如果DataPipe
图中存在sharding_filter
,则修改分片策略。PrefetchPolicy
、InvalidateCache
等。
如果您对希望提供的 Adapters
有功能请求,请在 GitHub 上提交问题。对于特定需求,DataLoader2
也接受任何自定义 Adapter
,只要它继承自 Adapter
类即可。