ThreadPoolMapper¶
- class torchdata.datapipes.iter.ThreadPoolMapper(source_datapipe: IterDataPipe, fn: Callable, input_col=None, output_col=None, scheduled_tasks: int = 128, max_workers: Optional[int] = None, **threadpool_kwargs)¶
使用
ThreadPoolExecutor
并发地对源 DataPipe 中的每个项目应用函数(函数名称:threadpool_map
)。该函数可以是任何常规 Python 函数或部分对象。不建议使用 Lambda 函数,因为它不受 pickle 支持。- 参数:
source_datapipe – 源 IterDataPipe
fn – 应用于每个项目的函数
input_col –
应用
fn
的数据索引或索引,例如None
作为默认值,表示直接将fn
应用于数据。整数用于列表/元组。
键用于字典。
output_col –
放置
fn
结果的数据索引。output_col
仅在input_col
不是None
时才能指定None
作为默认值,表示替换input_col
指定的索引;对于具有多个索引的input_col
,使用最左边的索引,并删除其他索引。整数用于列表/元组。
-1
表示将结果追加到末尾。键用于字典。可以接受新的键。
scheduled_tasks – 任何给定时间安排的任务数量(默认值:128)
max_workers – 执行函数调用的最大线程数
**threadpool_kwargs – 传递给
ThreadPoolExecutor
的其他参数
注意
有关
max_workers
和ThreadPoolExecutor
的其他参数的更多信息,请参阅:https://docs.pythonlang.cn/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor注意
为了最佳地利用所有线程,强烈建议
scheduled_tasks
>max_workers
。给定fn
完成执行所需时间的差异越大,scheduled_tasks
的值就需要越大,以避免线程在等待下一个结果时处于空闲状态(因为结果按正确的顺序返回)。但是,
scheduled_tasks
的值过高可能会导致在生成第一个元素之前等待很长时间,因为在生成之前,next
在source_datapipe
上被调用了scheduled_tasks
次。我们鼓励您尝试不同的
max_workers
和scheduled_tasks
值,以找到适合您的用例的最佳值。示例
# fetching html from remote def fetch_html(url: str, **kwargs): r = requests.get(url, **kwargs) r.raise_for_status() return r.content dp = IterableWrapper(urls) dp = dp.threadpool_map(fetch_html,max_workers=16)
def mul_ten(x): time.sleep(0.1) return x * 10 dp = IterableWrapper([(i, i) for i in range(50)]) dp = dp.threadpool_map(mul_ten, input_col=1) print(list(dp))
[(0, 0), (1, 10), (2, 20), (3, 30), ...]
dp = IterableWrapper([(i, i) for i in range(50)]) dp = dp.threadpool_map(mul_ten, input_col=1, output_col=-1) print(list(dp))
[(0, 0, 0), (1, 1, 10), (2, 2, 20), (3, 3, 30), ...]