快捷方式

DataLoader2

引入了一个新的轻量级 DataLoader2,用于将过载的数据操作功能从 torch.utils.data.DataLoader 解耦到 DataPipe 操作。此外,某些功能只能通过 DataLoader2 实现,例如快照和切换后端服务以执行高性能操作。

DataLoader2

class torchdata.dataloader2.DataLoader2(datapipe: Optional[Union[IterDataPipe, MapDataPipe]], datapipe_adapter_fn: Optional[Union[Iterable[Adapter], Adapter]] = None, reading_service: Optional[ReadingServiceInterface] = None)

DataLoader2 用于根据 ReadingServiceAdapter 函数优化和执行给定的 DataPipe 图,并支持

  • 多进程和分布式数据加载的动态分片

  • 多个后端 ReadingServices

  • DataPipe 图的原地修改,如 shuffle 控制、内存固定等。

  • 快照数据预处理管道状态 (WIP)

参数:
  • datapipe (IterDataPipeMapDataPipe) - 用于加载数据的 DataPipe。在初始化过程中将创建此 datapipe 的深拷贝,允许在不同的 DataLoader2 中重复使用输入,而无需共享状态。只有在创建 DataLoader 后立即调用 load_state_dict 的情况下,才能输入 None

  • datapipe_adapter_fn (Iterable[Adapter]Adapter,可选) - 将应用于 DataPipe 的 Adapter 函数(默认值:None)。

  • reading_service (ReadingServiceInterface可选) - 定义 DataLoader2 应如何对 DataPipe 执行操作,例如多进程/分布式(默认值:None)。在初始化过程中将创建此服务的深拷贝,允许在不同的 DataLoader2 中重复使用 ReadingService,而无需共享状态。

注意

当将 MapDataPipe 传递到 DataLoader2 时,为了遍历数据,DataLoader2 将尝试通过 iter(datapipe) 创建迭代器。如果对象具有非零索引的索引,则可能会失败。考虑使用 .shuffle()(将 MapDataPipe 转换为 IterDataPipe)或 datapipe.to_iter_datapipe(custom_indices)

__iter__() DataLoader2Iterator[T_co]

从由 ReadingService 调整的 DataPipe 图返回一个单例迭代器。如果提供序列化状态来构造 DataLoader2,则将恢复 DataPipe。并且,initialize_iterationfinalize_iterator 将分别在迭代开始和结束时被调用。

classmethod from_state(state: Dict[str, Any], reading_service: CheckpointableReadingServiceInterface) DataLoader2[T_co]

使用从序列化状态恢复的 DataPipe 图和 ReadingService 创建新的 DataLoader2

load_state_dict(state_dict: Dict[str, Any]) None

对于现有的 DataLoader2,加载序列化状态以恢复 DataPipe 图并重置 ReadingService 的内部状态。

seed(seed: int) None

为 DataLoader2 设置随机种子以控制确定性。

参数:

seed - 随机 uint64 种子

shutdown() None

关闭 ReadingService 并清理迭代器。

state_dict() Dict[str, Any]

返回一个字典,用以下键来表示数据处理管道的状态:

  • serialized_datapipeReadingService 适配之前的序列化 DataPipe

  • reading_service_stateReadingService 和已适配的 DataPipe 的状态。

注意:DataLoader2 不支持 torch.utils.data.Datasettorch.utils.data.IterableDataset。请使用以下相应的 DataPipe 包装它们:

ReadingService

ReadingService 指定数据处理图的执行后端。TorchData 中提供了三种类型的 ReadingServices

DistributedReadingService

DistributedReadingSerivce 处理 DataPipe 图上的分布式分片,并通过在分布式进程之间共享相同的种子来保证随机性。

InProcessReadingService

默认的 ReadingService,用于在主进程中为 “DataPipe” 图提供服务,并将图设置(如确定性控制)应用于图。

MultiProcessingReadingService

生成多个工作进程,以从 DataPipe 图加载数据。

SequentialReadingService

每个 ReadingServices 都会获取 DataPipe 图并对其进行重写,以实现一些功能,如动态分片、共享随机种子以及用于多进程/分布式进程的快照。有关这些功能的更多详细信息,请参阅文档

适配器

适配器 用于在 DataLoader2 中配置、修改和扩展 DataPipe 图。它允许对 PyTorch 域提供的预先组装的 DataPipe 图进行就地修改或替换。例如,可以将 Shuffle(False) 提供给 DataLoader2,这将禁用 DataPipes 图中的任何 shuffle 操作。

class torchdata.dataloader2.adapter.Adapter

遵循 Python 可调用协议的适配器基类。

abstract __call__(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe]

可调用函数,可以对 DataPipe 图进行就地修改,也可以返回新的 DataPipe 图。

参数:

datapipe – 需要适配的 DataPipe

返回:

已适配的 DataPipe 或新的 DataPipe

以下是 TorchData 在 torchdata.dataloader2.adapter 中提供的 Adapter 列表:

Shuffle

Shuffle DataPipes 适配器允许控制图中所有现有的 Shuffler (shuffle) DataPipes。

CacheTimeout

CacheTimeout DataPipes 适配器允许控制图中所有现有的 EndOnDiskCacheHolder (end_caching) 的超时。

此外,我们还将提供更多 Adapters 来涵盖数据处理选项:

  • PinMemory:在数据处理图的末尾附加一个 DataPipe,将输出数据转换为固定内存中的 torch.Tensor

  • FullSync:附加一个 DataPipe,以确保数据处理图在分布式进程之间同步,防止挂起。

  • ShardingPolicy:如果 DataPipe 图中存在 sharding_filter,则修改分片策略。

  • PrefetchPolicyInvalidateCache 等。

如果您对希望提供的 Adapters 有功能请求,请在 GitHub 上提交问题。对于特定需求,DataLoader2 也接受任何自定义 Adapter,只要它继承自 Adapter 类即可。

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源