torch.futures¶
此包提供一个 Future
类型,它封装了异步执行和一组实用程序函数,以简化对 Future
对象的操作。目前,Future
类型主要由 分布式 RPC 框架 使用。
- class torch.futures.Future(*, devices=None)¶
围绕
torch._C.Future
的包装器,它封装了可调用对象的异步执行,例如rpc_async()
。它还公开了一组 API 以添加回调函数和设置结果。警告
GPU 支持是测试版功能,可能会发生变化。
- add_done_callback(callback)[源代码]¶
将给定的回调函数追加到此
Future
,该函数将在Future
完成时运行。可以将多个回调函数添加到同一个Future
,但不能保证它们执行的顺序。回调函数必须接受一个参数,该参数是此Future
的引用。回调函数可以使用value()
方法获取值。请注意,如果此Future
已经完成,则给定的回调函数将立即运行。我们建议您使用
then()
方法,因为它提供了一种在回调函数完成之后进行同步的方法。如果您的回调函数不返回值,add_done_callback
可能会更便宜。但then()
和add_done_callback
都使用幕后的相同回调注册 API。对于 GPU 张量,此方法的行为与
then()
相同。- 参数
callback (
Future
) – 一个Callable
,它接受一个参数,该参数是此Future
的引用。
注意
请注意,如果回调函数引发异常,无论是通过原始 future 完成异常并调用
fut.wait()
,还是通过回调中的其他代码引发异常,都必须仔细处理错误处理。例如,如果此回调函数稍后完成其他 future,则这些 future 不会被标记为完成异常,用户负责独立处理这些 future 的完成/等待。- 示例:
>>> def callback(fut): ... print("This will run after the future has finished.") ... print(fut.wait()) >>> fut = torch.futures.Future() >>> fut.add_done_callback(callback) >>> fut.set_result(5) This will run after the future has finished. 5
- done()[源代码]¶
如果此
Future
已完成,则返回True
。如果Future
具有结果或异常,则表示已完成。如果值包含驻留在 GPU 上的张量,即使填充这些张量的异步内核尚未完成在设备上运行,
Future.done()
也会返回True
,因为在此时结果已可以使用,前提是您执行了适当的同步(请参阅wait()
)。- 返回类型
- set_exception(result)[source]¶
为该
Future
设置异常,这将标记该Future
为已完成但出现错误,并触发所有附加的回调。请注意,当对该Future
调用 wait()/value() 时,此处设置的异常将被内联引发。- 参数
result (BaseException) – 该
Future
的异常。
- 示例:
>>> fut = torch.futures.Future() >>> fut.set_exception(ValueError("foo")) >>> fut.wait() Traceback (most recent call last): ... ValueError: foo
- set_result(result)[source]¶
为该
Future
设置结果,这将标记该Future
为已完成并触发所有附加的回调。请注意,一个Future
无法被标记为两次完成。如果结果包含驻留在 GPU 上的张量,则即使填充这些张量的异步内核尚未完成在设备上的运行,也可以调用此方法,前提是在调用此方法时,将那些内核排队的流设置为当前流。简而言之,在启动这些内核之后,无需任何额外的同步,就可以安全地调用此方法,只要在此期间不更改流即可。此方法将在所有相关的当前流上记录事件,并使用它们来确保此
Future
所有消费者的正确调度。- 参数
result (object) – 该
Future
的结果对象。
- 示例:
>>> import threading >>> import time >>> def slow_set_future(fut, value): ... time.sleep(0.5) ... fut.set_result(value) >>> fut = torch.futures.Future() >>> t = threading.Thread( ... target=slow_set_future, ... args=(fut, torch.ones(2) * 3) ... ) >>> t.start() >>> print(fut.wait()) tensor([3., 3.]) >>> t.join()
- then(callback)[source]¶
将给定的回调函数追加到该
Future
,当该Future
完成时,该回调函数将被运行。可以将多个回调添加到同一个Future
,但不能保证它们执行的顺序(为了强制执行特定的顺序,请考虑链式调用:fut.then(cb1).then(cb2)
)。回调必须接受一个参数,即对该Future
的引用。回调函数可以使用value()
方法获取值。请注意,如果该Future
已经完成,则给定的回调将立即内联运行。如果该
Future
的值包含驻留在 GPU 上的张量,则在填充这些张量的异步内核尚未完成在设备上的执行时,可能会调用回调。但是,回调将被调用,其中一些专用流被设置为当前流(从全局池中获取),这些流将与这些内核同步。因此,回调对这些张量执行的任何操作都将在内核完成后在设备上调度。换句话说,只要回调不切换流,就可以安全地操作结果,无需任何额外的同步。这类似于wait()
的非阻塞行为。类似地,如果回调返回一个包含驻留在 GPU 上的张量的值,即使生成这些张量的内核仍在设备上运行,它也可以这样做,只要回调在执行过程中没有更改流即可。如果想要更改流,必须小心地将其与原始流重新同步,即回调被调用时为当前的流。
- 参数
callback (
Callable
) – 一个Callable
,它将该Future
作为唯一参数。- 返回值
一个新的
Future
对象,它保存callback
的返回值,并在给定的callback
完成时标记为已完成。- 返回类型
Future[S]
注意
请注意,如果回调函数抛出异常,无论是通过原始的 future 完成异常并调用
fut.wait()
,还是通过回调中的其他代码抛出,则由then
返回的 future 将被适当地标记为遇到错误。但是,如果该回调随后完成其他 future,则那些 future 不会被标记为完成错误,用户有责任独立处理那些 future 的完成/等待。- 示例:
>>> def callback(fut): ... print(f"RPC return value is {fut.wait()}.") >>> fut = torch.futures.Future() >>> # The inserted callback will print the return value when >>> # receiving the response from "worker1" >>> cb_fut = fut.then(callback) >>> chain_cb_fut = cb_fut.then( ... lambda x : print(f"Chained cb done. {x.wait()}") ... ) >>> fut.set_result(5) RPC return value is 5. Chained cb done. None
- torch.futures.collect_all(futures)[source]¶
将提供的
Future
对象收集到一个组合的Future
中,该Future
在所有子 future 完成时完成。- 示例:
>>> fut0 = torch.futures.Future() >>> fut1 = torch.futures.Future() >>> fut = torch.futures.collect_all([fut0, fut1]) >>> fut0.set_result(0) >>> fut1.set_result(1) >>> fut_list = fut.wait() >>> print(f"fut0 result = {fut_list[0].wait()}") fut0 result = 0 >>> print(f"fut1 result = {fut_list[1].wait()}") fut1 result = 1