ReadingService¶
ReadingService 根据不同的用例处理 DataPipe 图的原地修改。
特性¶
动态分片¶
动态分片是通过 MultiProcessingReadingService 和 DistributedReadingService 来实现的,它们根据相应的 multiprocessing 和 distributed worker 的信息对 pipeline 进行分片。并且,TorchData 提供了两种类型的 DataPipe,允许用户在 pipeline 中定义分片位置。
- sharding_filter(- ShardingFilter): 当 pipeline 是可复制的时,每个 distributed/multiprocessing worker 从其自己的- DataPipe图副本加载数据,同时在放置- sharding_filter的位置跳过不属于相应 worker 的样本。
- sharding_round_robin_dispatch(- ShardingRoundRobinDispatcher): 当 pipeline 中存在任何- sharding_round_robin_dispatch- DataPipe时,该分支(即- sharding_round_robin_dispatch之前的 所有 DataPipe)将被视为不可复制的分支(在 multiprocessing 的上下文中)。将创建一个单独的调度进程从不可复制的分支加载数据,并将数据分发给后续的 worker 进程。
以下是在 pipeline 中具有两种分片策略的示例。
![digraph Example {
    subgraph cluster_replicable {
        label="Replicable"
        a -> b -> c -> d -> l;
        color=blue;
    }
    subgraph cluster_non_replicable {
        style=filled;
        color=lightgrey;
        node [style=filled,color=white];
        label="Non-Replicable"
        e -> f -> g -> k;
        h -> i -> j -> k;
    }
    k -> l -> fullsync -> end;
    a [label="DP1"];
    b [label="shuffle"];
    c [label="sharding_filter", color=blue];
    d [label="DP4"];
    e [label="DP2"];
    f [label="shuffle"];
    g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    h [label="DP3"];
    i [label="shuffle"];
    j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    k [label="DP5 (Lowest common ancestor)"];
    l [label="DP6"];
    fullsync;
    end [shape=box];
}](_images/graphviz-a4b77ae8d32185927d8707cd0b25fc9226103ca2.png)
当 multiprocessing 发生时,图变为
![digraph Example {
    subgraph cluster_worker_0 {
        label="Worker 0"
        a0 -> b0 -> c0 -> d0 -> l0;
        m0 -> l0;
        color=blue;
    }
    subgraph cluster_worker_1 {
        label="Worker 1"
        a1 -> b1 -> c1 -> d1 -> l1;
        m1 -> l1;
        color=blue;
    }
    subgraph cluster_non_replicable {
        style=filled;
        color=lightgrey;
        node [style=filled,color=white];
        label="Non-Replicable"
        e -> f -> g -> k;
        h -> i -> j -> k;
        k -> round_robin_demux;
    }
    round_robin_demux -> m0;
    round_robin_demux -> m1;
    l0 -> n;
    l1 -> n;
    n -> fullsync -> end;
    a0 [label="DP1"];
    b0 [label="shuffle"];
    c0 [label="sharding_filter", color=blue];
    d0 [label="DP4"];
    a1 [label="DP1"];
    b1 [label="shuffle"];
    c1 [label="sharding_filter", color=blue];
    d1 [label="DP4"];
    e [label="DP2"];
    f [label="shuffle"];
    g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    h [label="DP3"];
    i [label="shuffle"];
    j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    k [label="DP5 (Lowest common ancestor)"];
    fullsync;
    l0 [label="DP6"];
    l1 [label="DP6"];
    m0 [label="Client"]
    m1 [label="Client"]
    n [label="Client"]
    end [shape=box];
}](_images/graphviz-8581116405d37f067d4dfa1c6bb711728c59d81e.png)
图中的 Client 是一个 DataPipe,它发送请求并从 multiprocessing 队列接收响应。
确定性¶
在 DataLoader2 中,SeedGenerator 成为随机性的单一来源,每个 ReadingService 将通过 initialize_iteration() 访问它,并为随机 DataPipe 操作生成相应的随机种子。
为了确保数据集分片在 multiprocessing 进程和分布式节点上是互斥且共同穷尽的,MultiProcessingReadingService 和 DistributedReadingService 将帮助 DataLoader2 同步在 sharding_filter 或 sharding_round_robin_dispatch 之前的任何随机 DataPipe 操作的随机状态。对于分片后的其余 DataPipe 操作,每个 ReadingService 会根据分布式等级和 worker 进程 ID 生成唯一的随机状态,以便执行不同的随机变换。
图模式¶
这也使得数据预处理 pipeline 从研究到生产的过渡更容易。在使用 ReadingServices 创建和验证 DataPipe 图后,可以向 DataLoader2 提供一个不同的 ReadingService,该服务配置并连接到生产服务/基础架构(如 AIStore)作为直接替换。 ReadingService 可以潜在地搜索图,并找到可以委托给生产服务/基础架构的 DataPipe 操作,然后相应地修改图以实现更高性能的执行。
扩展 ReadingService¶
以下是自定义 ReadingService 的接口。
- class torchdata.dataloader2.ReadingServiceInterface¶
- ReadingService的接口。请根据此接口类扩展自定义- ReadingService。- ReadingService 在调用 - initialize之前必须是可 pickle 的。这是因为- DataLoader2会创建一个它的副本,以避免多个- DataLoader2使用同一个 ReadingService 对象,并且它的内部状态会被它们中的每一个修改。- 由于此限制,某些初始化步骤可能需要在 - initialize方法中进行,而不是在 ReadingService 类的- __init__中进行。- finalize() None¶
- ReadingService清理内部状态并完全关闭服务。在- DataLoader2的- shutdown和- __del__中调用。
 - finalize_iteration() None¶
- 在一个 epoch 结束后, - ReadingService结束服务。在- DataLoader2的迭代器耗尽时调用。
 - abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe]¶
- ReadingService接收一个- DataPipe图,并根据自定义需要将其修改为一个新的- DataPipe图。在第一次创建- DataLoader2迭代器时调用一次。在调用此方法之前,- ReadingService对象必须是可 pickle 的。- 参数:
- datapipe – 原始的 - DataPipe图。
- 返回值:
- 修改后的或新的 - DataPipe图。
 
 - initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]]¶
- ReadingService会为每个 epoch 启动服务。在每次获取- DataLoader2迭代器时调用。- 参数:
- seed_generator – 由 DataLoader2 创建和管理的 SeedGenerator 对象。作为随机性的唯一来源,它将控制 DataPipes 图中所有随机操作的确定性。 
- iter_reset_fn – 当 - SequentialReadingService将多个- ReadingServices链式连接时,来自先前- ReadingServcie的可选重置函数
 
- 返回值:
- 供后续 - ReadingService使用的新- iter_reset_fn
 - 示例 - MultiProcessingReadingService 开始为每个进程设置工作进程种子,并从图中预取项目。 
 
检查点/快照功能正在开发中。以下是初步接口(可能会有一些小改动)
- class torchdata.dataloader2.CheckpointableReadingServiceInterface¶
- 使用两个附加方法扩展 - ReadingServiceInterface,以保存/恢复数据处理图的状态。- abstract checkpoint() bytes¶
- ReadingService序列化内部状态。在- DataLoader2.state_dict中调用。
 - abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) Union[IterDataPipe, MapDataPipe]¶
- ReadingService根据序列化状态调整- DataPipe图。在第一次创建- DataLoader2迭代器时调用一次。与- initialize相对应,后者从头开始调整- DataPipe图。- 参数:
- datapipe – 由 - ReadingService调整前的原始- DataPipe图
- serialized_state – 用于恢复调整后的 - DataPipe图状态的内部状态的序列化状态。
 
- 返回值:
- 从序列化状态生成的调整后的 - DataPipe。
 
 
图函数¶
此外,torchdata.dataloader.graph 中提供了图实用函数,以帮助用户为自定义 ReadingService 进行 DataPipe 图重写。
| 遍历 DataPipes 及其属性以提取 DataPipe 图。 | |
| 给定由  | |
| 给定由  | |
| 给定由  | |
| 给定由  |