快捷方式

DataLoader2 教程

本教程面向希望创建 DataPipe 图并在不同后端系统(ReadingService)中使用 DataLoader2 加载数据的用户。可以在 这个 colab 笔记本 中找到使用示例。

数据管道

有关详细信息,请参阅 数据管道教程。以下是一些必须注意的最重要事项:为了确保数据管道在每个 epoch 中具有不同的顺序,并且数据分片互斥且集体穷尽

  • 在管道中尽早放置 sharding_filtersharding_round_robin_dispatch,以避免在 worker/分布式进程中重复昂贵的操作。

  • 在分片之前添加 shuffle 数据管道以实现分片间混洗。ReadingService 将处理这些 shuffle 操作的同步,以确保数据顺序在分片之前相同,从而使所有分片互斥且集体穷尽。

以下是一个 DataPipe 图的示例

datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
datapipe = datapipe.shuffle().sharding_filter()
datapipe = datapipe.map(fn).batch(8)

多处理

MultiProcessingReadingService 处理 sharding_filter 处的多处理分片,并跨 worker 进程同步种子。

rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()

分布式

DistributedReadingService 处理 sharding_filter 处的分布式分片,并跨分布式进程同步种子。为了平衡分布式节点上的数据分片,将 fullsync DataPipe 附加到 DataPipe 图,以对齐分布式等级上的批次数量。这将防止由分布式训练中不均匀的分片导致的挂起问题。

rs = DistributedReadingService()
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()

多处理 + 分布式

SequentialReadingService 可用于将 ReadingServices 组合在一起,以同时实现多处理和分布式训练。

mp_rs = MultiProcessingReadingService(num_workers=4)
dist_rs = DistributedReadingService()
rs = SequentialReadingService(dist_rs, mp_rs)

dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()

文档

访问 PyTorch 的综合开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源