DataLoader2 教程¶
本教程面向希望创建 DataPipe
图并使用不同后端系统(ReadingService
)通过 DataLoader2
加载数据的用户。使用示例可以在 此 colab 笔记本 中找到。
DataPipe¶
有关更多详细信息,请参阅 DataPipe 教程。以下是需要特别注意的事项: 确保数据流水线在每个时期具有不同的顺序,并且数据分片相互排斥且共同构成完整的数据集
将
sharding_filter
或sharding_round_robin_dispatch
放在流水线中尽可能早的位置,以避免在工作进程/分布式进程中重复执行耗时的操作。在分片之前添加
shuffle
DataPipe 以实现分片间混洗。ReadingService
将处理这些shuffle
操作的同步,以确保在分片之前数据的顺序相同,从而使所有分片相互排斥且共同构成完整的数据集。
以下是 DataPipe
图的示例
datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
datapipe = datapipe.shuffle().sharding_filter()
datapipe = datapipe.map(fn).batch(8)
多进程¶
MultiProcessingReadingService
在 sharding_filter
处处理多进程分片,并在工作进程之间同步种子。
rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()
分布式¶
DistributedReadingService
在 sharding_filter
处处理分布式分片,并在分布式进程之间同步种子。此外,为了在分布式节点之间平衡数据分片,将向 DataPipe
图附加一个 fullsync
DataPipe
,以对齐分布式排名之间的批次数。这将防止由于分布式训练中分片不均匀而导致的挂起问题。
rs = DistributedReadingService()
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()
多进程 + 分布式¶
SequentialReadingService
可用于将 ReadingServices
组合在一起,以同时实现多进程和分布式训练。
mp_rs = MultiProcessingReadingService(num_workers=4)
dist_rs = DistributedReadingService()
rs = SequentialReadingService(dist_rs, mp_rs)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()