DataLoader2 教程¶
本教程面向希望创建 DataPipe 图并使用不同后端系统(ReadingService)通过 DataLoader2 加载数据的用户。使用示例可以在 此 colab 笔记本 中找到。
DataPipe¶
有关更多详细信息,请参阅 DataPipe 教程。以下是需要特别注意的事项: 确保数据流水线在每个时期具有不同的顺序,并且数据分片相互排斥且共同构成完整的数据集
- 将 - sharding_filter或- sharding_round_robin_dispatch放在流水线中尽可能早的位置,以避免在工作进程/分布式进程中重复执行耗时的操作。
- 在分片之前添加 - shuffleDataPipe 以实现分片间混洗。- ReadingService将处理这些- shuffle操作的同步,以确保在分片之前数据的顺序相同,从而使所有分片相互排斥且共同构成完整的数据集。
以下是 DataPipe 图的示例
datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
datapipe = datapipe.shuffle().sharding_filter()
datapipe = datapipe.map(fn).batch(8)
多进程¶
MultiProcessingReadingService 在 sharding_filter 处处理多进程分片,并在工作进程之间同步种子。
rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()
分布式¶
DistributedReadingService 在 sharding_filter 处处理分布式分片,并在分布式进程之间同步种子。此外,为了在分布式节点之间平衡数据分片,将向 DataPipe 图附加一个 fullsync DataPipe,以对齐分布式排名之间的批次数。这将防止由于分布式训练中分片不均匀而导致的挂起问题。
rs = DistributedReadingService()
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()
多进程 + 分布式¶
SequentialReadingService 可用于将 ReadingServices 组合在一起,以同时实现多进程和分布式训练。
mp_rs = MultiProcessingReadingService(num_workers=4)
dist_rs = DistributedReadingService()
rs = SequentialReadingService(dist_rs, mp_rs)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
    dl.seed(epoch)
    for d in dl:
        model(d)
dl.shutdown()