快捷方式

torch.futures

此包提供一个 Future 类型,它封装了异步执行和一组实用程序函数,以简化对 Future 对象的操作。目前,Future 类型主要由 分布式 RPC 框架 使用。

class torch.futures.Future(*, devices=None)

围绕 torch._C.Future 的包装器,它封装了可调用对象的异步执行,例如 rpc_async()。它还公开了一组 API 以添加回调函数和设置结果。

警告

GPU 支持是测试版功能,可能会发生变化。

add_done_callback(callback)[源代码]

将给定的回调函数追加到此 Future,该函数将在 Future 完成时运行。可以将多个回调函数添加到同一个 Future,但不能保证它们执行的顺序。回调函数必须接受一个参数,该参数是此 Future 的引用。回调函数可以使用 value() 方法获取值。请注意,如果此 Future 已经完成,则给定的回调函数将立即运行。

我们建议您使用 then() 方法,因为它提供了一种在回调函数完成之后进行同步的方法。如果您的回调函数不返回值,add_done_callback 可能会更便宜。但 then()add_done_callback 都使用幕后的相同回调注册 API。

对于 GPU 张量,此方法的行为与 then() 相同。

参数

callback (Future) – 一个 Callable,它接受一个参数,该参数是此 Future 的引用。

注意

请注意,如果回调函数引发异常,无论是通过原始 future 完成异常并调用 fut.wait(),还是通过回调中的其他代码引发异常,都必须仔细处理错误处理。例如,如果此回调函数稍后完成其他 future,则这些 future 不会被标记为完成异常,用户负责独立处理这些 future 的完成/等待。

示例:
>>> def callback(fut):
...     print("This will run after the future has finished.")
...     print(fut.wait())
>>> fut = torch.futures.Future()
>>> fut.add_done_callback(callback)
>>> fut.set_result(5)
This will run after the future has finished.
5
done()[源代码]

如果此 Future 已完成,则返回 True。如果 Future 具有结果或异常,则表示已完成。

如果值包含驻留在 GPU 上的张量,即使填充这些张量的异步内核尚未完成在设备上运行,Future.done() 也会返回 True,因为在此时结果已可以使用,前提是您执行了适当的同步(请参阅 wait())。

返回类型

bool

set_exception(result)[source]

为该 Future 设置异常,这将标记该 Future 为已完成但出现错误,并触发所有附加的回调。请注意,当对该 Future 调用 wait()/value() 时,此处设置的异常将被内联引发。

参数

result (BaseException) – 该 Future 的异常。

示例:
>>> fut = torch.futures.Future()
>>> fut.set_exception(ValueError("foo"))
>>> fut.wait()
Traceback (most recent call last):
...
ValueError: foo
set_result(result)[source]

为该 Future 设置结果,这将标记该 Future 为已完成并触发所有附加的回调。请注意,一个 Future 无法被标记为两次完成。

如果结果包含驻留在 GPU 上的张量,则即使填充这些张量的异步内核尚未完成在设备上的运行,也可以调用此方法,前提是在调用此方法时,将那些内核排队的流设置为当前流。简而言之,在启动这些内核之后,无需任何额外的同步,就可以安全地调用此方法,只要在此期间不更改流即可。此方法将在所有相关的当前流上记录事件,并使用它们来确保此 Future 所有消费者的正确调度。

参数

result (object) – 该 Future 的结果对象。

示例:
>>> import threading
>>> import time
>>> def slow_set_future(fut, value):
...     time.sleep(0.5)
...     fut.set_result(value)
>>> fut = torch.futures.Future()
>>> t = threading.Thread(
...     target=slow_set_future,
...     args=(fut, torch.ones(2) * 3)
... )
>>> t.start()
>>> print(fut.wait())
tensor([3., 3.])
>>> t.join()
then(callback)[source]

将给定的回调函数追加到该 Future,当该 Future 完成时,该回调函数将被运行。可以将多个回调添加到同一个 Future,但不能保证它们执行的顺序(为了强制执行特定的顺序,请考虑链式调用:fut.then(cb1).then(cb2))。回调必须接受一个参数,即对该 Future 的引用。回调函数可以使用 value() 方法获取值。请注意,如果该 Future 已经完成,则给定的回调将立即内联运行。

如果该 Future 的值包含驻留在 GPU 上的张量,则在填充这些张量的异步内核尚未完成在设备上的执行时,可能会调用回调。但是,回调将被调用,其中一些专用流被设置为当前流(从全局池中获取),这些流将与这些内核同步。因此,回调对这些张量执行的任何操作都将在内核完成后在设备上调度。换句话说,只要回调不切换流,就可以安全地操作结果,无需任何额外的同步。这类似于 wait() 的非阻塞行为。

类似地,如果回调返回一个包含驻留在 GPU 上的张量的值,即使生成这些张量的内核仍在设备上运行,它也可以这样做,只要回调在执行过程中没有更改流即可。如果想要更改流,必须小心地将其与原始流重新同步,即回调被调用时为当前的流。

参数

callback (Callable) – 一个 Callable,它将该 Future 作为唯一参数。

返回值

一个新的 Future 对象,它保存 callback 的返回值,并在给定的 callback 完成时标记为已完成。

返回类型

Future[S]

注意

请注意,如果回调函数抛出异常,无论是通过原始的 future 完成异常并调用 fut.wait(),还是通过回调中的其他代码抛出,则由 then 返回的 future 将被适当地标记为遇到错误。但是,如果该回调随后完成其他 future,则那些 future 不会被标记为完成错误,用户有责任独立处理那些 future 的完成/等待。

示例:
>>> def callback(fut):
...     print(f"RPC return value is {fut.wait()}.")
>>> fut = torch.futures.Future()
>>> # The inserted callback will print the return value when
>>> # receiving the response from "worker1"
>>> cb_fut = fut.then(callback)
>>> chain_cb_fut = cb_fut.then(
...     lambda x : print(f"Chained cb done. {x.wait()}")
... )
>>> fut.set_result(5)
RPC return value is 5.
Chained cb done. None
value()[source]

获取已完成 future 的值。

此方法应仅在调用 wait() 完成后或在传递给 then() 的回调函数中调用。在其他情况下,该 Future 可能尚未保存值,调用 value() 可能会失败。

如果值包含驻留在 GPU 上的张量,则此方法 *不会* 执行任何额外的同步。这应该事先单独完成,通过调用 wait()(除了在回调内,对于这种情况,它已经由 then() 处理)。

返回值

Future 保存的值。如果创建值的函数(回调或 RPC)抛出错误,则该 value() 方法也会抛出错误。

返回类型

T

wait()[source]

阻塞,直到该 Future 的值准备就绪。

如果值包含驻留在 GPU 上的张量,则将对可能异步填充这些张量的内核(在设备上执行)执行额外的同步。这种同步是非阻塞的,这意味着 wait() 将在当前流中插入必要的指令,以确保在异步内核之后正确调度这些流上排队的进一步操作,但是,一旦完成,wait() 将返回,即使那些内核仍在运行。只要不更改流,访问和使用值时就不需要进一步同步。

返回值

Future 保存的值。如果创建值的函数(回调或 RPC)抛出错误,则该 wait 方法也会抛出错误。

返回类型

T

torch.futures.collect_all(futures)[source]

将提供的 Future 对象收集到一个组合的 Future 中,该 Future 在所有子 future 完成时完成。

参数

futures (list) – Future 对象列表。

返回值

返回一个 Future 对象,指向传递的 Futures 列表。

返回类型

Future[List[Future]]

示例:
>>> fut0 = torch.futures.Future()
>>> fut1 = torch.futures.Future()
>>> fut = torch.futures.collect_all([fut0, fut1])
>>> fut0.set_result(0)
>>> fut1.set_result(1)
>>> fut_list = fut.wait()
>>> print(f"fut0 result = {fut_list[0].wait()}")
fut0 result = 0
>>> print(f"fut1 result = {fut_list[1].wait()}")
fut1 result = 1
torch.futures.wait_all(futures)[source]

等待所有提供的 future 完成,并返回已完成的值列表。如果任何 future 遇到错误,该方法将提前退出并报告错误,而不等待其他 future 完成。

参数

futures (list) – Future 对象列表。

返回值

已完成的 Future 结果列表。如果在任何 Future 上调用 wait 抛出错误,则该方法也会抛出错误。

返回类型

List

文档

訪問 PyTorch 的綜合開發者文檔

查看文檔

教程

獲得面向初學者和高級開發者的深入教程

查看教程

資源

查找開發資源並獲得問題解答

查看資源