DataLoader2¶
引入了一个新的轻量级 DataLoader2
,以将重载的数据操作功能从 torch.utils.data.DataLoader
解耦到 DataPipe
操作。此外,某些功能只能通过 DataLoader2
实现,例如快照和切换后端服务以执行高性能操作。
DataLoader2¶
- class torchdata.dataloader2.DataLoader2(datapipe: Optional[Union[IterDataPipe, MapDataPipe]], datapipe_adapter_fn: Optional[Union[Iterable[Adapter], Adapter]] = None, reading_service: Optional[ReadingServiceInterface] = None)¶
DataLoader2
用于基于ReadingService
和Adapter
函数优化和执行给定的DataPipe
图,并支持以下功能:用于多进程和分布式数据加载的动态分片
多个后端
ReadingServices
对
DataPipe
图进行就地修改,例如 shuffle 控制、内存固定等。对数据预处理管道的状态进行快照(WIP)
- 参数:
datapipe (
IterDataPipe
或MapDataPipe
) – 用于加载数据的DataPipe
。在初始化期间将对此 datapipe 进行深拷贝,允许在不同的DataLoader2
中重新使用输入而不会共享状态。仅当在创建 DataLoader 后立即调用load_state_dict
时,才能使用输入None
。datapipe_adapter_fn (
Iterable[Adapter]
或Adapter
,可选) – 将应用于 DataPipe 的Adapter
函数(默认值:None
)。reading_service (ReadingServiceInterface, 可选) – 定义
DataLoader2
如何在DataPipe
上执行操作,例如多处理/分布式(默认值:None
)。在初始化期间将创建此服务的深拷贝,允许在不同的DataLoader2
中重新使用 ReadingService 而不会共享状态。
注意
当将
MapDataPipe
传递到DataLoader2
时,为了迭代数据,DataLoader2
将尝试通过iter(datapipe)
创建迭代器。如果对象具有非零索引的索引,则这可能会失败。请考虑使用.shuffle()
(它将MapDataPipe
转换为IterDataPipe
)或datapipe.to_iter_datapipe(custom_indices)
。- __iter__() DataLoader2Iterator[T_co] ¶
从由
ReadingService
适配的DataPipe
图中返回一个单例迭代器。如果提供序列化状态来构造DataLoader2
,则将恢复DataPipe
。并且,将在迭代开始和结束时分别调用initialize_iteration
和finalize_iterator
。
- classmethod from_state(state: Dict[str, Any], reading_service: CheckpointableReadingServiceInterface) DataLoader2[T_co] ¶
使用序列化状态恢复的
DataPipe
图和ReadingService
创建新的DataLoader2
。
- load_state_dict(state_dict: Dict[str, Any]) None ¶
对于现有的
DataLoader2
,加载序列化状态以恢复DataPipe
图并重置ReadingService
的内部状态。
- seed(seed: int) None ¶
设置 DataLoader2 的随机种子以控制确定性。
- 参数:
seed – 随机 uint64 种子
- shutdown() None ¶
关闭
ReadingService
并清理迭代器。
- state_dict() Dict[str, Any] ¶
返回一个字典来表示数据处理管道的状态,其键为
serialized_datapipe
:ReadingService
适配之前序列化的DataPipe
。reading_service_state
:ReadingService
和适配后的DataPipe
的状态。
注意:DataLoader2
不支持 torch.utils.data.Dataset
或 torch.utils.data.IterableDataset
。请将它们中的每一个包装在下面对应的 DataPipe
中
torchdata.datapipes.map.SequenceWrapper
:torch.utils.data.Dataset
torchdata.datapipes.iter.IterableWrapper
:torch.utils.data.IterableDataset
ReadingService¶
ReadingService
指定数据处理图的执行后端。TorchData 提供了三种类型的 ReadingServices
|
|
默认的 ReadingService,用于在主进程中服务 |
|
生成多个工作进程以从 |
|
每个 ReadingServices
将获取 DataPipe
图并对其进行重写,以实现一些功能,例如动态分片、共享随机种子和多/分布式进程的快照。有关这些功能的更多详细信息,请参阅文档。
Adapter¶
Adapter
用于配置、修改和扩展 DataLoader2
中的 DataPipe
图。它允许就地修改或替换 PyTorch 领域提供的预组装 DataPipe
图。例如,可以向 DataLoader2
提供 Shuffle(False)
,这将禁用 DataPipes
图中任何 shuffle
操作。
- class torchdata.dataloader2.adapter.Adapter¶
遵循 Python 可调用协议的 Adapter 基类。
- abstract __call__(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe] ¶
可调用函数,要么对
DataPipe
图进行就地修改,要么返回一个新的DataPipe
图。- 参数:
datapipe – 需要适配的
DataPipe
。- 返回值:
已适配的
DataPipe
或新的DataPipe
。
以下是 TorchData 在 torchdata.dataloader2.adapter
中提供的 Adapter
列表
Shuffle DataPipes 适配器允许控制图中所有现有的 Shuffler( |
|
CacheTimeout DataPipes 适配器允许控制图中所有现有的 EndOnDiskCacheHolder( |
并且,我们将提供更多 Adapters
来涵盖数据处理选项
PinMemory
:在数据处理图的末尾附加一个DataPipe
,该DataPipe
将输出数据转换为固定内存中的torch.Tensor
。FullSync
:附加一个DataPipe
以确保数据处理图在分布式进程之间同步,以防止挂起。ShardingPolicy
:如果DataPipe
图中存在sharding_filter
,则修改分片策略。PrefetchPolicy
、InvalidateCache
等。
如果您对想要提供的 Adapters
有功能请求,请打开一个 GitHub 问题。对于特定需求,只要 DataLoader2
继承自 Adapter
类,它也接受任何自定义 Adapter
。