快捷方式

ThreadPoolMapper

class torchdata.datapipes.iter.ThreadPoolMapper(source_datapipe: IterDataPipe, fn: Callable, input_col=None, output_col=None, scheduled_tasks: int = 128, max_workers: Optional[int] = None, **threadpool_kwargs)

使用 ThreadPoolExecutor 并发地对源 DataPipe 中的每个项目应用函数(函数名称:threadpool_map)。该函数可以是任何常规 Python 函数或部分对象。不建议使用 Lambda 函数,因为它不受 pickle 支持。

参数:
  • source_datapipe – 源 IterDataPipe

  • fn – 应用于每个项目的函数

  • input_col

    应用 fn 的数据索引或索引,例如

    • None 作为默认值,表示直接将 fn 应用于数据。

    • 整数用于列表/元组。

    • 键用于字典。

  • output_col

    放置 fn 结果的数据索引。output_col 仅在 input_col 不是 None 时才能指定

    • None 作为默认值,表示替换 input_col 指定的索引;对于具有多个索引的 input_col,使用最左边的索引,并删除其他索引。

    • 整数用于列表/元组。-1 表示将结果追加到末尾。

    • 键用于字典。可以接受新的键。

  • scheduled_tasks – 任何给定时间安排的任务数量(默认值:128)

  • max_workers – 执行函数调用的最大线程数

  • **threadpool_kwargs – 传递给 ThreadPoolExecutor 的其他参数

注意

有关 max_workersThreadPoolExecutor 的其他参数的更多信息,请参阅:https://docs.pythonlang.cn/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor

注意

为了最佳地利用所有线程,强烈建议 scheduled_tasks > max_workers。给定 fn 完成执行所需时间的差异越大,scheduled_tasks 的值就需要越大,以避免线程在等待下一个结果时处于空闲状态(因为结果按正确的顺序返回)。

但是,scheduled_tasks 的值过高可能会导致在生成第一个元素之前等待很长时间,因为在生成之前,nextsource_datapipe 上被调用了 scheduled_tasks 次。

我们鼓励您尝试不同的 max_workersscheduled_tasks 值,以找到适合您的用例的最佳值。

示例

# fetching html from remote
def fetch_html(url: str, **kwargs):
    r = requests.get(url, **kwargs)
    r.raise_for_status()
    return r.content
dp = IterableWrapper(urls)
dp = dp.threadpool_map(fetch_html,max_workers=16)
def mul_ten(x):
    time.sleep(0.1)
    return x * 10

dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1)
print(list(dp))
[(0, 0), (1, 10), (2, 20), (3, 30), ...]
dp = IterableWrapper([(i, i) for i in range(50)])
dp = dp.threadpool_map(mul_ten, input_col=1, output_col=-1)
print(list(dp))
[(0, 0, 0), (1, 1, 10), (2, 2, 20), (3, 3, 30), ...]

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深度教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源