快捷方式

ReadingService

ReadingService 处理基于不同用例的 DataPipe 图的原地修改。

功能

动态分片

动态分片通过 MultiProcessingReadingServiceDistributedReadingService 实现,它们根据相应的多进程和分布式工作者的信息对管道进行分片。此外,TorchData 提供了两种类型的 DataPipe,让用户可以在管道中定义分片位置。

  • sharding_filter (ShardingFilter): 当管道可复制时,每个分布式/多进程工作者都会从其自身的 DataPipe 图副本加载数据,同时跳过不属于相应工作者的样本,跳过位置是 sharding_filter 的放置位置。

  • sharding_round_robin_dispatch (ShardingRoundRobinDispatcher): 当管道中存在任何 sharding_round_robin_dispatch DataPipe 时,该分支(即所有先于 sharding_round_robin_dispatch 的 DataPipes)将被视为不可复制分支(在多进程的上下文中)。将创建单个调度进程来从不可复制分支加载数据并将其分发到后续的工作者进程。

以下是管道中使用两种类型的分片策略的示例。

digraph Example {
    subgraph cluster_replicable {
        label="Replicable"
        a -> b -> c -> d -> l;
        color=blue;
    }

    subgraph cluster_non_replicable {
        style=filled;
        color=lightgrey;
        node [style=filled,color=white];
        label="Non-Replicable"
        e -> f -> g -> k;
        h -> i -> j -> k;
    }

    k -> l -> fullsync -> end;

    a [label="DP1"];
    b [label="shuffle"];
    c [label="sharding_filter", color=blue];
    d [label="DP4"];
    e [label="DP2"];
    f [label="shuffle"];
    g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    h [label="DP3"];
    i [label="shuffle"];
    j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    k [label="DP5 (Lowest common ancestor)"];
    l [label="DP6"];
    fullsync;
    end [shape=box];
}

当进行多进程时,图形变为

digraph Example {
    subgraph cluster_worker_0 {
        label="Worker 0"
        a0 -> b0 -> c0 -> d0 -> l0;
        m0 -> l0;
        color=blue;
    }

    subgraph cluster_worker_1 {
        label="Worker 1"
        a1 -> b1 -> c1 -> d1 -> l1;
        m1 -> l1;
        color=blue;
    }

    subgraph cluster_non_replicable {
        style=filled;
        color=lightgrey;
        node [style=filled,color=white];
        label="Non-Replicable"
        e -> f -> g -> k;
        h -> i -> j -> k;
        k -> round_robin_demux;
    }

    round_robin_demux -> m0;
    round_robin_demux -> m1;
    l0 -> n;
    l1 -> n;
    n -> fullsync -> end;

    a0 [label="DP1"];
    b0 [label="shuffle"];
    c0 [label="sharding_filter", color=blue];
    d0 [label="DP4"];
    a1 [label="DP1"];
    b1 [label="shuffle"];
    c1 [label="sharding_filter", color=blue];
    d1 [label="DP4"];
    e [label="DP2"];
    f [label="shuffle"];
    g [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    h [label="DP3"];
    i [label="shuffle"];
    j [label="sharding_round_robin_dispatch", style="filled,rounded", color=red, fillcolor=white];
    k [label="DP5 (Lowest common ancestor)"];
    fullsync;
    l0 [label="DP6"];
    l1 [label="DP6"];
    m0 [label="Client"]
    m1 [label="Client"]
    n [label="Client"]
    end [shape=box];
}

Client 在图形中是一个 DataPipe,它发送请求并接收来自多进程队列的响应。

确定性

DataLoader2 中,SeedGenerator 成为随机性的单一来源,每个 ReadingService 都可以通过 initialize_iteration() 访问它并为随机 DataPipe 操作生成相应的随机种子。

为了确保数据集分片在多进程和分布式节点上是相互排斥且集体穷举的,MultiProcessingReadingServiceDistributedReadingService 将帮助 DataLoader2 同步任何随机 DataPipe 操作(先于 sharding_filtersharding_round_robin_dispatch)的随机状态。对于分片后剩余的 DataPipe 操作,每个 ReadingService 会基于分布式等级和工作者进程 ID 生成唯一的随机状态,以便执行不同的随机转换。

图形模式

这也简化了数据预处理管道从研究到生产的过渡。在 DataPipe 图被创建并使用 ReadingServices 验证后,可以将配置并连接到生产服务/基础设施(例如 AIStore)的不同 ReadingService 作为直接替换提供给 DataLoader2ReadingService 可能能够搜索图形并找到可以委托给生产服务/基础设施的 DataPipe 操作,然后相应地修改图形以实现更高效的执行。

扩展 ReadingService

以下是用于自定义 ReadingService 的接口。

class torchdata.dataloader2.ReadingServiceInterface

用于 ReadingService 的接口。请基于此接口类扩展自定义 ReadingService

在调用 initialize 之前,ReadingService 必须是可腌制的。这是因为 DataLoader2 将创建它的副本以避免同一个 ReadingService 对象被多个 DataLoader2 使用,并且它的内部状态将可被它们修改。

由于此约束,某些初始化步骤可能需要在 initialize 方法中执行,而不是在 ReadingService 类的 __init__ 中执行。

finalize() None

ReadingService 清理内部状态并完全关闭服务。在 DataLoader2shutdown__del__ 中调用。

finalize_iteration() None

在完成一个 epoch 后,ReadingService 结束服务。当 DataLoader2 的迭代器耗尽时调用。

abstract initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe]

ReadingService 接收一个 DataPipe 图,根据自定义需求将其改造成一个新的 DataPipe 图。在第一次创建 DataLoader2 迭代器时被调用一次。在调用此方法之前,ReadingService 对象必须是可拾取的。

参数:

datapipe – 原始的 DataPipe 图。

返回:

一个经过改造的或新的 DataPipe 图。

initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]]

ReadingService 为一个 epoch 启动服务。在每次获取 DataLoader2 迭代器开始时被调用。

参数:
  • seed_generator – 由 DataLoader2 创建和管理的 SeedGenerator 对象。作为随机性的单一来源,它将控制 DataPipes 图中所有随机操作的确定性。

  • iter_reset_fn – 当 SequentialReadingService 将多个 ReadingServices 链接在一起时,来自先前 ReadingServcie 的可选重置函数。

返回:

一个新的 iter_reset_fn 供后续 ReadingService 使用。

示例

MultiProcessingReadingService 开始设置每个进程的 worker 种子并从图中预取项目。

检查点/快照功能正在开发中。以下是初步接口(可能会有微小的变化)。

class torchdata.dataloader2.CheckpointableReadingServiceInterface

扩展 ReadingServiceInterface ,添加两个用于保存/恢复数据处理图状态的额外方法。

abstract checkpoint() bytes

ReadingService 序列化内部状态。在 DataLoader2.state_dict 中被调用。

abstract restore(datapipe: Union[IterDataPipe, MapDataPipe], serialized_state: bytes) Union[IterDataPipe, MapDataPipe]

ReadingService 根据序列化状态调整 DataPipe 图。在第一次创建 DataLoader2 迭代器时被调用一次。是 initialize 的对应方法,后者从头开始调整 DataPipe 图。

参数:
  • datapipe – 原始的 DataPipe 图,在被 ReadingService 调整之前。

  • serialized_state – 用于恢复经过调整的 DataPipe 图状态的内部状态的序列化状态。

返回:

从序列化状态生成的经过调整的 DataPipe

图函数

此外,图实用函数在 torchdata.dataloader.graph 中提供,以帮助用户为自定义 ReadingService 进行 DataPipe 图重写。

traverse_dps

遍历 DataPipes 及其属性以提取 DataPipe 图。

find_dps

给定由 traverse_dps 函数生成的 DataPipe 图,返回具有提供的 DataPipe 类型的 DataPipe 实例。

list_dps

给定由 traverse_dps 函数生成的 DataPipe 图,返回所有 DataPipe 实例的列表,不重复。

remove_dp

给定由 traverse_dps 函数生成的 DataPipe 图以及要删除的 DataPipe,返回新的 DataPipe 图。

replace_dp

给定由 traverse_dps 函数生成的 DataPipe 图以及要替换的 DataPipe 和新的 DataPipe,返回新的 DataPipe 图。

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取针对初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源