快捷方式

ReadingService

ReadingService 根据不同的用例处理 DataPipe 图的原地修改。

特性

动态分片

动态分片是通过 MultiProcessingReadingServiceDistributedReadingService 来实现的,它们根据相应的 multiprocessing 和 distributed worker 的信息对 pipeline 进行分片。并且,TorchData 提供了两种类型的 DataPipe,允许用户在 pipeline 中定义分片位置。

  • sharding_filter (ShardingFilter): 当 pipeline 是可复制的时,每个 distributed/multiprocessing worker 从其自己的 DataPipe 图副本加载数据,同时在放置 sharding_filter 的位置跳过不属于相应 worker 的样本。

  • sharding_round_robin_dispatch (ShardingRoundRobinDispatcher): 当 pipeline 中存在任何 sharding_round_robin_dispatch DataPipe 时,该分支(即 sharding_round_robin_dispatch 之前的 所有 DataPipe)将被视为不可复制的分支(在 multiprocessing 的上下文中)。将创建一个单独的调度进程从不可复制的分支加载数据,并将数据分发给后续的 worker 进程。

以下是在 pipeline 中具有两种分片策略的示例。

digraph Example {
    subgraph cluster_replicable {
        label="Replicable"
        a -> b -> c -> d -> l;
        color=blue;
    }

    subgraph cluster_non_replicable {
        style=filled;
        color=lightgrey;
        node [style=filled,color=white];
        label="Non-Replicable"
        e -> f -> g -> k;
        h -> i -> j -> k;
    }

    k -> l -> fullsync -> end;

    a [label="DP1"];
    b [label="shuffle"];
    c [label="sharding_filter", color=blue];
    d [label="DP4"];
    e [label="DP2"];
    f [label="shuffle"];
    g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    h [label="DP3"];
    i [label="shuffle"];
    j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    k [label="DP5 (Lowest common ancestor)"];
    l [label="DP6"];
    fullsync;
    end [shape=box];
}

当 multiprocessing 发生时,图变为

digraph Example {
    subgraph cluster_worker_0 {
        label="Worker 0"
        a0 -> b0 -> c0 -> d0 -> l0;
        m0 -> l0;
        color=blue;
    }

    subgraph cluster_worker_1 {
        label="Worker 1"
        a1 -> b1 -> c1 -> d1 -> l1;
        m1 -> l1;
        color=blue;
    }

    subgraph cluster_non_replicable {
        style=filled;
        color=lightgrey;
        node [style=filled,color=white];
        label="Non-Replicable"
        e -> f -> g -> k;
        h -> i -> j -> k;
        k -> round_robin_demux;
    }

    round_robin_demux -> m0;
    round_robin_demux -> m1;
    l0 -> n;
    l1 -> n;
    n -> fullsync -> end;

    a0 [label="DP1"];
    b0 [label="shuffle"];
    c0 [label="sharding_filter", color=blue];
    d0 [label="DP4"];
    a1 [label="DP1"];
    b1 [label="shuffle"];
    c1 [label="sharding_filter", color=blue];
    d1 [label="DP4"];
    e [label="DP2"];
    f [label="shuffle"];
    g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    h [label="DP3"];
    i [label="shuffle"];
    j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    k [label="DP5 (Lowest common ancestor)"];
    fullsync;
    l0 [label="DP6"];
    l1 [label="DP6"];
    m0 [label="Client"]
    m1 [label="Client"]
    n [label="Client"]
    end [shape=box];
}

图中的 Client 是一个 DataPipe,它发送请求并从 multiprocessing 队列接收响应。

确定性

DataLoader2 中,SeedGenerator 成为随机性的单一来源,每个 ReadingService 将通过 initialize_iteration() 访问它,并为随机 DataPipe 操作生成相应的随机种子。

为了确保数据集分片在 multiprocessing 进程和分布式节点上是互斥且共同穷尽的,MultiProcessingReadingServiceDistributedReadingService 将帮助 DataLoader2 同步在 sharding_filtersharding_round_robin_dispatch 之前的任何随机 DataPipe 操作的随机状态。对于分片后的其余 DataPipe 操作,每个 ReadingService 会根据分布式等级和 worker 进程 ID 生成唯一的随机状态,以便执行不同的随机变换。

图模式

这也使得数据预处理 pipeline 从研究到生产的过渡更容易。在使用 ReadingServices 创建和验证 DataPipe 图后,可以向 DataLoader2 提供一个不同的 ReadingService,该服务配置并连接到生产服务/基础架构(如 AIStore)作为直接替换。 ReadingService 可以潜在地搜索图,并找到可以委托给生产服务/基础架构的 DataPipe 操作,然后相应地修改图以实现更高性能的执行。

扩展 ReadingService

以下是自定义 ReadingService 的接口。

class torchdata.dataloader2.ReadingServiceInterface

ReadingService 的接口。请根据此接口类扩展自定义 ReadingService

ReadingService 在调用 initialize 之前必须是可 pickle 的。这是因为 DataLoader2 会创建一个它的副本,以避免多个 DataLoader2 使用同一个 ReadingService 对象,并且它的内部状态会被它们中的每一个修改。

由于此限制,某些初始化步骤可能需要在 initialize 方法中进行,而不是在 ReadingService 类的 __init__ 中进行。

finalize() None

ReadingService 清理内部状态并完全关闭服务。在 DataLoader2shutdown__del__ 中调用。

finalize_iteration() None

在一个 epoch 结束后,ReadingService 结束服务。在 DataLoader2 的迭代器耗尽时调用。

abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe]

ReadingService 接收一个 DataPipe 图,并根据自定义需要将其修改为一个新的 DataPipe 图。在第一次创建 DataLoader2 迭代器时调用一次。在调用此方法之前,ReadingService 对象必须是可 pickle 的。

参数:

datapipe – 原始的 DataPipe 图。

返回值:

修改后的或新的 DataPipe 图。

initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]]

ReadingService 会为每个 epoch 启动服务。在每次获取 DataLoader2 迭代器时调用。

参数:
  • seed_generator – 由 DataLoader2 创建和管理的 SeedGenerator 对象。作为随机性的唯一来源,它将控制 DataPipes 图中所有随机操作的确定性。

  • iter_reset_fn – 当 SequentialReadingService 将多个 ReadingServices 链式连接时,来自先前 ReadingServcie 的可选重置函数

返回值:

供后续 ReadingService 使用的新 iter_reset_fn

示例

MultiProcessingReadingService 开始为每个进程设置工作进程种子,并从图中预取项目。

检查点/快照功能正在开发中。以下是初步接口(可能会有一些小改动)

class torchdata.dataloader2.CheckpointableReadingServiceInterface

使用两个附加方法扩展 ReadingServiceInterface,以保存/恢复数据处理图的状态。

abstract checkpoint() bytes

ReadingService 序列化内部状态。在 DataLoader2.state_dict 中调用。

abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) Union[IterDataPipe, MapDataPipe]

ReadingService 根据序列化状态调整 DataPipe 图。在第一次创建 DataLoader2 迭代器时调用一次。与 initialize 相对应,后者从头开始调整 DataPipe 图。

参数:
  • datapipe – 由 ReadingService 调整前的原始 DataPipe

  • serialized_state – 用于恢复调整后的 DataPipe 图状态的内部状态的序列化状态。

返回值:

从序列化状态生成的调整后的 DataPipe

图函数

此外,torchdata.dataloader.graph 中提供了图实用函数,以帮助用户为自定义 ReadingService 进行 DataPipe 图重写。

traverse_dps

遍历 DataPipes 及其属性以提取 DataPipe 图。

find_dps

给定由 traverse_dps 函数生成的 DataPipe 图,返回具有提供的 DataPipe 类型的 DataPipe 实例。

list_dps

给定由 traverse_dps 函数生成的 DataPipe 图,返回所有 DataPipe 实例的列表,不重复。

remove_dp

给定由 traverse_dps 函数生成的 DataPipe 图和要删除的 DataPipe,返回新的 DataPipe 图。

replace_dp

给定由 traverse_dps 函数生成的 DataPipe 图、要替换的 DataPipe 和新的 DataPipe,返回新的 DataPipe 图。

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的答案

查看资源