torch.futures¶
此软件包提供了一个 Future 类型,它封装了一个异步执行和一组实用函数,用于简化对 Future 对象的操作。目前, Future 类型主要由 分布式 RPC 框架 使用。
- class torch.futures.Future(*, devices=None)¶
- 封装可调用函数的异步执行的 - torch._C.Future的包装器,例如- rpc_async()。它还公开了一组 API,用于添加回调函数和设置结果。- 警告 - GPU 支持是一项测试版功能,可能会发生更改。 - add_done_callback(callback)[源代码]¶
- 将给定的回调函数附加到此 - Future,该函数将在- Future完成时运行。可以将多个回调添加到同一个- Future,但无法保证执行它们的顺序。回调必须采用一个参数,即对此- Future的引用。回调函数可以使用- value()方法获取值。请注意,如果此- Future已完成,则将内联运行给定的回调。- 我们建议您使用 - then()方法,因为它提供了一种在回调完成后进行同步的方法。如果您的回调不返回任何内容,- add_done_callback可能更便宜。但- then()和- add_done_callback都在底层使用相同的回调注册 API。- 对于 GPU 张量,此方法的行为与 - then()相同。- 参数
- callback ( - Future) – 一个- Callable,它采用一个参数,即对此- Future的引用。
 - 注意 - 请注意,如果回调函数抛出异常,无论是通过原始 future 完成异常并调用 - fut.wait(),还是通过回调中的其他代码,都必须小心处理错误。例如,如果此回调稍后完成其他 future,则这些 future 不会标记为已完成错误,用户负责独立处理这些 future 的完成/等待。- 示例:
- >>> def callback(fut): ... print("This will run after the future has finished.") ... print(fut.wait()) >>> fut = torch.futures.Future() >>> fut.add_done_callback(callback) >>> fut.set_result(5) This will run after the future has finished. 5 
 
 - done()[源代码]¶
- 如果此 - Future已完成,则返回- True。如果- Future有结果或异常,则表示已完成。- 如果值包含驻留在 GPU 上的张量,则 - Future.done()将返回- True,即使填充这些张量的异步内核尚未完成在设备上运行,因为在该阶段,结果已经可用,前提是执行适当的同步(请参阅- wait())。- 返回类型
 
 - set_exception(result)[源代码]¶
- 为此 - Future设置异常,该异常将标记此- Future已带有错误完成,并触发所有附加回调。请注意,在此- Future上调用 wait()/value() 时,此处设置的异常将内联引发。- 参数
- result (BaseException) – 此 - Future的异常。
 - 示例:
- >>> fut = torch.futures.Future() >>> fut.set_exception(ValueError("foo")) >>> fut.wait() Traceback (most recent call last): ... ValueError: foo 
 
 - set_result(result)[源代码]¶
- 为此 - Future设置结果,该结果将标记此- Future已完成,并触发所有附加回调。请注意,- Future无法标记为已完成两次。- 如果结果包含驻留在 GPU 上的张量,则即使填充这些张量的异步内核尚未完成在设备上运行,也可以调用此方法,前提是在调用此方法时将那些内核排队的流设置为当前流。简而言之,只要不更改流,就可以在启动这些内核后立即调用此方法,而无需任何其他同步,这是安全的。此方法将在所有相关的当前流上记录事件,并将使用这些事件来确保此 - Future的所有使用者正确调度。- 参数
- result (object) – 此 - Future的结果对象。
 - 示例:
- >>> import threading >>> import time >>> def slow_set_future(fut, value): ... time.sleep(0.5) ... fut.set_result(value) >>> fut = torch.futures.Future() >>> t = threading.Thread( ... target=slow_set_future, ... args=(fut, torch.ones(2) * 3) ... ) >>> t.start() >>> print(fut.wait()) tensor([3., 3.]) >>> t.join() 
 
 - then(callback)[源代码]¶
- 将给定的回调函数附加到此 - Future,此回调函数将在- Future完成时运行。可以将多个回调函数添加到同一个- Future,但无法保证执行它们的顺序(要强制执行特定顺序,请考虑链接:- fut.then(cb1).then(cb2))。回调函数必须采用一个参数,即对此- Future的引用。回调函数可以使用- value()方法获取值。请注意,如果此- Future已完成,则给定的回调函数将立即以内联方式运行。- 如果 - Future的值包含驻留在 GPU 上的张量,则可能会在填充这些张量的异步内核尚未在设备上完成执行时调用回调函数。但是,回调函数将被调用,同时将一些专用流设置为当前流(从全局池中获取),这些流将与这些内核同步。因此,回调函数对这些张量执行的任何操作都将在内核完成后在设备上进行调度。换句话说,只要回调函数不切换流,它就可以安全地操作结果,而无需任何其他同步。这类似于- wait()的非阻塞行为。- 类似地,如果回调函数返回的值包含驻留在 GPU 上的张量,它可以这样做,即使产生这些张量的内核仍在设备上运行,只要回调函数在执行期间未更改流。如果要更改流,则必须小心地将其与原始流重新同步,即在调用回调函数时当前的流。 - 参数
- 回调 ( - Callable) – 一个- Callable,将此- Future作为唯一参数。
- 返回
- 一个新的 - Future对象,它保存- callback的返回值,并在给定的- callback完成时标记为已完成。
- 返回类型
- Future[S] 
 - 注意 - 请注意,如果回调函数抛出异常,无论是通过原始 future 使用异常完成并调用 - fut.wait(),还是通过回调中的其他代码,- then返回的 future 将使用遇到的错误进行适当标记。但是,如果此回调稍后完成其他 future,则这些 future 不会标记为已完成错误,用户负责独立处理这些 future 的完成/等待。- 示例:
- >>> def callback(fut): ... print(f"RPC return value is {fut.wait()}.") >>> fut = torch.futures.Future() >>> # The inserted callback will print the return value when >>> # receiving the response from "worker1" >>> cb_fut = fut.then(callback) >>> chain_cb_fut = cb_fut.then( ... lambda x : print(f"Chained cb done. {x.wait()}") ... ) >>> fut.set_result(5) RPC return value is 5. Chained cb done. None 
 
 
- torch.futures.collect_all(futures)[source]¶
- 将提供的 - Future对象收集到一个组合的- Future中,该- Future在所有子- Future完成后完成。- 示例:
- >>> fut0 = torch.futures.Future() >>> fut1 = torch.futures.Future() >>> fut = torch.futures.collect_all([fut0, fut1]) >>> fut0.set_result(0) >>> fut1.set_result(1) >>> fut_list = fut.wait() >>> print(f"fut0 result = {fut_list[0].wait()}") fut0 result = 0 >>> print(f"fut1 result = {fut_list[1].wait()}") fut1 result = 1