快捷方式

DataLoader2 教程

本教程面向希望创建 DataPipe 图并使用不同后端系统(ReadingService)通过 DataLoader2 加载数据的用户。使用示例可以在 此 colab 笔记本 中找到。

DataPipe

有关更多详细信息,请参阅 DataPipe 教程。以下是需要特别注意的事项: 确保数据流水线在每个时期具有不同的顺序,并且数据分片相互排斥且共同构成完整的数据集

  • sharding_filtersharding_round_robin_dispatch 放在流水线中尽可能早的位置,以避免在工作进程/分布式进程中重复执行耗时的操作。

  • 在分片之前添加 shuffle DataPipe 以实现分片间混洗。 ReadingService 将处理这些 shuffle 操作的同步,以确保在分片之前数据的顺序相同,从而使所有分片相互排斥且共同构成完整的数据集。

以下是 DataPipe 图的示例

datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
datapipe = datapipe.shuffle().sharding_filter()
datapipe = datapipe.map(fn).batch(8)

多进程

MultiProcessingReadingServicesharding_filter 处处理多进程分片,并在工作进程之间同步种子。

rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()

分布式

DistributedReadingServicesharding_filter 处处理分布式分片,并在分布式进程之间同步种子。此外,为了在分布式节点之间平衡数据分片,将向 DataPipe 图附加一个 fullsync DataPipe,以对齐分布式排名之间的批次数。这将防止由于分布式训练中分片不均匀而导致的挂起问题。

rs = DistributedReadingService()
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()

多进程 + 分布式

SequentialReadingService 可用于将 ReadingServices 组合在一起,以同时实现多进程和分布式训练。

mp_rs = MultiProcessingReadingService(num_workers=4)
dist_rs = DistributedReadingService()
rs = SequentialReadingService(dist_rs, mp_rs)

dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源