ReadingService¶
ReadingService
处理基于不同用例的 DataPipe
图的原地修改。
功能¶
动态分片¶
动态分片通过 MultiProcessingReadingService
和 DistributedReadingService
实现,它们根据相应的多进程和分布式工作者的信息对管道进行分片。此外,TorchData 提供了两种类型的 DataPipe
,让用户可以在管道中定义分片位置。
sharding_filter
(ShardingFilter
): 当管道可复制时,每个分布式/多进程工作者都会从其自身的DataPipe
图副本加载数据,同时跳过不属于相应工作者的样本,跳过位置是sharding_filter
的放置位置。sharding_round_robin_dispatch
(ShardingRoundRobinDispatcher
): 当管道中存在任何sharding_round_robin_dispatch
DataPipe
时,该分支(即所有先于sharding_round_robin_dispatch
的 DataPipes)将被视为不可复制分支(在多进程的上下文中)。将创建单个调度进程来从不可复制分支加载数据并将其分发到后续的工作者进程。
以下是管道中使用两种类型的分片策略的示例。
当进行多进程时,图形变为
Client
在图形中是一个 DataPipe
,它发送请求并接收来自多进程队列的响应。
确定性¶
在 DataLoader2
中,SeedGenerator
成为随机性的单一来源,每个 ReadingService
都可以通过 initialize_iteration()
访问它并为随机 DataPipe
操作生成相应的随机种子。
为了确保数据集分片在多进程和分布式节点上是相互排斥且集体穷举的,MultiProcessingReadingService
和 DistributedReadingService
将帮助 DataLoader2
同步任何随机 DataPipe
操作(先于 sharding_filter
或 sharding_round_robin_dispatch
)的随机状态。对于分片后剩余的 DataPipe
操作,每个 ReadingService
会基于分布式等级和工作者进程 ID 生成唯一的随机状态,以便执行不同的随机转换。
图形模式¶
这也简化了数据预处理管道从研究到生产的过渡。在 DataPipe
图被创建并使用 ReadingServices
验证后,可以将配置并连接到生产服务/基础设施(例如 AIStore
)的不同 ReadingService
作为直接替换提供给 DataLoader2
。ReadingService
可能能够搜索图形并找到可以委托给生产服务/基础设施的 DataPipe
操作,然后相应地修改图形以实现更高效的执行。
扩展 ReadingService¶
以下是用于自定义 ReadingService
的接口。
- class torchdata.dataloader2.ReadingServiceInterface¶
用于
ReadingService
的接口。请基于此接口类扩展自定义ReadingService
。在调用
initialize
之前,ReadingService 必须是可腌制的。这是因为DataLoader2
将创建它的副本以避免同一个 ReadingService 对象被多个DataLoader2
使用,并且它的内部状态将可被它们修改。由于此约束,某些初始化步骤可能需要在
initialize
方法中执行,而不是在 ReadingService 类的__init__
中执行。- finalize() None ¶
ReadingService
清理内部状态并完全关闭服务。在DataLoader2
的shutdown
和__del__
中调用。
- finalize_iteration() None ¶
在完成一个 epoch 后,
ReadingService
结束服务。当DataLoader2
的迭代器耗尽时调用。
- abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe] ¶
ReadingService
接收一个DataPipe
图,根据自定义需求将其改造成一个新的DataPipe
图。在第一次创建DataLoader2
迭代器时被调用一次。在调用此方法之前,ReadingService
对象必须是可拾取的。- 参数:
datapipe – 原始的
DataPipe
图。- 返回:
一个经过改造的或新的
DataPipe
图。
- initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] ¶
ReadingService
为一个 epoch 启动服务。在每次获取DataLoader2
迭代器开始时被调用。- 参数:
seed_generator – 由 DataLoader2 创建和管理的 SeedGenerator 对象。作为随机性的单一来源,它将控制 DataPipes 图中所有随机操作的确定性。
iter_reset_fn – 当
SequentialReadingService
将多个ReadingServices
链接在一起时,来自先前ReadingServcie
的可选重置函数。
- 返回:
一个新的
iter_reset_fn
供后续ReadingService
使用。
示例
MultiProcessingReadingService 开始设置每个进程的 worker 种子并从图中预取项目。
检查点/快照功能正在开发中。以下是初步接口(可能会有微小的变化)。
- class torchdata.dataloader2.CheckpointableReadingServiceInterface¶
扩展
ReadingServiceInterface
,添加两个用于保存/恢复数据处理图状态的额外方法。- abstract checkpoint() bytes ¶
ReadingService
序列化内部状态。在DataLoader2.state_dict
中被调用。
- abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) Union[IterDataPipe, MapDataPipe] ¶
ReadingService
根据序列化状态调整DataPipe
图。在第一次创建DataLoader2
迭代器时被调用一次。是initialize
的对应方法,后者从头开始调整DataPipe
图。- 参数:
datapipe – 原始的
DataPipe
图,在被ReadingService
调整之前。serialized_state – 用于恢复经过调整的
DataPipe
图状态的内部状态的序列化状态。
- 返回:
从序列化状态生成的经过调整的
DataPipe
。
图函数¶
此外,图实用函数在 torchdata.dataloader.graph
中提供,以帮助用户为自定义 ReadingService
进行 DataPipe
图重写。
遍历 DataPipes 及其属性以提取 DataPipe 图。 |
|
给定由 |
|
给定由 |
|
给定由 |
|
给定由 |