ShardingRoundRobinDispatcher
- class torchdata.datapipes.iter.ShardingRoundRobinDispatcher(source_datapipe: IterDataPipe, sharding_group_filter: Optional[SHARDING_PRIORITIES] = None)
Wrapper that indicates the prior section of the DataPipe graph is non-replicable and will be iterated in a separate, single dispatching process, which distributes data to worker processes in a round-robin manner when multiprocessing is used (functional name: sharding_round_robin_dispatch).
- Parameters:
source_datapipe – Iterable DataPipe that will be sharded
sharding_group_filter – an optional SHARDING_PRIORITIES value
Note
sharding_group_filter currently only accepts SHARDING_PRIORITIES.MULTIPROCESSING.
When using distributed training, you can add a sharding_filter() before this DataPipe to distribute samples across worker nodes.
Example
>>> # xdoctest: +SKIP
>>> from torchdata.datapipes.iter import IterableWrapper
>>> from torch.utils.data.datapipes.iter.sharding import SHARDING_PRIORITIES
>>> dp = IterableWrapper(range(10))
>>> # `.shuffle()` will be executed in a single dispatching process, then the samples
>>> # are distributed to the worker processes
>>> dp = dp.shuffle().sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
>>> # `.map()` will be executed within each worker process
>>> dp = dp.map(lambda x: x + 1)
>>> # Distributed case: the 10 samples will be distributed among the nodes
>>> dp = IterableWrapper(range(10)).sharding_filter()
>>> # `.map()` will be executed in a single dispatching process on each node
>>> # You may apply further transformations within each worker process afterwards
>>> dp = dp.map(lambda x: x + 1).sharding_round_robin_dispatch(SHARDING_PRIORITIES.MULTIPROCESSING)
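The round-robin dispatch only takes effect when the pipeline is driven by a multiprocessing-capable reading service, which is what actually spawns the single dispatching process and the workers. A minimal sketch of consuming the pipeline built above, assuming torchdata's DataLoader2 with a MultiProcessingReadingService (the exact loader setup is an assumption, not part of this API):

>>> # Sketch only: assumes torchdata's DataLoader2 API is available
>>> from torchdata.dataloader2 import DataLoader2, MultiProcessingReadingService
>>> # The reading service creates the dispatching process plus num_workers workers;
>>> # the dispatcher feeds the workers in round-robin order
>>> rs = MultiProcessingReadingService(num_workers=2)
>>> dl = DataLoader2(dp, reading_service=rs)
>>> results = list(dl)
>>> dl.shutdown()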