DataLoader2¶
引入了一个新的轻量级 DataLoader2,用于将过载的数据操作功能从 torch.utils.data.DataLoader 解耦到 DataPipe 操作。此外,某些功能只能通过 DataLoader2 实现,例如快照和切换后端服务以执行高性能操作。
DataLoader2¶
- class torchdata.dataloader2.DataLoader2(datapipe: Optional[Union[IterDataPipe, MapDataPipe]], datapipe_adapter_fn: Optional[Union[Iterable[Adapter], Adapter]] = None, reading_service: Optional[ReadingServiceInterface] = None)¶
- DataLoader2用于根据- ReadingService和- Adapter函数优化和执行给定的- DataPipe图,并支持- 多进程和分布式数据加载的动态分片 
- 多个后端 - ReadingServices
- DataPipe图的原地修改,如 shuffle 控制、内存固定等。
- 快照数据预处理管道状态 (WIP) 
 - 参数:
- datapipe ( - IterDataPipe或- MapDataPipe) - 用于加载数据的- DataPipe。在初始化过程中将创建此 datapipe 的深拷贝,允许在不同的- DataLoader2中重复使用输入,而无需共享状态。只有在创建 DataLoader 后立即调用- load_state_dict的情况下,才能输入- None。
- datapipe_adapter_fn ( - Iterable[Adapter]或- Adapter,可选) - 将应用于 DataPipe 的- Adapter函数(默认值:- None)。
- reading_service (ReadingServiceInterface,可选) - 定义 - DataLoader2应如何对- DataPipe执行操作,例如多进程/分布式(默认值:- None)。在初始化过程中将创建此服务的深拷贝,允许在不同的- DataLoader2中重复使用 ReadingService,而无需共享状态。
 
 - 注意 - 当将 - MapDataPipe传递到- DataLoader2时,为了遍历数据,- DataLoader2将尝试通过- iter(datapipe)创建迭代器。如果对象具有非零索引的索引,则可能会失败。考虑使用- .shuffle()(将- MapDataPipe转换为- IterDataPipe)或- datapipe.to_iter_datapipe(custom_indices)。- __iter__() DataLoader2Iterator[T_co]¶
- 从由 - ReadingService调整的- DataPipe图返回一个单例迭代器。如果提供序列化状态来构造- DataLoader2,则将恢复- DataPipe。并且,- initialize_iteration和- finalize_iterator将分别在迭代开始和结束时被调用。
 - classmethod from_state(state: Dict[str, Any], reading_service: CheckpointableReadingServiceInterface) DataLoader2[T_co]¶
- 使用从序列化状态恢复的 - DataPipe图和- ReadingService创建新的- DataLoader2。
 - load_state_dict(state_dict: Dict[str, Any]) None¶
- 对于现有的 - DataLoader2,加载序列化状态以恢复- DataPipe图并重置- ReadingService的内部状态。
 - seed(seed: int) None¶
- 为 DataLoader2 设置随机种子以控制确定性。 - 参数:
- seed - 随机 uint64 种子 
 
 - shutdown() None¶
- 关闭 - ReadingService并清理迭代器。
 - state_dict() Dict[str, Any]¶
- 返回一个字典,用以下键来表示数据处理管道的状态: - serialized_datapipe:- ReadingService适配之前的序列化- DataPipe。
- reading_service_state:- ReadingService和已适配的- DataPipe的状态。
 
 
注意:DataLoader2 不支持 torch.utils.data.Dataset 或 torch.utils.data.IterableDataset。请使用以下相应的 DataPipe 包装它们:
- torchdata.datapipes.map.SequenceWrapper:- torch.utils.data.Dataset
- torchdata.datapipes.iter.IterableWrapper:- torch.utils.data.IterableDataset
ReadingService¶
ReadingService 指定数据处理图的执行后端。TorchData 中提供了三种类型的 ReadingServices:
| 
 | |
| 默认的 ReadingService,用于在主进程中为 “DataPipe” 图提供服务,并将图设置(如确定性控制)应用于图。 | |
| 生成多个工作进程,以从  | |
每个 ReadingServices 都会获取 DataPipe 图并对其进行重写,以实现一些功能,如动态分片、共享随机种子以及用于多进程/分布式进程的快照。有关这些功能的更多详细信息,请参阅文档。
适配器¶
适配器 用于在 DataLoader2 中配置、修改和扩展 DataPipe 图。它允许对 PyTorch 域提供的预先组装的 DataPipe 图进行就地修改或替换。例如,可以将 Shuffle(False) 提供给 DataLoader2,这将禁用 DataPipes 图中的任何 shuffle 操作。
- class torchdata.dataloader2.adapter.Adapter¶
- 遵循 Python 可调用协议的适配器基类。 - abstract __call__(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe]¶
- 可调用函数,可以对 - DataPipe图进行就地修改,也可以返回新的- DataPipe图。- 参数:
- datapipe – 需要适配的 - DataPipe。
- 返回:
- 已适配的 - DataPipe或新的- DataPipe。
 
 
以下是 TorchData 在 torchdata.dataloader2.adapter 中提供的 Adapter 列表:
| Shuffle DataPipes 适配器允许控制图中所有现有的 Shuffler ( | |
| CacheTimeout DataPipes 适配器允许控制图中所有现有的 EndOnDiskCacheHolder ( | 
此外,我们还将提供更多 Adapters 来涵盖数据处理选项:
- PinMemory:在数据处理图的末尾附加一个- DataPipe,将输出数据转换为固定内存中的- torch.Tensor。
- FullSync:附加一个- DataPipe,以确保数据处理图在分布式进程之间同步,防止挂起。
- ShardingPolicy:如果- DataPipe图中存在- sharding_filter,则修改分片策略。
- PrefetchPolicy、- InvalidateCache等。
如果您对希望提供的 Adapters 有功能请求,请在 GitHub 上提交问题。对于特定需求,DataLoader2 也接受任何自定义 Adapter,只要它继承自 Adapter 类即可。