• 文档 >
  • 分布式 RPC 框架
快捷方式

分布式 RPC 框架

分布式 RPC 框架提供了一套用于多机模型训练的机制,通过一组用于远程通信的原语以及一个更高级别的 API 来自动微分跨多个机器拆分的模型。

警告

RPC 包中的 API 稳定。目前有许多正在进行的工作项来改进性能和错误处理,这些改进将在将来的版本中发布。

警告

CUDA 支持是在 PyTorch 1.9 中引入的,目前仍处于测试版阶段。RPC 包并非所有功能都与 CUDA 支持兼容,因此不建议使用这些功能。这些不受支持的功能包括:RRef、JIT 兼容性、dist autograd 和 dist 优化器以及性能分析。这些缺点将在未来的版本中得到解决。

注意

请参考 PyTorch 分布式概述,以简要介绍与分布式训练相关的所有功能。

基础知识

分布式 RPC 框架简化了远程运行函数的过程,支持在不复制实际数据的情况下引用远程对象,并提供自动微分和优化器 API,以透明地在 RPC 边界上执行反向传播和更新参数。这些功能可以分为四组 API。

  1. 远程过程调用 (RPC) 支持使用给定的参数在指定的目的地工作程序上运行函数,并获取返回值或创建对返回值的引用。有三个主要的 RPC API:rpc_sync()(同步)、rpc_async()(异步)和 remote()(异步,并返回对远程返回值的引用)。如果用户代码在没有返回值的情况下无法继续,则使用同步 API。否则,使用异步 API 获取 future,并在需要返回值时在调用方等待 future。当需求是在远程创建某些东西,而无需将其获取到调用方时,remote() API 非常有用。想象一下,驱动程序进程正在设置参数服务器和训练器。驱动程序可以在参数服务器上创建嵌入表,然后将对嵌入表的引用共享给训练器,但自身不会在本地使用嵌入表。在这种情况下,rpc_sync()rpc_async() 就不再合适了,因为它们总是意味着返回值将立即或在未来返回给调用方。

  2. 远程引用 (RRef) 用作指向本地或远程对象的分布式共享指针。它可以与其他工作程序共享,并且引用计数将被透明地处理。每个 RRef 只有一个所有者,并且该对象仅存在于该所有者上。持有 RRef 的非所有者工作程序可以通过显式请求从所有者获取对象的副本。当工作程序需要访问某些数据对象,但自身既不是创建者(remote() 的调用方)也不是对象的拥有者时,这很有用。分布式优化器,正如我们将在下面讨论的那样,就是这种用例的一个例子。

  3. 分布式 Autograd 将所有参与正向传播的工作程序上的本地 autograd 引擎拼接在一起,并在反向传播期间自动联系它们以计算梯度。这在正向传播需要跨越多台机器进行时特别有用,例如在进行分布式模型并行训练、参数服务器训练等时。有了这个功能,用户代码不再需要担心如何跨 RPC 边界发送梯度以及应该以何种顺序启动本地 autograd 引擎,因为当正向传播中存在嵌套和相互依赖的 RPC 调用时,这可能会变得非常复杂。

  4. 分布式优化器 的构造函数接受一个 Optimizer()(例如,SGD()Adagrad() 等)和一个参数 RRef 列表,在每个不同的 RRef 所有者上创建一个 Optimizer() 实例,并在运行 step() 时相应地更新参数。当您有分布式的前向和反向传递时,参数和梯度将散布在多个工作器上,因此需要在每个参与的工作器上有一个优化器。分布式优化器将所有这些本地优化器包装成一个,并提供简洁的构造函数和 step() API。

RPC

在使用 RPC 和分布式自动微分原语之前,必须进行初始化。要初始化 RPC 框架,我们需要使用 init_rpc(),它将初始化 RPC 框架、RRef 框架和分布式自动微分。

torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[source]

初始化 RPC 原语,例如本地 RPC 代理和分布式自动微分,这会立即使当前进程准备好发送和接收 RPC。

参数
  • name (str) – 此节点的全局唯一名称。(例如,Trainer3ParameterServer2MasterWorker1)名称只能包含数字、字母、下划线、冒号和/或连字符,并且必须小于 128 个字符。

  • backend (BackendType, optional) – RPC 后端实现的类型。支持的值是 BackendType.TENSORPIPE(默认)。有关更多信息,请参见 Backends

  • rank (int) – 此节点的全局唯一 ID/排名。

  • world_size (int) – 组中的工作器数量。

  • rpc_backend_options (RpcBackendOptions, optional) – 传递给 RpcAgent 构造函数的选项。它必须是 RpcBackendOptions 的代理特定子类,并且包含代理特定的初始化配置。默认情况下,对于所有代理,它将默认超时设置为 60 秒,并使用使用 init_method = "env://" 初始化的基础进程组执行 rendezvous,这意味着需要正确设置环境变量 MASTER_ADDRMASTER_PORT。有关更多信息,请参见 Backends 查找哪些选项可用。

以下 API 允许用户远程执行函数以及创建对远程数据对象的引用(RRef)。在这些 API 中,当将 Tensor 作为参数或返回值传递时,目标工作器将尝试创建具有相同元数据(即形状、步幅等)的 Tensor。我们有意禁止传输 CUDA 张量,因为如果源和目标工作器上的设备列表不匹配,它可能会崩溃。在这种情况下,应用程序始终可以显式地将输入张量移动到调用方的 CPU 上,并在需要时将它移动到被调用方的所需设备上。

警告

RPC 中的 TorchScript 支持是一个原型功能,可能会发生变化。从 v1.5.0 开始,torch.distributed.rpc 支持将 TorchScript 函数作为 RPC 目标函数调用,这将有助于提高被调用方侧面的并行性,因为执行 TorchScript 函数不需要 GIL。

torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[source]

对工作器 to 上运行函数 func 进行阻塞 RPC 调用。RPC 消息与 Python 代码的执行并行发送和接收。此方法是线程安全的。

参数
  • to (str or WorkerInfo or int) – 目标工作器的名称/排名/WorkerInfo

  • func (Callable) – 可调用函数,例如 Python 可调用函数、内置运算符(例如 add())和带注释的 TorchScript 函数。

  • args (tuple) – func 调用的参数元组。

  • kwargs (dict) – 是 func 调用的关键字参数字典。

  • timeout (float, optional) – 此 RPC 的超时时间(以秒为单位)。如果 RPC 在此时间内未完成,则将引发一个指示它已超时的异常。值为 0 表示无限超时,即永远不会引发超时错误。如果未提供,则使用在初始化期间或使用 _set_rpc_timeout 设置的默认值。

返回值

返回使用 argskwargs 运行 func 的结果。

示例:

确保在两个工作器上都正确设置了 MASTER_ADDRMASTER_PORT。有关更多详细信息,请参阅 init_process_group() API。例如,

export MASTER_ADDR=localhost export MASTER_PORT=5678

然后在两个不同的进程中运行以下代码

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

下面是使用 RPC 运行 TorchScript 函数的示例。

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[source]

对工作器 to 上运行函数 func 进行非阻塞 RPC 调用。RPC 消息与 Python 代码的执行并行发送和接收。此方法是线程安全的。此方法将立即返回一个 Future,可以对其进行等待。

参数
  • to (str or WorkerInfo or int) – 目标工作器的名称/排名/WorkerInfo

  • func (Callable) – 可调用函数,例如 Python 可调用函数、内置运算符(例如 add())和带注释的 TorchScript 函数。

  • args (tuple) – func 调用的参数元组。

  • kwargs (dict) – 是 func 调用的关键字参数字典。

  • timeout (float, optional) – 此 RPC 的超时时间(以秒为单位)。如果 RPC 在此时间内未完成,则将引发一个指示它已超时的异常。值为 0 表示无限超时,即永远不会引发超时错误。如果未提供,则使用在初始化期间或使用 _set_rpc_timeout 设置的默认值。

返回值

返回一个 Future 对象,可以对其进行等待。完成时,可以从 Future 对象中检索在 argskwargs 上运行 func 的返回值。

警告

由于不支持通过网络传输 GPU 张量,因此不支持使用 GPU 张量作为 func 的参数或返回值。您需要显式地将 GPU 张量复制到 CPU,然后才能将其用作 func 的参数或返回值。

警告

rpc_async API 不会在通过网络发送参数张量之前复制其存储空间,这可能会由不同的线程完成,具体取决于 RPC 后端类型。调用者应确保这些张量的内容在返回的 Future 完成之前保持完整。

示例:

确保在两个工作器上都正确设置了 MASTER_ADDRMASTER_PORT。有关更多详细信息,请参阅 init_process_group() API。例如,

export MASTER_ADDR=localhost export MASTER_PORT=5678

然后在两个不同的进程中运行以下代码

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3))
>>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2))
>>> result = fut1.wait() + fut2.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

下面是使用 RPC 运行 TorchScript 函数的示例。

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)
>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3))
>>> ret = fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[source]

在工作节点 to 上远程调用运行 func,并立即返回指向结果值的 RRef。工作节点 to 将是返回的 RRef 的所有者,而调用 remote 的工作节点是用户。所有者管理其 RRef 的全局引用计数,并且只有在全局范围内没有对它的有效引用时,所有者 RRef 才会被销毁。

参数
  • to (str or WorkerInfo or int) – 目标工作器的名称/排名/WorkerInfo

  • func (Callable) – 可调用函数,例如 Python 可调用函数、内置运算符(例如 add())和带注释的 TorchScript 函数。

  • args (tuple) – func 调用的参数元组。

  • kwargs (dict) – 是 func 调用的关键字参数字典。

  • timeout (float, optional) – 此远程调用的超时时间(以秒为单位)。如果在工作节点 to 上创建此 RRef 未能在此超时时间内在此工作节点上成功处理,那么下次尝试使用 RRef(例如 to_here())时,将引发超时错误,指示此错误。值为 0 表示无限超时,即永远不会引发超时错误。如果没有提供,则使用初始化时或使用 _set_rpc_timeout 设置的默认值。

返回值

指向结果值的 用户 RRef 实例。使用阻塞式 API torch.distributed.rpc.RRef.to_here() 在本地检索结果值。

警告

remote API 不会在通过网络发送参数张量之前复制其存储空间,这可能会由不同的线程完成,具体取决于 RPC 后端类型。调用者应确保这些张量的内容在所有者确认返回的 RRef 之前保持完整,这可以使用 torch.distributed.rpc.RRef.confirmed_by_owner() API 进行检查。

警告

remote API 的超时等错误以尽力而为的方式处理。这意味着当由 remote 启动的远程调用失败时,例如出现超时错误,我们会采取尽力而为的错误处理方法。这意味着错误会异步处理并设置在结果 RRef 上。如果用户应用程序在处理之前尚未使用 RRef(例如 to_here 或 fork 调用),那么将来使用 RRef 将会适当地引发错误。但是,用户应用程序可能在处理错误之前使用 RRef。在这种情况下,错误可能不会被引发,因为它们尚未被处理。

示例

Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly
on both workers. Refer to :meth:`~torch.distributed.init_process_group`
API for more details. For example,

export MASTER_ADDR=localhost
export MASTER_PORT=5678

Then run the following code in two different processes:

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1))
>>> x = rref1.to_here() + rref2.to_here()
>>> rpc.shutdown()

>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

Below is an example of running a TorchScript function using RPC.

>>> # On both workers:
>>> @torch.jit.script
>>> def my_script_add(tensor: torch.Tensor, scalar: int):
>>>    return torch.add(tensor, scalar)

>>> # On worker 0:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3))
>>> rref.to_here()
>>> rpc.shutdown()

>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()
torch.distributed.rpc.get_worker_info(worker_name=None)[source]

获取给定工作节点名称的 WorkerInfo。使用此 WorkerInfo 以避免在每次调用时传递一个代价高昂的字符串。

参数

worker_name (str) – 工作节点的字符串名称。如果为 None,则返回当前工作节点的 ID。(默认值为 None

返回值

给定 worker_nameWorkerInfo 实例,或如果 worker_nameNone,则为当前工作节点的 WorkerInfo

torch.distributed.rpc.shutdown(graceful=True, timeout=0)[source]

执行 RPC 代理的关闭,然后销毁 RPC 代理。这会停止本地代理接受未完成的请求,并通过终止所有 RPC 线程来关闭 RPC 框架。如果 graceful=True,这将阻塞,直到所有本地和远程 RPC 进程都到达此方法并等待所有未完成的工作完成。否则,如果 graceful=False,这是一个本地关闭,并且不会等待其他 RPC 进程到达此方法。

警告

对于由 Future 对象返回的 rpc_async()future.wait() 不应在 shutdown() 之后调用。

参数

graceful (bool) – 是否执行优雅关闭。如果为 True,这将 1) 等待直到没有 UserRRefs 的待处理系统消息并删除它们;2) 阻塞,直到所有本地和远程 RPC 进程都到达此方法并等待所有未完成的工作完成。

示例:

确保在两个工作器上都正确设置了 MASTER_ADDRMASTER_PORT。有关更多详细信息,请参阅 init_process_group() API。例如,

export MASTER_ADDR=localhost export MASTER_PORT=5678

然后在两个不同的进程中运行以下代码

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> # do some work
>>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1))
>>> # ready to shutdown
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch.distributed.rpc as rpc
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> # wait for worker 0 to finish work, and then shutdown.
>>> rpc.shutdown()
class torch.distributed.rpc.WorkerInfo

封装系统中工作节点信息的结构。包含工作节点的名称和 ID。不应直接构造此类,而是可以通过 get_worker_info() 检索实例,并将结果传递给 rpc_sync()rpc_async()remote() 等函数,以避免在每次调用时复制字符串。

property id

全局唯一 ID,用于标识工作节点。

property name

工作节点的名称。

RPC 包还提供装饰器,允许应用程序指定在被调用方侧如何处理给定函数。

torch.distributed.rpc.functions.async_execution(fn)[source]

用于函数的装饰器,表示该函数的返回值保证为 Future 对象,并且该函数可以在 RPC 被调用方上异步运行。更具体地说,被调用方会提取由包装函数返回的 Future,并将后续的处理步骤安装为该 Future 的回调。安装的回调会在完成时从 Future 读取值,并将该值作为 RPC 响应发送回。这也意味着返回的 Future 仅存在于被调用方侧,并且永远不会通过 RPC 发送。当包装函数 (fn) 的执行需要暂停和恢复时,此装饰器很有用,例如,包含 rpc_async() 或等待其他信号。

注意

为了启用异步执行,应用程序必须将此装饰器返回的函数对象传递给 RPC API。如果 RPC 检测到此装饰器安装的属性,它就知道此函数返回一个 Future 对象,并会相应地进行处理。但是,这并不意味着此装饰器必须是定义函数时的最外层装饰器。例如,当与 @staticmethod@classmethod 结合使用时,@rpc.functions.async_execution 需要是内部装饰器,以允许目标函数被识别为静态或类函数。此目标函数仍然可以异步执行,因为当访问时,静态或类方法会保留 @rpc.functions.async_execution 安装的属性。

示例:

返回的 Future 对象可以来自 rpc_async()then()Future 构造函数。下面的示例演示了直接使用 Future,该对象由 then() 返回。

>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @rpc.functions.async_execution
>>> def async_add_chained(to, x, y, z):
>>>     # This function runs on "worker1" and returns immediately when
>>>     # the callback is installed through the `then(cb)` API. In the
>>>     # mean time, the `rpc_async` to "worker2" can run concurrently.
>>>     # When the return value of that `rpc_async` arrives at
>>>     # "worker1", "worker1" will run the lambda function accordingly
>>>     # and set the value for the previously returned `Future`, which
>>>     # will then trigger RPC to send the result back to "worker0".
>>>     return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>         lambda fut: fut.wait() + z
>>>     )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add_chained,
>>>     args=("worker2", torch.ones(2), 1, 1)
>>> )
>>> print(ret)  # prints tensor([3., 3.])

当与 TorchScript 装饰器结合使用时,此装饰器必须是最外层的装饰器。

>>> from torch import Tensor
>>> from torch.futures import Future
>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> @torch.jit.script
>>> def script_add(x: Tensor, y: Tensor) -> Tensor:
>>>     return x + y
>>>
>>> @rpc.functions.async_execution
>>> @torch.jit.script
>>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]:
>>>     return rpc.rpc_async(to, script_add, (x, y))
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     async_add,
>>>     args=("worker2", torch.ones(2), 1)
>>> )
>>> print(ret)  # prints tensor([2., 2.])

当与静态或类方法结合使用时,此装饰器必须是内部装饰器。

>>> from torch.distributed import rpc
>>>
>>> # omitting setup and shutdown RPC
>>>
>>> # On all workers
>>> class AsyncExecutionClass:
>>>
>>>     @staticmethod
>>>     @rpc.functions.async_execution
>>>     def static_async_add(to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>>     @classmethod
>>>     @rpc.functions.async_execution
>>>     def class_async_add(cls, to, x, y, z):
>>>         ret_fut = torch.futures.Future()
>>>         rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: ret_fut.set_result(fut.wait() + z)
>>>         )
>>>         return ret_fut
>>>
>>>     @rpc.functions.async_execution
>>>     def bound_async_add(self, to, x, y, z):
>>>         return rpc.rpc_async(to, torch.add, args=(x, y)).then(
>>>             lambda fut: fut.wait() + z
>>>         )
>>>
>>> # On worker0
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.static_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> ret = rpc.rpc_sync(
>>>     "worker1",
>>>     AsyncExecutionClass.class_async_add,
>>>     args=("worker2", torch.ones(2), 1, 2)
>>> )
>>> print(ret)  # prints tensor([4., 4.])

此装饰器也适用于 RRef 助手,即 . torch.distributed.rpc.RRef.rpc_sync()torch.distributed.rpc.RRef.rpc_async()torch.distributed.rpc.RRef.remote()

>>> from torch.distributed import rpc
>>>
>>> # reuse the AsyncExecutionClass class above
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2)
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait()
>>> print(ret)  # prints tensor([4., 4.])
>>>
>>> rref = rpc.remote("worker1", AsyncExecutionClass)
>>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here()
>>> print(ret)  # prints tensor([4., 4.])

后端

RPC 模块可以利用不同的后端来执行节点之间的通信。要使用的后端可以在 init_rpc() 函数中指定,方法是传递 BackendType 枚举的某个值。无论使用哪种后端,RPC API 的其余部分都不会改变。每个后端还定义了其自己的 RpcBackendOptions 类子类,可以将该类子类的一个实例传递给 init_rpc() 来配置后端的行为。

class torch.distributed.rpc.BackendType(value)

可用后端的枚举类。

PyTorch 附带内置的 BackendType.TENSORPIPE 后端。可以使用 register_backend() 函数注册其他后端。

class torch.distributed.rpc.RpcBackendOptions

一个抽象结构,封装传递到 RPC 后端的选项。可以将此类的实例传递到 init_rpc() 中,以便使用特定配置初始化 RPC,例如要使用的 RPC 超时和 init_method

property init_method

指定如何初始化进程组的 URL。默认值为 env://

property rpc_timeout

一个浮点数,指示所有 RPC 使用的超时时间。如果 RPC 在此时间范围内未完成,它将使用指示超时异常完成。

TensorPipe 后端

TensorPipe 代理(默认)利用 TensorPipe 库,该库提供了一种专门适用于机器学习的原生点对点通信原语,从根本上解决了 Gloo 的一些限制。与 Gloo 相比,它的优势在于它是异步的,这允许大量传输同时发生,每个传输以自己的速度进行,互不阻塞。它只会在需要时按需在节点对之间打开管道,并且当一个节点失败时,只有与它相连的管道才会关闭,而所有其他管道将继续正常工作。此外,它能够支持多种不同的传输(当然还有 TCP,还有共享内存、NVLink、InfiniBand 等),并且可以自动检测它们的可用性并协商用于每个管道的最佳传输。

TensorPipe 后端是在 PyTorch v1.6 中引入的,并且正在积极开发中。目前,它只支持 CPU 张量,GPU 支持即将推出。它附带了基于 TCP 的传输,就像 Gloo 一样。它还能够自动将大型张量切片和复用到多个套接字和线程上,以实现非常高的带宽。代理将能够自行选择最佳传输,无需任何干预。

示例

>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>>     "worker1",
>>>     rank=0,
>>>     world_size=2,
>>>     rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>>         num_worker_threads=8,
>>>         rpc_timeout=20 # 20 second timeout
>>>     )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[source]

用于 TensorPipeAgent 的后端选项,派生自 RpcBackendOptions

参数
  • num_worker_threads (int, optional) – TensorPipeAgent 用于执行请求的线程池中的线程数(默认值:16)。

  • rpc_timeout (float, optional) – RPC 请求的默认超时时间,以秒为单位(默认值:60 秒)。如果 RPC 在此时间范围内未完成,则将引发指示该情况的异常。调用者可以在必要时在 rpc_sync()rpc_async() 中为单个 RPC 覆盖此超时时间。

  • init_method (str, optional) – 用于初始化用于会合的分布式存储的 URL。它接受 init_process_group() 的相同参数的任何值(默认值:env://)。

  • device_maps (Dict[str, Dict], optional) – 从此工作器到被调用者的设备放置映射。键是被调用者的工作器名称,值是字典 (Dict of int, str, or torch.device),它将此工作器的设备映射到被调用者的工作器的设备。(默认值:None)

  • devices (List[int, str, or torch.device], optional) – RPC 代理使用的所有本地 CUDA 设备。默认情况下,它将被初始化为来自其自身 device_maps 的所有本地设备及其对等体 device_maps 中的相应设备。在处理 CUDA RPC 请求时,代理将适当地同步此 List 中所有设备的 CUDA 流。

property device_maps

设备映射位置。

property devices

本地代理使用的所有设备。

property init_method

指定如何初始化进程组的 URL。默认值为 env://

property num_worker_threads

TensorPipeAgent 用于执行请求的线程池中的线程数。

property rpc_timeout

一个浮点数,指示所有 RPC 使用的超时时间。如果 RPC 在此时间范围内未完成,它将使用指示超时异常完成。

set_device_map(to, device_map)[source]

设置每个 RPC 调用者和被调用者对之间的设备映射。此函数可以多次调用以增量添加设备放置配置。

参数
  • to (str) – 被调用者名称。

  • device_map (Dict of int, str, or torch.device) – 从此工作器到被调用者的设备放置映射。此映射必须是可逆的。

示例

>>> # both workers
>>> def add(x, y):
>>>     print(x)  # tensor([1., 1.], device='cuda:1')
>>>     return x + y, (x + y).to(2)
>>>
>>> # on worker 0
>>> options = TensorPipeRpcBackendOptions(
>>>     num_worker_threads=8,
>>>     device_maps={"worker1": {0: 1}}
>>>     # maps worker0's cuda:0 to worker1's cuda:1
>>> )
>>> options.set_device_map("worker1", {1: 2})
>>> # maps worker0's cuda:1 to worker1's cuda:2
>>>
>>> rpc.init_rpc(
>>>     "worker0",
>>>     rank=0,
>>>     world_size=2,
>>>     backend=rpc.BackendType.TENSORPIPE,
>>>     rpc_backend_options=options
>>> )
>>>
>>> x = torch.ones(2)
>>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1))
>>> # The first argument will be moved to cuda:1 on worker1. When
>>> # sending the return value back, it will follow the invert of
>>> # the device map, and hence will be moved back to cuda:0 and
>>> # cuda:1 on worker0
>>> print(rets[0])  # tensor([2., 2.], device='cuda:0')
>>> print(rets[1])  # tensor([2., 2.], device='cuda:1')
set_devices(devices)[source]

设置 TensorPipe RPC 代理使用的本地设备。在处理 CUDA RPC 请求时,TensorPipe RPC 代理将为 List 中的所有设备正确同步 CUDA 流。

参数

devices (List of int, str, or torch.device) – TensorPipe RPC 代理使用的本地设备。

注意

RPC 框架不会自动重试任何 rpc_sync()rpc_async()remote() 调用。原因是 RPC 框架无法确定操作是否幂等以及是否可以安全重试。因此,应用程序有责任处理故障,并在必要时重试。RPC 通信基于 TCP,因此可能会因网络故障或间歇性网络连接问题而发生故障。在这种情况下,应用程序需要以合理的回退策略适当地重试,以确保网络不会因过于激进的重试而不堪重负。

RRef

警告

在使用 CUDA 张量时,目前不支持 RRef

一个 RRef(远程引用)是对远程工作器上某种类型 T(例如 Tensor)的值的引用。此句柄使被引用的远程值在所有者上保持活动状态,但这并不意味着该值将在将来传输到本地工作器。RRef 可用于多机器训练,方法是保存对其他工作器上 nn.Modules 的引用,并在训练期间调用相应的函数来检索或修改其参数。有关更多详细信息,请参阅 远程引用协议

class torch.distributed.rpc.PyRRef(RRef)

一个封装了对远程工作器上某种类型的值的引用的类。此句柄将使被引用的远程值在工作器上保持活动状态。当 1) 应用程序代码和本地 RRef 上下文中都没有对它的引用,或者 2) 应用程序已调用了正常关闭时,UserRRef 将被删除。在已删除的 RRef 上调用方法会导致未定义的行为。RRef 实现仅提供尽力而为的错误检测,应用程序不应在 rpc.shutdown() 之后使用 UserRefs

警告

RRef 只能通过 RPC 模块进行序列化和反序列化。在没有 RPC 的情况下序列化和反序列化 RRef(例如,Python pickle、torch save() / load()、JIT save() / load() 等)会导致错误。

参数
  • value (object) – 要由此 RRef 包装的值。

  • type_hint (Type, optional) – 应传递给 TorchScript 编译器的 Python 类型,作为 value 的类型提示。

示例:

以下示例为简便起见省略了 RPC 初始化和关闭代码。有关这些详细信息,请参考 RPC 文档。

  1. 使用 rpc.remote 创建 RRef

>>> import torch
>>> import torch.distributed.rpc as rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3))
>>> # get a copy of value from the RRef
>>> x = rref.to_here()
  1. 从本地对象创建 RRef

>>> import torch
>>> from torch.distributed.rpc import RRef
>>> x = torch.zeros(2, 2)
>>> rref = RRef(x)
  1. 与其他工作器共享 RRef

>>> # On both worker0 and worker1:
>>> def f(rref):
>>>   return rref.to_here() + 1
>>> # On worker0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch.distributed.rpc import RRef
>>> rref = RRef(torch.zeros(2, 2))
>>> # the following RPC shares the rref with worker1, reference
>>> # count is automatically updated.
>>> rpc.rpc_sync("worker1", f, args=(rref,))
backward(self: torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id: int = -1, retain_graph: bool = False) None

使用 RRef 作为反向传播的根节点运行反向传播。如果提供了 dist_autograd_ctx_id,我们将使用提供的 ctx_id 从 RRef 的所有者开始执行分布式反向传播。在这种情况下,应使用 get_gradients() 检索梯度。如果 dist_autograd_ctx_idNone,则假定这是一个本地自动微分图,并且我们仅执行本地反向传播。在本地情况下,调用此 API 的节点必须是 RRef 的所有者。RRef 的值应为标量张量。

参数
  • dist_autograd_ctx_id (int, optional) – 应检索梯度的分布式自动微分上下文 ID(默认值:-1)。

  • retain_graph (bool, optional) – 如果为 False,则用于计算 grad 的图将被释放。请注意,在几乎所有情况下,将此选项设置为 True 都是不需要的,并且通常可以用更有效的方式解决。通常,您需要将其设置为 True 以多次运行反向传播(默认值:False)。

示例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     rref.backward(context_id)
confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) bool

返回此 RRef 是否已由所有者确认。 OwnerRRef 始终返回 true,而 UserRRef 仅在所有者知道此 UserRRef 时返回 true。

is_owner(self: torch._C._distributed_rpc.PyRRef) bool

返回当前节点是否为此 RRef 的所有者。

local_value(self: torch._C._distributed_rpc.PyRRef) object

如果当前节点是所有者,则返回对本地值的引用。否则,将引发异常。

owner(self: torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo

返回拥有此 RRef 的节点的 worker 信息。

owner_name(self: torch._C._distributed_rpc.PyRRef) str

返回拥有此 RRef 的节点的 worker 名称。

remote(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

创建一个辅助代理,以便使用 RRef 所有者作为目标轻松启动 remote,以在由此 RRef 引用的对象上运行函数。 更准确地说,rref.remote().func_name(*args, **kwargs) 等同于以下内容

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
参数

timeout (float, optional) – rref.remote() 的超时时间。 如果在此超时时间内未成功完成此 RRef 的创建,则下次尝试使用 RRef(例如 to_here)时,将引发超时。 如果未提供,将使用默认 RPC 超时时间。 请参阅 rpc.remote() 以了解 RRef 的特定超时语义。

示例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.remote().size().to_here()  # returns torch.Size([2, 2])
>>> rref.remote().view(1, 4).to_here()  # returns tensor([[1., 1., 1., 1.]])
rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

创建一个辅助代理,以便使用 RRef 所有者作为目标轻松启动 rpc_async,以在由此 RRef 引用的对象上运行函数。 更准确地说,rref.rpc_async().func_name(*args, **kwargs) 等同于以下内容

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
参数

timeout (float, optional) – rref.rpc_async() 的超时时间。 如果调用未在此时间范围内完成,则会引发指示此情况的异常。 如果未提供此参数,将使用默认 RPC 超时时间。

示例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_async().size().wait()  # returns torch.Size([2, 2])
>>> rref.rpc_async().view(1, 4).wait()  # returns tensor([[1., 1., 1., 1.]])
rpc_sync(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

创建一个辅助代理,以便使用 RRef 所有者作为目标轻松启动 rpc_sync,以在由此 RRef 引用的对象上运行函数。 更准确地说,rref.rpc_sync().func_name(*args, **kwargs) 等同于以下内容

>>> def run(rref, func_name, args, kwargs):
>>>   return getattr(rref.local_value(), func_name)(*args, **kwargs)
>>>
>>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
参数

timeout (float, optional) – rref.rpc_sync() 的超时时间。 如果调用未在此时间范围内完成,则会引发指示此情况的异常。 如果未提供此参数,将使用默认 RPC 超时时间。

示例:
>>> from torch.distributed import rpc
>>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1))
>>> rref.rpc_sync().size()  # returns torch.Size([2, 2])
>>> rref.rpc_sync().view(1, 4)  # returns tensor([[1., 1., 1., 1.]])
to_here(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object

阻塞调用,将 RRef 的值从所有者复制到本地节点并返回。 如果当前节点是所有者,则返回对本地值的引用。

参数

timeout (float, optional) – to_here 的超时时间。 如果调用未在此时间范围内完成,则会引发指示此情况的异常。 如果未提供此参数,将使用默认 RPC 超时时间(60 秒)。

RemoteModule

警告

在使用 CUDA 张量时,目前不支持 RemoteModule

RemoteModule 是一种在不同进程上远程创建 nn.Module 的简便方法。 实际模块驻留在远程主机上,但本地主机拥有此模块的句柄,并类似于常规 nn.Module 调用此模块。 但是,调用需要对远程端进行 RPC 调用,并且如果需要,可以通过 RemoteModule 支持的其他 API 异步执行。

class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[source]

仅在 RPC 初始化后才能创建 RemoteModule 实例。

它在指定的远程节点上创建用户指定的模块。 它类似于常规的 nn.Module,不同之处在于 forward 方法在远程节点上执行。 它负责自动梯度记录,以确保反向传播将梯度传播回相应的远程模块。

它根据 module_clsforward 方法的签名生成两个方法 forward_asyncforwardforward_async 异步运行并返回一个 Future。 forward_asyncforward 的参数与 module_cls 返回的模块的 forward 方法相同。

例如,如果 module_cls 返回 nn.Linear 的实例,该实例具有 forward 方法签名:def forward(input: Tensor) -> Tensor:,则生成的 RemoteModule 将具有 2 个具有以下签名的方法

def forward(input: Tensor) -> Tensor:
def forward_async(input: Tensor) -> Future[Tensor]:
参数
  • remote_device (str) – 目标 worker 上我们要放置此模块的设备。 格式应为“<workername>/<device>”,其中设备字段可以解析为 torch.device 类型。 例如,“trainer0/cpu”、“trainer0”、“ps0/cuda:0”。 此外,设备字段可以是可选的,默认值为“cpu”。

  • module_cls (nn.Module) –

    要远程创建的模块的类。 例如,

    >>> class MyModule(nn.Module):
    >>>     def forward(input):
    >>>         return input + 1
    >>>
    >>> module_cls = MyModule
    

  • args (Sequence, optional) – 要传递给 module_cls 的参数。

  • kwargs (Dict, optional) – 要传递给 module_cls 的关键字参数。

返回值

一个远程模块实例,它包装了由用户提供的 Module 创建的 module_cls,它有一个阻塞的 forward 方法和一个异步的 forward_async 方法,该方法返回远程侧用户提供的模块上 forward 调用的未来。

示例:

在两个不同的进程中运行以下代码

>>> # On worker 0:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>> from torch import nn, Tensor
>>> from torch.distributed.nn.api.remote_module import RemoteModule
>>>
>>> rpc.init_rpc("worker0", rank=0, world_size=2)
>>> remote_linear_module = RemoteModule(
>>>     "worker1/cpu", nn.Linear, args=(20, 30),
>>> )
>>> input = torch.randn(128, 20)
>>> ret_fut = remote_linear_module.forward_async(input)
>>> ret = ret_fut.wait()
>>> rpc.shutdown()
>>> # On worker 1:
>>> import torch
>>> import torch.distributed.rpc as rpc
>>>
>>> rpc.init_rpc("worker1", rank=1, world_size=2)
>>> rpc.shutdown()

此外,可以在此 教程 中找到与 分布式数据并行 (DDP) 相结合的更实用的示例。

get_module_rref()

返回一个指向远程模块的 RRef (RRef[nn.Module])。

返回类型

RRef[Module]

remote_parameters(recurse=True)

返回一个指向远程模块参数的 RRef 列表。

这通常可以与 DistributedOptimizer 结合使用。

参数

recurse (bool) – 如果为 True,则返回远程模块和远程模块所有子模块的参数。否则,仅返回作为远程模块直接成员的参数。

返回值

指向远程模块参数的 RRef 列表 (List[RRef[nn.Parameter]])。

返回类型

List[RRef[Parameter]]

分布式自动微分框架

警告

使用 CUDA 张量时,目前不支持分布式自动微分

此模块提供了一个基于 RPC 的分布式自动微分框架,可用于模型并行训练等应用。简而言之,应用程序可以通过 RPC 发送和接收梯度记录张量。在正向传递中,我们记录何时将梯度记录张量通过 RPC 发送,并在反向传递期间使用此信息通过 RPC 执行分布式反向传递。有关更多详细信息,请参阅 分布式自动微分设计

torch.distributed.autograd.backward(context_id: int, roots: List[Tensor], retain_graph=False) None

使用提供的根启动分布式反向传递。这目前实现了 FAST 模式算法,该算法假设在同一分布式自动微分上下文中跨工作器发送的所有 RPC 消息在反向传递期间将成为自动微分图的一部分。

我们使用提供的根来发现自动微分图并计算适当的依赖关系。此方法会阻塞,直到整个自动微分计算完成。

我们在每个节点上的适当 torch.distributed.autograd.context 中累积梯度。将根据调用 torch.distributed.autograd.backward() 时传入的 context_id 来查找要使用的自动微分上下文。如果与给定 ID 相对应的有效自动微分上下文不存在,我们将抛出错误。您可以使用 get_gradients() API 检索累积的梯度。

参数
  • context_id (int) – 我们应该检索梯度的自动微分上下文 ID。

  • roots (list) – 代表自动微分计算根的张量。所有张量都应该是标量。

  • retain_graph (bool, optional) – 如果为 False,则用于计算梯度的图将被释放。请注意,在几乎所有情况下,将此选项设置为 True 都是不必要的,并且通常可以用更有效的方式解决。通常,您需要将其设置为 True 才能多次运行反向传递。

示例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     pred = model.forward()
>>>     loss = loss_func(pred, loss)
>>>     dist_autograd.backward(context_id, loss)
class torch.distributed.autograd.context[source]

在使用分布式自动微分时,上下文对象用于包装正向和反向传递。在 with 语句中生成的 context_id 对于唯一地识别所有工作器上的分布式反向传递是必需的。每个工作器都存储与该 context_id 关联的元数据,这对于正确执行分布式自动微分传递是必需的。

示例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum()
>>>     dist_autograd.backward(context_id, [loss])
torch.distributed.autograd.get_gradients(context_id: int) Dict[Tensor, Tensor]

检索一个从张量到该张量的相应梯度的映射,该梯度在给定 context_id 的分布式自动微分反向传递过程中累积在提供的上下文中。

参数

context_id (int) – 我们应该检索梯度的自动微分上下文 ID。

返回值

一个映射,其中键是张量,值是该张量的关联梯度。

示例:
>>> import torch.distributed.autograd as dist_autograd
>>> with dist_autograd.context() as context_id:
>>>     t1 = torch.rand((3, 3), requires_grad=True)
>>>     t2 = torch.rand((3, 3), requires_grad=True)
>>>     loss = t1 + t2
>>>     dist_autograd.backward(context_id, [loss.sum()])
>>>     grads = dist_autograd.get_gradients(context_id)
>>>     print(grads[t1])
>>>     print(grads[t2])

分布式优化器

有关分布式优化器的文档,请参阅 torch.distributed.optim 页面。

设计说明

分布式自动微分设计说明涵盖了基于 RPC 的分布式自动微分框架的设计,该框架对于模型并行训练等应用很有用。

RRef 设计说明涵盖了 RRef (远程引用) 协议的设计,该协议用于通过框架引用远程工作器上的值。

教程

RPC 教程向用户介绍了 RPC 框架,提供了几个使用 torch.distributed.rpc API 的示例应用程序,并演示了如何使用 探查器 来探查基于 RPC 的工作负载。

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获取问题的答案

查看资源