ReadingService¶
ReadingService
根据不同的用例处理 DataPipe
图的原地修改。
特性¶
动态分片¶
动态分片是通过 MultiProcessingReadingService
和 DistributedReadingService
来实现的,它们根据相应的 multiprocessing 和 distributed worker 的信息对 pipeline 进行分片。并且,TorchData 提供了两种类型的 DataPipe
,允许用户在 pipeline 中定义分片位置。
sharding_filter
(ShardingFilter
): 当 pipeline 是可复制的时,每个 distributed/multiprocessing worker 从其自己的DataPipe
图副本加载数据,同时在放置sharding_filter
的位置跳过不属于相应 worker 的样本。sharding_round_robin_dispatch
(ShardingRoundRobinDispatcher
): 当 pipeline 中存在任何sharding_round_robin_dispatch
DataPipe
时,该分支(即sharding_round_robin_dispatch
之前的 所有 DataPipe)将被视为不可复制的分支(在 multiprocessing 的上下文中)。将创建一个单独的调度进程从不可复制的分支加载数据,并将数据分发给后续的 worker 进程。
以下是在 pipeline 中具有两种分片策略的示例。
![digraph Example {
subgraph cluster_replicable {
label="Replicable"
a -> b -> c -> d -> l;
color=blue;
}
subgraph cluster_non_replicable {
style=filled;
color=lightgrey;
node [style=filled,color=white];
label="Non-Replicable"
e -> f -> g -> k;
h -> i -> j -> k;
}
k -> l -> fullsync -> end;
a [label="DP1"];
b [label="shuffle"];
c [label="sharding_filter", color=blue];
d [label="DP4"];
e [label="DP2"];
f [label="shuffle"];
g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
h [label="DP3"];
i [label="shuffle"];
j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
k [label="DP5 (Lowest common ancestor)"];
l [label="DP6"];
fullsync;
end [shape=box];
}](_images/graphviz-a4b77ae8d32185927d8707cd0b25fc9226103ca2.png)
当 multiprocessing 发生时,图变为
![digraph Example {
subgraph cluster_worker_0 {
label="Worker 0"
a0 -> b0 -> c0 -> d0 -> l0;
m0 -> l0;
color=blue;
}
subgraph cluster_worker_1 {
label="Worker 1"
a1 -> b1 -> c1 -> d1 -> l1;
m1 -> l1;
color=blue;
}
subgraph cluster_non_replicable {
style=filled;
color=lightgrey;
node [style=filled,color=white];
label="Non-Replicable"
e -> f -> g -> k;
h -> i -> j -> k;
k -> round_robin_demux;
}
round_robin_demux -> m0;
round_robin_demux -> m1;
l0 -> n;
l1 -> n;
n -> fullsync -> end;
a0 [label="DP1"];
b0 [label="shuffle"];
c0 [label="sharding_filter", color=blue];
d0 [label="DP4"];
a1 [label="DP1"];
b1 [label="shuffle"];
c1 [label="sharding_filter", color=blue];
d1 [label="DP4"];
e [label="DP2"];
f [label="shuffle"];
g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
h [label="DP3"];
i [label="shuffle"];
j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
k [label="DP5 (Lowest common ancestor)"];
fullsync;
l0 [label="DP6"];
l1 [label="DP6"];
m0 [label="Client"]
m1 [label="Client"]
n [label="Client"]
end [shape=box];
}](_images/graphviz-8581116405d37f067d4dfa1c6bb711728c59d81e.png)
图中的 Client
是一个 DataPipe
,它发送请求并从 multiprocessing 队列接收响应。
确定性¶
在 DataLoader2
中,SeedGenerator
成为随机性的单一来源,每个 ReadingService
将通过 initialize_iteration()
访问它,并为随机 DataPipe
操作生成相应的随机种子。
为了确保数据集分片在 multiprocessing 进程和分布式节点上是互斥且共同穷尽的,MultiProcessingReadingService
和 DistributedReadingService
将帮助 DataLoader2
同步在 sharding_filter
或 sharding_round_robin_dispatch
之前的任何随机 DataPipe
操作的随机状态。对于分片后的其余 DataPipe
操作,每个 ReadingService
会根据分布式等级和 worker 进程 ID 生成唯一的随机状态,以便执行不同的随机变换。
图模式¶
这也使得数据预处理 pipeline 从研究到生产的过渡更容易。在使用 ReadingServices
创建和验证 DataPipe
图后,可以向 DataLoader2
提供一个不同的 ReadingService
,该服务配置并连接到生产服务/基础架构(如 AIStore
)作为直接替换。 ReadingService
可以潜在地搜索图,并找到可以委托给生产服务/基础架构的 DataPipe
操作,然后相应地修改图以实现更高性能的执行。
扩展 ReadingService¶
以下是自定义 ReadingService
的接口。
- class torchdata.dataloader2.ReadingServiceInterface¶
ReadingService
的接口。请根据此接口类扩展自定义ReadingService
。ReadingService 在调用
initialize
之前必须是可 pickle 的。这是因为DataLoader2
会创建一个它的副本,以避免多个DataLoader2
使用同一个 ReadingService 对象,并且它的内部状态会被它们中的每一个修改。由于此限制,某些初始化步骤可能需要在
initialize
方法中进行,而不是在 ReadingService 类的__init__
中进行。- finalize() None ¶
ReadingService
清理内部状态并完全关闭服务。在DataLoader2
的shutdown
和__del__
中调用。
- finalize_iteration() None ¶
在一个 epoch 结束后,
ReadingService
结束服务。在DataLoader2
的迭代器耗尽时调用。
- abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe] ¶
ReadingService
接收一个DataPipe
图,并根据自定义需要将其修改为一个新的DataPipe
图。在第一次创建DataLoader2
迭代器时调用一次。在调用此方法之前,ReadingService
对象必须是可 pickle 的。- 参数:
datapipe – 原始的
DataPipe
图。- 返回值:
修改后的或新的
DataPipe
图。
- initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] ¶
ReadingService
会为每个 epoch 启动服务。在每次获取DataLoader2
迭代器时调用。- 参数:
seed_generator – 由 DataLoader2 创建和管理的 SeedGenerator 对象。作为随机性的唯一来源,它将控制 DataPipes 图中所有随机操作的确定性。
iter_reset_fn – 当
SequentialReadingService
将多个ReadingServices
链式连接时,来自先前ReadingServcie
的可选重置函数
- 返回值:
供后续
ReadingService
使用的新iter_reset_fn
示例
MultiProcessingReadingService 开始为每个进程设置工作进程种子,并从图中预取项目。
检查点/快照功能正在开发中。以下是初步接口(可能会有一些小改动)
- class torchdata.dataloader2.CheckpointableReadingServiceInterface¶
使用两个附加方法扩展
ReadingServiceInterface
,以保存/恢复数据处理图的状态。- abstract checkpoint() bytes ¶
ReadingService
序列化内部状态。在DataLoader2.state_dict
中调用。
- abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) Union[IterDataPipe, MapDataPipe] ¶
ReadingService
根据序列化状态调整DataPipe
图。在第一次创建DataLoader2
迭代器时调用一次。与initialize
相对应,后者从头开始调整DataPipe
图。- 参数:
datapipe – 由
ReadingService
调整前的原始DataPipe
图serialized_state – 用于恢复调整后的
DataPipe
图状态的内部状态的序列化状态。
- 返回值:
从序列化状态生成的调整后的
DataPipe
。
图函数¶
此外,torchdata.dataloader.graph
中提供了图实用函数,以帮助用户为自定义 ReadingService
进行 DataPipe
图重写。
遍历 DataPipes 及其属性以提取 DataPipe 图。 |
|
给定由 |
|
给定由 |
|
给定由 |
|
给定由 |