torch.futures¶
此包提供了一个 Future
类型,用于封装异步执行以及一组用于简化 Future
对象操作的工具函数。目前,Future
类型主要由分布式 RPC 框架使用。
- class torch.futures.Future(*, devices=None)¶
它是
torch._C.Future
的封装,用于封装可调用对象(例如rpc_async()
)的异步执行。它还提供了一组 API 来添加回调函数和设置结果。警告
GPU 支持是测试版功能,可能会有所更改。
- add_done_callback(callback)[source][source]¶
将给定的回调函数附加到此
Future
,该函数将在Future
完成时运行。可以将多个回调添加到同一个Future
,但无法保证它们的执行顺序。回调必须接受一个参数,即此Future
的引用。回调函数可以使用value()
方法获取值。请注意,如果此Future
已经完成,则给定回调将立即内联运行。我们建议您使用
then()
方法,因为它提供了一种在回调完成后同步的方式。如果您的回调不返回任何内容,add_done_callback
可能成本更低。但这两种方法 (then()
和add_done_callback
) 底层都使用相同的回调注册 API。对于 GPU 张量,此方法的行为方式与
then()
相同。- 参数
callback (
Future
) – 一个Callable
,接受一个参数,该参数是此Future
的引用。
注意
请注意,如果回调函数抛出异常,无论是由于原始 Future 以异常完成并调用
fut.wait()
,还是由于回调中的其他代码,都必须仔细处理错误。例如,如果此回调稍后完成额外的 futures,这些 futures 不会被标记为以错误完成,用户需要独立负责处理这些 futures 的完成/等待。- 示例:
>>> def callback(fut): ... print("This will run after the future has finished.") ... print(fut.wait()) >>> fut = torch.futures.Future() >>> fut.add_done_callback(callback) >>> fut.set_result(5) This will run after the future has finished. 5
- done()[source][source]¶
如果此
Future
已完成,则返回True
。一个Future
完成表示它已有结果或异常。如果该值包含驻留在 GPU 上的张量,即使填充这些张量的异步内核尚未在设备上完成运行,
Future.done()
也会返回True
,因为在此阶段结果已经可用,前提是执行了适当的同步(参见wait()
)。- 返回类型
- set_exception(result)[source][source]¶
为此
Future
设置一个异常,这将把此Future
标记为因错误完成,并触发所有附加的回调。请注意,在此Future
上调用wait()
/value()
时,此处设置的异常将内联抛出。- 参数
result (BaseException) – 此
Future
的异常。
- 示例:
>>> fut = torch.futures.Future() >>> fut.set_exception(ValueError("foo")) >>> fut.wait() Traceback (most recent call last): ... ValueError: foo
- set_result(result)[source][source]¶
为此
Future
设置结果,这将把此Future
标记为已完成,并触发所有附加的回调。请注意,一个Future
不能被标记完成两次。如果结果包含驻留在 GPU 上的张量,即使填充这些张量的异步内核尚未在设备上完成运行,只要调用此方法时,enqueue 这些内核的 stream 被设置为当前 stream,也可以调用此方法。简单来说,只要中间不改变 stream,就可以在启动这些内核后立即安全地调用此方法,无需额外同步。此方法将在所有相关的当前 stream 上记录事件,并使用它们确保此
Future
的所有 consumer 都能正确调度。- 参数
result (object) – 此
Future
的结果对象。
- 示例:
>>> import threading >>> import time >>> def slow_set_future(fut, value): ... time.sleep(0.5) ... fut.set_result(value) >>> fut = torch.futures.Future() >>> t = threading.Thread( ... target=slow_set_future, ... args=(fut, torch.ones(2) * 3) ... ) >>> t.start() >>> print(fut.wait()) tensor([3., 3.]) >>> t.join()
- then(callback)[source][source]¶
将给定的回调函数附加到此
Future
,该函数将在Future
完成时运行。可以将多个回调添加到同一个Future
,但无法保证它们的执行顺序(若要强制特定顺序,请考虑链式调用:fut.then(cb1).then(cb2)
)。回调必须接受一个参数,即此Future
的引用。回调函数可以使用value()
方法获取值。请注意,如果此Future
已经完成,则给定回调将立即内联运行。如果
Future
的值包含驻留在 GPU 上的张量,即使填充这些张量的异步内核尚未在设备上完成执行,也可能会调用回调。但是,调用回调时会设置一些专用的 stream 作为当前 stream(从全局池中获取),这些 stream 将与这些内核同步。因此,回调在此张量上执行的任何操作都将在内核完成后在设备上调度。换句话说,只要回调不切换 stream,就可以安全地操作结果而无需额外的同步。这类似于wait()
的非阻塞行为。同样,如果回调返回的值包含驻留在 GPU 上的张量,即使产生这些张量的内核仍在设备上运行,只要回调在执行期间未更改 stream,也可以这样做。如果需要更改 stream,则必须注意将其与原始 stream(即调用回调时当前的那些 stream)重新同步。
- 参数
callback (
Callable
) – 一个Callable
,接受此Future
作为唯一参数。- 返回
一个新的
Future
对象,它持有回调的返回值,并在给定回调完成时被标记为已完成。- 返回类型
Future[S]
注意
请注意,如果回调函数抛出异常,无论是由于原始 Future 以异常完成并调用
fut.wait()
,还是由于回调中的其他代码,由then
返回的 Future 都将 appropriately 标记遇到的错误。但是,如果此回调稍后完成额外的 futures,这些 futures 不会被标记为以错误完成,用户需要独立负责处理这些 futures 的完成/等待。- 示例:
>>> def callback(fut): ... print(f"RPC return value is {fut.wait()}.") >>> fut = torch.futures.Future() >>> # The inserted callback will print the return value when >>> # receiving the response from "worker1" >>> cb_fut = fut.then(callback) >>> chain_cb_fut = cb_fut.then( ... lambda x : print(f"Chained cb done. {x.wait()}") ... ) >>> fut.set_result(5) RPC return value is 5. Chained cb done. None
- value()[source][source]¶
获取已完成 Future 的值。
此方法只能在调用
wait()
完成后或在传递给then()
的回调函数内部调用。在其他情况下,此Future
可能尚未持有值,调用value()
可能会失败。如果该值包含驻留在 GPU 上的张量,则此方法不会执行任何额外同步。这应事先通过调用
wait()
单独完成(回调内部除外,then()
已处理)。- 返回
此
Future
持有的值。如果创建值(回调或 RPC)的函数抛出错误,此value()
方法也会抛出错误。- 返回类型
T
- wait()[source][source]¶
阻塞直到此
Future
的值准备就绪。如果该值包含驻留在 GPU 上的张量,则会与可能异步填充这些张量的内核(在设备上执行)执行额外同步。这种同步是非阻塞的,这意味着
wait()
将在当前 stream 中插入必要的指令,以确保 enqueue 到这些 stream 的后续操作将在异步内核之后正确调度,但一旦完成,wait()
将返回,即使这些内核仍在运行。只要不改变 stream,访问和使用这些值时就不需要进一步同步。- 返回
此
Future
持有的值。如果创建值(回调或 RPC)的函数抛出错误,此wait
方法也会抛出错误。- 返回类型
T
- torch.futures.collect_all(futures)[source][source]¶
将提供的
Future
对象收集到一个组合的Future
中,当所有子 Future 完成时,该组合 Future 完成。- 参数
- 返回
返回一个指向传入的 Futures 列表的
Future
对象。- 返回类型
- 示例:
>>> fut0 = torch.futures.Future() >>> fut1 = torch.futures.Future() >>> fut = torch.futures.collect_all([fut0, fut1]) >>> fut0.set_result(0) >>> fut1.set_result(1) >>> fut_list = fut.wait() >>> print(f"fut0 result = {fut_list[0].wait()}") fut0 result = 0 >>> print(f"fut1 result = {fut_list[1].wait()}") fut1 result = 1