多进程读取服务¶
- class torchdata.dataloader2.MultiProcessingReadingService(num_workers: int = 0, multiprocessing_context: Optional[str] = None, worker_prefetch_cnt: int = 10, main_prefetch_cnt: int = 10, worker_init_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe], WorkerInfo], Union[IterDataPipe, MapDataPipe]]] = None, worker_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe], WorkerInfo, SeedGenerator], Union[IterDataPipe, MapDataPipe]]] = None)¶
生成多个工作进程以从
DataPipe
图中加载数据。如果图中存在任何不可复制的DataPipe
(sharding_round_robin_dispatch
),将创建一个单独的调度进程以从所有不可复制的DataPipes
的最低共同祖先加载数据,并以循环方式将数据分发到每个工作进程。然后,每个工作进程中的后续DataPipe
图将处理来自调度进程的数据,并最终将结果返回到主进程。- 参数:
num_workers (int) – 用于数据加载的子进程数量。
multiprocessing_context (str, 可选) – 多进程启动方法。如果方法为 None,则返回默认上下文。否则,方法应为 ‘fork’,‘spawn’。
worker_prefetch_cnt – (int,默认值为 10):每个工作进程结束时将预取的数据数量。
main_prefetch_cnt – (int,默认值为 10):在主进程中整个管道结束时将预取的数据数量。
worker_init_fn – (Callable,可选):每个工作进程启动时调用的函数,参数为
DataPipe
和WorkerInfo
。worker_reset_fn – (Callable,可选):在每个工作进程中每个 epoch 开始时调用的函数,参数为
DataPipe
、WorkerInfo
和SeedGenerator
。
- finalize() None ¶
MultiProcessingReadingService
使状态无效并正确退出所有子进程。
- initialize(datapipe: Union[IterDataPipe, MapDataPipe]) Union[IterDataPipe, MapDataPipe] ¶
MultiProcessingReadingService
查找有关分片的信息,将图分成多个部分,并使用队列重新连接它们。创建子进程。
- initialize_iteration(seed_generator: SeedGenerator, iter_reset_fn: Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] = None) Optional[Callable[[Union[IterDataPipe, MapDataPipe]], Union[IterDataPipe, MapDataPipe]]] ¶
ReadingService
为一个 epoch 启动服务。每次获取DataLoader2
迭代器时,在开头调用。- 参数:
seed_generator – 由 DataLoader2 创建和管理的 SeedGenerator 对象。作为随机性的唯一来源,它将控制 DataPipes 图中所有随机操作的确定性。
iter_reset_fn – 当
SequentialReadingService
将多个ReadingServices
连接起来时,来自先前ReadingServcie
的可选重置函数。
- 返回值:
一个新的
iter_reset_fn
,供后续ReadingService
使用
示例
MultiProcessingReadingService 开始为每个进程设置工作器种子,并从图中预取项目。