DataLoader2 教程¶
本教程面向希望创建 DataPipe
图并在不同后端系统(ReadingService
)中使用 DataLoader2
加载数据的用户。可以在 这个 colab 笔记本 中找到使用示例。
数据管道¶
有关详细信息,请参阅 数据管道教程。以下是一些必须注意的最重要事项:为了确保数据管道在每个 epoch 中具有不同的顺序,并且数据分片互斥且集体穷尽
在管道中尽早放置
sharding_filter
或sharding_round_robin_dispatch
,以避免在 worker/分布式进程中重复昂贵的操作。在分片之前添加
shuffle
数据管道以实现分片间混洗。ReadingService
将处理这些shuffle
操作的同步,以确保数据顺序在分片之前相同,从而使所有分片互斥且集体穷尽。
以下是一个 DataPipe
图的示例
datapipe = IterableWrapper(["./train1.csv", "./train2.csv"])
datapipe = datapipe.open_files(encoding="utf-8").parse_csv()
datapipe = datapipe.shuffle().sharding_filter()
datapipe = datapipe.map(fn).batch(8)
多处理¶
MultiProcessingReadingService
处理 sharding_filter
处的多处理分片,并跨 worker 进程同步种子。
rs = MultiProcessingReadingService(num_workers=4)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()
分布式¶
DistributedReadingService
处理 sharding_filter
处的分布式分片,并跨分布式进程同步种子。为了平衡分布式节点上的数据分片,将 fullsync
DataPipe
附加到 DataPipe
图,以对齐分布式等级上的批次数量。这将防止由分布式训练中不均匀的分片导致的挂起问题。
rs = DistributedReadingService()
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()
多处理 + 分布式¶
SequentialReadingService
可用于将 ReadingServices
组合在一起,以同时实现多处理和分布式训练。
mp_rs = MultiProcessingReadingService(num_workers=4)
dist_rs = DistributedReadingService()
rs = SequentialReadingService(dist_rs, mp_rs)
dl = DataLoader2(datapipe, reading_service=rs)
for epoch in range(10):
dl.seed(epoch)
for d in dl:
model(d)
dl.shutdown()