分布式 RPC 框架¶
分布式 RPC 框架提供了一组用于多机模型训练的机制,这些机制通过一组基元来实现远程通信,并提供了一个更高级的 API 来自动区分分布在多台机器上的模型。
警告
RPC 包中的 API 稳定。有多个正在进行的工作项目可以提高性能和错误处理,这些项目将在未来的版本中发布。
警告
CUDA 支持在 PyTorch 1.9 中引入,目前仍处于**测试版**功能。并非 RPC 包的所有功能都与 CUDA 支持兼容,因此不建议使用这些功能。这些不支持的功能包括:RRefs、JIT 兼容性、dist autograd 和 dist optimizer 以及分析。这些缺陷将在未来的版本中解决。
注意
请参考PyTorch 分布式概述,以简要介绍与分布式训练相关的所有功能。
基础¶
分布式 RPC 框架使远程运行函数变得容易,支持在不复制实际数据的情况下引用远程对象,并提供自动梯度和优化器 API,以便在 RPC 边界之间透明地运行反向传播和更新参数。这些功能可以分为四组 API。
**远程过程调用 (RPC)**支持在指定的目的地工作程序上运行函数,并使用给定的参数,并将返回值返回或创建一个对返回值的引用。主要有三种 RPC API:
rpc_sync()
(同步)、rpc_async()
(异步)和remote()
(异步,并返回对远程返回值的引用)。如果用户代码在没有返回值的情况下无法继续执行,请使用同步 API。否则,请使用异步 API 获取未来,并在需要在调用者处获得返回值时等待未来。remote()
API 在需要远程创建某些内容但永远不需要将其获取到调用者的场景中很有用。假设一个驱动程序进程正在设置一个参数服务器和一个训练器。驱动程序可以在参数服务器上创建一个嵌入表,然后将对嵌入表的引用共享给训练器,但它本身永远不会在本地使用嵌入表。在这种情况下,rpc_sync()
和rpc_async()
不再适用,因为它们始终意味着返回值将立即或在将来返回给调用者。**远程引用 (RRef)**充当对本地或远程对象的分布式共享指针。它可以与其他工作程序共享,引用计数将透明地处理。每个 RRef 只有一个所有者,并且该对象仅存在于该所有者上。持有 RRefs 的非所有者工作程序可以通过明确请求从所有者处获取对象的副本。当工作程序需要访问某些数据对象,但本身既不是创建者(
remote()
的调用者)也不是该对象的拥有者时,这很有用。正如我们将在下面讨论的那样,分布式优化器就是这类用例的一个示例。**分布式自动梯度**将参与前向传播的所有工作程序上的本地自动梯度引擎缝合在一起,并在反向传播期间自动联系它们以计算梯度。当进行例如分布式模型并行训练、参数服务器训练等时,如果前向传播需要跨越多台机器,这将特别有用。使用此功能,用户代码不再需要担心如何跨 RPC 边界发送梯度,以及应该以什么顺序启动本地自动梯度引擎,这在存在前向传播中嵌套和相互依赖的 RPC 调用时会变得非常复杂。
**分布式优化器**的构造函数接受一个
Optimizer()
(例如,SGD()
、Adagrad()
等)和一个参数 RRefs 列表,在每个不同的 RRef 所有者上创建一个Optimizer()
实例,并在运行step()
时相应地更新参数。当你进行分布式前向和反向传播时,参数和梯度将分散在多个工作程序上,因此需要在每个参与的工作程序上都使用一个优化器。分布式优化器将所有这些本地优化器包装到一个中,并提供了一个简洁的构造函数和step()
API。
RPC¶
在使用 RPC 和分布式自动梯度基元之前,必须进行初始化。要初始化 RPC 框架,我们需要使用init_rpc()
,它将初始化 RPC 框架、RRef 框架和分布式自动梯度。
- torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[source]¶
初始化 RPC 原语,例如本地 RPC 代理和分布式自动微分,这将立即使当前进程准备好发送和接收 RPC。
- 参数
name (str) – 此节点的全局唯一名称。(例如,
Trainer3
、ParameterServer2
、Master
、Worker1
)。名称只能包含数字、字母、下划线、冒号和/或连字符,且长度必须小于 128 个字符。backend (BackendType, optional) – RPC 后端实现的类型。支持的值为
BackendType.TENSORPIPE
(默认)。有关更多信息,请参见 后端。rank (int) – 此节点的全局唯一 ID/排名。
world_size (int) – 组中的工作程序数量。
rpc_backend_options (RpcBackendOptions, optional) – 传递给 RpcAgent 构造函数的选项。它必须是
RpcBackendOptions
的代理特定子类,并包含代理特定初始化配置。默认情况下,对于所有代理,它将默认超时设置为 60 秒,并使用使用init_method = "env://"
初始化的底层进程组执行会合,这意味着需要正确设置环境变量MASTER_ADDR
和MASTER_PORT
。有关更多信息,请参见 后端,并查找哪些选项可用。
以下 API 允许用户远程执行函数,以及创建对远程数据对象的引用(RRef)。在这些 API 中,当将 Tensor
作为参数或返回值传递时,目标工作程序将尝试创建具有相同元数据(即形状、步幅等)的 Tensor
。我们有意禁止传输 CUDA 张量,因为如果源和目标工作程序上的设备列表不匹配,则可能会崩溃。在这种情况下,应用程序始终可以显式地将输入张量移动到调用者的 CPU 上,并在必要时将其移动到被调用者的所需设备上。
警告
RPC 中的 TorchScript 支持是一个原型功能,可能会发生更改。从 v1.5.0 开始,torch.distributed.rpc
支持将 TorchScript 函数作为 RPC 目标函数调用,这将有助于提高被调用者端的并行性,因为执行 TorchScript 函数不需要 GIL。
- torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[source]¶
进行阻塞式 RPC 调用以在工作程序
to
上运行函数func
。RPC 消息与 Python 代码的执行并行发送和接收。此方法是线程安全的。- 参数
to (str or WorkerInfo or int) – 目标工作程序的名称/排名/
WorkerInfo
。func (Callable) – 可调用函数,例如 Python 可调用对象、内置运算符(例如
add()
)和带注释的 TorchScript 函数。args (tuple) –
func
调用的参数元组。kwargs (dict) – 是
func
调用的关键字参数字典。timeout (float, optional) – 此 RPC 的超时时间(以秒为单位)。如果 RPC 未在此时间段内完成,则将引发指示超时错误的异常。值为 0 表示无限超时,即永远不会引发超时错误。如果未提供,则使用初始化时设置的默认值或使用
_set_rpc_timeout
设置的默认值。
- 返回值
返回使用
args
和kwargs
运行func
的结果。
- 示例:
确保
MASTER_ADDR
和MASTER_PORT
在两个工作程序上都已正确设置。有关更多详细信息,请参阅init_process_group()
API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
以下是使用 RPC 运行 TorchScript 函数的示例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[source]¶
进行非阻塞式 RPC 调用以在工作程序
to
上运行函数func
。RPC 消息与 Python 代码的执行并行发送和接收。此方法是线程安全的。此方法将立即返回一个Future
,可以对其进行等待。- 参数
to (str or WorkerInfo or int) – 目标工作程序的名称/排名/
WorkerInfo
。func (Callable) – 可调用函数,例如 Python 可调用对象、内置运算符(例如
add()
)和带注释的 TorchScript 函数。args (tuple) –
func
调用的参数元组。kwargs (dict) – 是
func
调用的关键字参数字典。timeout (float, optional) – 此 RPC 的超时时间(以秒为单位)。如果 RPC 未在此时间段内完成,则将引发指示超时错误的异常。值为 0 表示无限超时,即永远不会引发超时错误。如果未提供,则使用初始化时设置的默认值或使用
_set_rpc_timeout
设置的默认值。
- 返回值
返回一个
Future
对象,可以对其进行等待。完成后,可以在Future
对象中检索在args
和kwargs
上运行func
的返回值。
警告
由于我们不支持通过网络发送 GPU 张量,因此不支持使用 GPU 张量作为
func
的参数或返回值。在使用它们作为func
的参数或返回值之前,您需要显式地将 GPU 张量复制到 CPU 上。警告
rpc_async
API 在将参数张量发送到网络之前不会复制它们的存储,这可以由不同的线程完成,具体取决于 RPC 后端类型。调用者应确保这些张量的內容在返回的Future
完成之前保持完整。- 示例:
确保
MASTER_ADDR
和MASTER_PORT
在两个工作程序上都已正确设置。有关更多详细信息,请参阅init_process_group()
API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3)) >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2)) >>> result = fut1.wait() + fut2.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
以下是使用 RPC 运行 TorchScript 函数的示例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3)) >>> ret = fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[source]¶
对 worker
to
进行远程调用,执行func
,并立即返回一个指向结果值的RRef
。workerto
将是返回的RRef
的所有者,而调用remote
的 worker 是用户。所有者管理其RRef
的全局引用计数,并且只有当全局没有对其的活动引用时,所有者RRef
才会被销毁。- 参数
to (str or WorkerInfo or int) – 目标工作程序的名称/排名/
WorkerInfo
。func (Callable) – 可调用函数,例如 Python 可调用对象、内置运算符(例如
add()
)和带注释的 TorchScript 函数。args (tuple) –
func
调用的参数元组。kwargs (dict) – 是
func
调用的关键字参数字典。timeout (float, optional) – 此远程调用的超时时间(以秒为单位)。如果在该超时时间内,无法在此 worker 上成功处理 worker
to
上此RRef
的创建,则下次尝试使用 RRef 时(例如to_here()
),将引发超时错误,指示此错误。值为 0 表示无限超时,即永远不会引发超时错误。如果没有提供,则使用初始化时或使用_set_rpc_timeout
设置的默认值。
- 返回值
指向结果值的用户的
RRef
实例。使用阻塞 APItorch.distributed.rpc.RRef.to_here()
在本地检索结果值。
警告
remote
API 在通过网络发送之前不会复制参数张量的存储,这可能由不同的线程完成,具体取决于 RPC 后端类型。调用者应确保这些张量的内容在返回的 RRef 被所有者确认之前保持完整,可以使用torch.distributed.rpc.RRef.confirmed_by_owner()
API 检查这一点。警告
remote
API 的超时等错误会尽力处理。这意味着当remote
启动的远程调用失败时(例如,出现超时错误),我们会尽力处理错误。这意味着错误会异步地处理并设置在生成的 RRef 上。如果应用程序在处理错误之前没有使用 RRef(例如to_here
或 fork 调用),则将来使用RRef
会适当地引发错误。但是,用户应用程序可能会在处理错误之前使用RRef
。在这种情况下,错误可能不会被引发,因为它们尚未被处理。示例
Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly on both workers. Refer to :meth:`~torch.distributed.init_process_group` API for more details. For example, export MASTER_ADDR=localhost export MASTER_PORT=5678 Then run the following code in two different processes: >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >>> x = rref1.to_here() + rref2.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() Below is an example of running a TorchScript function using RPC. >>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) >>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rref.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.get_worker_info(worker_name=None)[source]¶
获取给定 worker 名称的
WorkerInfo
。使用此WorkerInfo
避免在每次调用时传递昂贵的字符串。- 参数
worker_name (str) – worker 的字符串名称。如果为
None
,则返回当前 worker 的 ID。(默认值为None
)- 返回值
给定
worker_name
的WorkerInfo
实例,或者如果worker_name
为None
,则为当前 worker 的WorkerInfo
。
- torch.distributed.rpc.shutdown(graceful=True, timeout=0)[source]¶
执行 RPC 代理的关闭,然后销毁 RPC 代理。这会阻止本地代理接受未完成的请求,并通过终止所有 RPC 线程来关闭 RPC 框架。如果
graceful=True
,则会阻塞,直到所有本地和远程 RPC 进程都到达此方法并等待所有未完成的工作完成。否则,如果graceful=False
,则为本地关闭,它不会等待其他 RPC 进程到达此方法。警告
对于
Future
对象,它们由rpc_async()
返回,在shutdown()
之后不应调用future.wait()
。- 参数
graceful (bool) – 是否进行优雅关闭。如果为 True,则会 1)等待直到没有
UserRRefs
的待处理系统消息并删除它们;2)阻塞,直到所有本地和远程 RPC 进程都到达此方法并等待所有未完成的工作完成。
- 示例:
确保
MASTER_ADDR
和MASTER_PORT
在两个工作程序上都已正确设置。有关更多详细信息,请参阅init_process_group()
API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> # do some work >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1)) >>> # ready to shutdown >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> # wait for worker 0 to finish work, and then shutdown. >>> rpc.shutdown()
- class torch.distributed.rpc.WorkerInfo¶
一个结构,封装了系统中 worker 的信息。包含 worker 的名称和 ID。此类不打算直接构造,而是可以通过
get_worker_info()
获取实例,并将结果传递给诸如rpc_sync()
、rpc_async()
、remote()
的函数,以避免在每次调用时复制字符串。- property id¶
全局唯一的 ID,用于识别 worker。
- property name¶
worker 的名称。
RPC 包还提供装饰器,允许应用程序指定在被调用方侧如何处理给定函数。
- torch.distributed.rpc.functions.async_execution(fn)[source]¶
用于函数的装饰器,指示函数的返回值保证为
Future
对象,并且此函数可以在 RPC 被调用方上异步运行。更具体地说,被调用方会提取由包装函数返回的Future
,并将后续处理步骤安装为该Future
的回调。安装的回调将在完成时从Future
读取值,并将该值作为 RPC 响应发送回来。这也意味着返回的Future
只存在于被调用方侧,并且永远不会通过 RPC 发送。当包装函数(fn
)的执行需要暂停和恢复时,此装饰器很有用,例如,包含rpc_async()
或等待其他信号。注意
为了启用异步执行,应用程序必须将此装饰器返回的函数对象传递给 RPC API。如果 RPC 检测到此装饰器安装的属性,它就知道此函数返回一个
Future
对象,并将相应地进行处理。但是,这并不意味着此装饰器在定义函数时必须是最外层的装饰器。例如,当与@staticmethod
或@classmethod
结合使用时,@rpc.functions.async_execution
需要是内部装饰器,以允许目标函数被识别为静态函数或类函数。此目标函数仍然可以异步执行,因为在访问时,静态函数或类方法会保留由@rpc.functions.async_execution
安装的属性。- 示例:
返回的
Future
对象可以来自rpc_async()
,then()
或Future
构造函数。以下示例展示了直接使用由then()
返回的Future
。>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @rpc.functions.async_execution >>> def async_add_chained(to, x, y, z): >>> # This function runs on "worker1" and returns immediately when >>> # the callback is installed through the `then(cb)` API. In the >>> # mean time, the `rpc_async` to "worker2" can run concurrently. >>> # When the return value of that `rpc_async` arrives at >>> # "worker1", "worker1" will run the lambda function accordingly >>> # and set the value for the previously returned `Future`, which >>> # will then trigger RPC to send the result back to "worker0". >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add_chained, >>> args=("worker2", torch.ones(2), 1, 1) >>> ) >>> print(ret) # prints tensor([3., 3.])
当与 TorchScript 装饰器结合使用时,此装饰器必须是最外层的装饰器。
>>> from torch import Tensor >>> from torch.futures import Future >>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @torch.jit.script >>> def script_add(x: Tensor, y: Tensor) -> Tensor: >>> return x + y >>> >>> @rpc.functions.async_execution >>> @torch.jit.script >>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]: >>> return rpc.rpc_async(to, script_add, (x, y)) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add, >>> args=("worker2", torch.ones(2), 1) >>> ) >>> print(ret) # prints tensor([2., 2.])
当与静态方法或类方法结合使用时,此装饰器必须是最内层的装饰器。
>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> class AsyncExecutionClass: >>> >>> @staticmethod >>> @rpc.functions.async_execution >>> def static_async_add(to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> @classmethod >>> @rpc.functions.async_execution >>> def class_async_add(cls, to, x, y, z): >>> ret_fut = torch.futures.Future() >>> rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: ret_fut.set_result(fut.wait() + z) >>> ) >>> return ret_fut >>> >>> @rpc.functions.async_execution >>> def bound_async_add(self, to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.static_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.]) >>> >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.class_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.])
此装饰器也适用于 RRef 帮助程序,即 .
torch.distributed.rpc.RRef.rpc_sync()
,torch.distributed.rpc.RRef.rpc_async()
和torch.distributed.rpc.RRef.remote()
。>>> from torch.distributed import rpc >>> >>> # reuse the AsyncExecutionClass class above >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2) >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait() >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here() >>> print(ret) # prints tensor([4., 4.])
后端¶
RPC 模块可以利用不同的后端在节点之间执行通信。要使用的后端可以在 init_rpc()
函数中指定,通过传递 BackendType
枚举的特定值。无论使用什么后端,其余的 RPC API 都不会改变。每个后端还定义了其自身的 RpcBackendOptions
类子类,可以将该类的实例传递给 init_rpc()
以配置后端的行为。
- class torch.distributed.rpc.BackendType(value)¶
可用后端的枚举类。
PyTorch 附带内置的
BackendType.TENSORPIPE
后端。可以使用register_backend()
函数注册其他后端。
- class torch.distributed.rpc.RpcBackendOptions¶
封装传递到 RPC 后端的选项的抽象结构。可以将此类的实例传递给
init_rpc()
,以便使用特定配置(例如 RPC 超时和要使用的init_method
)初始化 RPC。- property init_method¶
指定如何初始化进程组的 URL。默认值为
env://
- property rpc_timeout¶
表示所有 RPC 使用的超时的浮点数。如果 RPC 在此时间范围内未完成,它将以异常完成,指示它已超时。
TensorPipe 后端¶
作为默认值的 TensorPipe 代理利用了 TensorPipe 库,该库提供了一种专门适用于机器学习的本机点对点通信原语,从根本上解决了 Gloo 的一些局限性。与 Gloo 相比,它的优势在于异步,这使得大量的传输可以同时发生,每个传输都以自己的速度进行,而不会阻塞彼此。它只会在需要时按需在节点对之间打开管道,当一个节点发生故障时,只有与其相关的管道会被关闭,而其他所有管道将继续正常工作。此外,它能够支持多种不同的传输(当然包括 TCP,还有共享内存、NVLink、InfiniBand 等),并且可以自动检测它们的可用性并协商为每个管道使用的最佳传输方式。
TensorPipe 后端是在 PyTorch v1.6 中引入的,并且正在积极开发中。目前,它只支持 CPU 张量,GPU 支持即将推出。它带有一个基于 TCP 的传输,就像 Gloo 一样。它还能够自动将大型张量分块和多路复用到多个套接字和线程,以实现非常高的带宽。代理能够自行选择最佳的传输方式,无需任何干预。
示例
>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>> "worker1",
>>> rank=0,
>>> world_size=2,
>>> rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>> num_worker_threads=8,
>>> rpc_timeout=20 # 20 second timeout
>>> )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
- class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[source]¶
TensorPipeAgent
的后端选项,继承自RpcBackendOptions
。- 参数
num_worker_threads (int, optional) –
TensorPipeAgent
用于执行请求的线程池中的线程数(默认值:16)。rpc_timeout (float, optional) – RPC 请求的默认超时时间(以秒为单位)(默认值:60 秒)。如果 RPC 在此时间范围内未完成,将引发指示此情况的异常。调用者可以根据需要在
rpc_sync()
和rpc_async()
中为单个 RPC 覆盖此超时时间。init_method (str, optional) – 用于初始化用于会合的分布式存储的 URL。它接受
init_process_group()
的相同参数接受的任何值(默认值:env://
)。device_maps (Dict[str, Dict], optional) – 从此工作程序到被调用者的设备放置映射。键是被调用者的工作程序名称,值是字典(
Dict
ofint
,str
或torch.device
),它将此工作程序的设备映射到被调用者的工作程序的设备。(默认值:None
)devices (List[int, str, or
torch.device
], optional) – RPC 代理使用的所有本地 CUDA 设备。默认情况下,它将初始化为来自其自身的device_maps
的所有本地设备及其对等方的device_maps
中的对应设备。在处理 CUDA RPC 请求时,代理将正确同步此List
中所有设备的 CUDA 流。
- property device_maps¶
设备映射位置。
- property devices¶
本地代理使用的所有设备。
- property init_method¶
指定如何初始化进程组的 URL。默认值为
env://
- property num_worker_threads¶
TensorPipeAgent
用于执行请求的线程池中的线程数。
- property rpc_timeout¶
表示所有 RPC 使用的超时的浮点数。如果 RPC 在此时间范围内未完成,它将以异常完成,指示它已超时。
- set_device_map(to, device_map)[source]¶
设置每个 RPC 调用者和被调用者对之间的设备映射。可以多次调用此函数以增量添加设备放置配置。
- 参数
to (str) – 被调用者的名称。
device_map (Dict of int, str, or torch.device) – 从此工作程序到被调用者的设备放置映射。此映射必须是可逆的。
示例
>>> # both workers >>> def add(x, y): >>> print(x) # tensor([1., 1.], device='cuda:1') >>> return x + y, (x + y).to(2) >>> >>> # on worker 0 >>> options = TensorPipeRpcBackendOptions( >>> num_worker_threads=8, >>> device_maps={"worker1": {0: 1}} >>> # maps worker0's cuda:0 to worker1's cuda:1 >>> ) >>> options.set_device_map("worker1", {1: 2}) >>> # maps worker0's cuda:1 to worker1's cuda:2 >>> >>> rpc.init_rpc( >>> "worker0", >>> rank=0, >>> world_size=2, >>> backend=rpc.BackendType.TENSORPIPE, >>> rpc_backend_options=options >>> ) >>> >>> x = torch.ones(2) >>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1)) >>> # The first argument will be moved to cuda:1 on worker1. When >>> # sending the return value back, it will follow the invert of >>> # the device map, and hence will be moved back to cuda:0 and >>> # cuda:1 on worker0 >>> print(rets[0]) # tensor([2., 2.], device='cuda:0') >>> print(rets[1]) # tensor([2., 2.], device='cuda:1')
- set_devices(devices)[source]¶
设置TensorPipe RPC代理使用的本地设备。在处理CUDA RPC请求时,TensorPipe RPC代理将为该
List
中的所有设备正确同步CUDA流。- 参数
devices (List of int, str, or torch.device) – TensorPipe RPC代理使用的本地设备。
注意
RPC框架不会自动重试任何rpc_sync()
、rpc_async()
和remote()
调用。原因是RPC框架无法确定操作是否幂等以及是否可以安全地重试。因此,应用程序有责任处理故障并在必要时重试。RPC通信基于TCP,因此由于网络故障或间歇性网络连接问题而可能发生故障。在这种情况下,应用程序需要适当地重试并使用合理的回退来确保网络不会因积极的重试而不堪重负。
RRef¶
警告
当前使用CUDA张量时不支持RRef。
一个RRef
(远程引用)是远程工作程序上某个类型T
(例如Tensor
)的值的引用。此句柄使引用的远程值在所有者上保持活动状态,但不能保证将来会将该值传输到本地工作程序。RRef可以在多机训练中使用,方法是持有对其他工作程序上nn.Modules的引用,并在训练期间调用适当的函数来检索或修改其参数。有关详细信息,请参见远程引用协议。
- class torch.distributed.rpc.PyRRef(RRef)¶
一个封装了对远程工作程序上某个类型的值的引用的类。此句柄将使引用的远程值在工作程序上保持活动状态。一个
UserRRef
将在以下情况下被删除:1) 应用程序代码和本地RRef上下文中都没有对它的引用,或者 2) 应用程序已调用了优雅关闭。在已删除的RRef上调用方法会导致未定义的行为。RRef实现仅提供尽力而为的错误检测,应用程序不应在rpc.shutdown()
之后使用UserRefs
。警告
RRef只能由RPC模块序列化和反序列化。在没有RPC的情况下序列化和反序列化RRef(例如,Python pickle,torch
save()
/load()
,JITsave()
/load()
等)会导致错误。- 参数
value (object) – 要由此RRef包装的值。
type_hint (Type, optional) – 应传递给
TorchScript
编译器作为value
类型提示的Python类型。
- 示例:
以下示例出于简单起见跳过了RPC初始化和关闭代码。有关这些详细信息,请参阅RPC文档。
使用rpc.remote创建RRef
>>> import torch >>> import torch.distributed.rpc as rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> # get a copy of value from the RRef >>> x = rref.to_here()
从本地对象创建RRef
>>> import torch >>> from torch.distributed.rpc import RRef >>> x = torch.zeros(2, 2) >>> rref = RRef(x)
与其他工作程序共享RRef
>>> # On both worker0 and worker1: >>> def f(rref): >>> return rref.to_here() + 1
>>> # On worker0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch.distributed.rpc import RRef >>> rref = RRef(torch.zeros(2, 2)) >>> # the following RPC shares the rref with worker1, reference >>> # count is automatically updated. >>> rpc.rpc_sync("worker1", f, args=(rref,))
- backward(self: torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id: int = -1, retain_graph: bool = False) None ¶
使用RRef作为反向传播的根运行反向传播。如果提供了
dist_autograd_ctx_id
,我们将使用提供的ctx_id从RRef的所有者开始执行分布式反向传播。在这种情况下,应使用get_gradients()
来检索梯度。如果dist_autograd_ctx_id
为None
,则假定这是一个本地自动梯度图,我们只执行本地反向传播。在本地情况下,调用此API的节点必须是RRef的所有者。RRef的值应为标量张量。- 参数
- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> rref.backward(context_id)
- confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) bool ¶
返回此
RRef
是否已由所有者确认。OwnerRRef
始终返回true,而UserRRef
仅在所有者知道此UserRRef
时才返回true。
- is_owner(self: torch._C._distributed_rpc.PyRRef) bool ¶
返回当前节点是否为此
RRef
的所有者。
- local_value(self: torch._C._distributed_rpc.PyRRef) object ¶
如果当前节点是所有者,则返回对本地值的引用。否则,抛出异常。
- owner(self: torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo ¶
返回拥有此
RRef
的节点的工作程序信息。
- owner_name(self: torch._C._distributed_rpc.PyRRef) str ¶
返回拥有此
RRef
的节点的工作器名称。
- remote(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object ¶
创建一个辅助代理,以便使用 RRef 所有者作为目标来轻松地启动
remote
,以在该 RRef 引用的对象上运行函数。更具体地说,rref.remote().func_name(*args, **kwargs)
等效于以下操作>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 参数
timeout (float, optional) –
rref.remote()
的超时时间。如果在此超时时间内未成功完成此RRef
的创建,则下次尝试使用 RRef(例如to_here
)时,将会引发超时错误。如果未提供,将使用默认的 RPC 超时时间。有关RRef
的特定超时语义,请参见rpc.remote()
。
- 示例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.remote().size().to_here() # returns torch.Size([2, 2]) >>> rref.remote().view(1, 4).to_here() # returns tensor([[1., 1., 1., 1.]])
- rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object ¶
创建一个辅助代理,以便使用 RRef 所有者作为目标来轻松地启动
rpc_async
,以在该 RRef 引用的对象上运行函数。更具体地说,rref.rpc_async().func_name(*args, **kwargs)
等效于以下操作>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 参数
timeout (float, optional) –
rref.rpc_async()
的超时时间。如果调用未在此时间范围内完成,则会引发指示该情况的异常。如果未提供此参数,将使用默认的 RPC 超时时间。
- 示例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_async().size().wait() # returns torch.Size([2, 2]) >>> rref.rpc_async().view(1, 4).wait() # returns tensor([[1., 1., 1., 1.]])
- rpc_sync(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object ¶
创建一个辅助代理,以便使用 RRef 所有者作为目标来轻松地启动
rpc_sync
,以在该 RRef 引用的对象上运行函数。更具体地说,rref.rpc_sync().func_name(*args, **kwargs)
等效于以下操作>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 参数
timeout (float, optional) –
rref.rpc_sync()
的超时时间。如果调用未在此时间范围内完成,则会引发指示该情况的异常。如果未提供此参数,将使用默认的 RPC 超时时间。
- 示例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_sync().size() # returns torch.Size([2, 2]) >>> rref.rpc_sync().view(1, 4) # returns tensor([[1., 1., 1., 1.]])
- to_here(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object ¶
阻塞调用,将 RRef 的值从所有者复制到本地节点并返回它。如果当前节点是所有者,则返回对本地值的引用。
- 参数
timeout (float, optional) –
to_here
的超时时间。如果调用未在此时间范围内完成,则会引发指示该情况的异常。如果未提供此参数,将使用默认的 RPC 超时时间(60 秒)。
有关 RRef 的更多信息
RemoteModule¶
警告
在使用 CUDA 张量时,当前不支持 RemoteModule
RemoteModule
是一种在不同进程上远程创建 nn.Module 的简便方法。实际的模块驻留在远程主机上,但本地主机拥有对此模块的句柄,并类似于常规 nn.Module 一样调用此模块。但是,调用会引发到远程端的 RPC 调用,如果需要,可以通过 RemoteModule 支持的额外 API 异步执行。
- class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[source]¶
只有在 RPC 初始化后才能创建 RemoteModule 实例。
它在指定的远程节点上创建用户指定的模块。它类似于常规的
nn.Module
,除了forward
方法是在远程节点上执行的。它会处理自动梯度记录,以确保反向传播将梯度传播回相应的远程模块。它根据
module_cls
的forward
方法签名生成两个方法forward_async
和forward
。forward_async
异步运行并返回一个 Future。forward_async
和forward
的参数与module_cls
返回的模块的forward
方法相同。例如,如果
module_cls
返回nn.Linear
的实例,该实例具有forward
方法签名:def forward(input: Tensor) -> Tensor:
,则生成的RemoteModule
将具有 2 个具有以下签名的方法def forward(input: Tensor) -> Tensor:
def forward_async(input: Tensor) -> Future[Tensor]:
- 参数
remote_device (str) – 目标工作器上的设备,我们希望在此处放置此模块。格式应为“<workername>/<device>”,其中设备字段可以解析为 torch.device 类型。例如,“trainer0/cpu”、“trainer0”、“ps0/cuda:0”。此外,设备字段可以是可选的,默认值为“cpu”。
module_cls (nn.Module) –
要远程创建的模块的类。例如,
>>> class MyModule(nn.Module): >>> def forward(input): >>> return input + 1 >>> >>> module_cls = MyModule
args (Sequence, optional) – 要传递给
module_cls
的参数。kwargs (Dict, optional) – 要传递给
module_cls
的关键字参数。
- 返回值
一个远程模块实例,它封装了用户提供的
module_cls
创建的Module
,它有一个阻塞的forward
方法和一个异步的forward_async
方法,该方法返回在远程端的用户提供的模块上forward
调用的未来。
- 示例:
在两个不同的进程中运行以下代码
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch import nn, Tensor >>> from torch.distributed.nn.api.remote_module import RemoteModule >>> >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> remote_linear_module = RemoteModule( >>> "worker1/cpu", nn.Linear, args=(20, 30), >>> ) >>> input = torch.randn(128, 20) >>> ret_fut = remote_linear_module.forward_async(input) >>> ret = ret_fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch >>> import torch.distributed.rpc as rpc >>> >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
此外,可以在这个 教程 中找到一个更实用的例子,它与 DistributedDataParallel (DDP) 相结合。
- remote_parameters(recurse=True)¶
返回一个指向远程模块参数的
RRef
列表。这通常可以与
DistributedOptimizer
结合使用。
分布式自动梯度框架¶
警告
当前使用 CUDA 张量时不支持分布式自动梯度
此模块提供了一个基于 RPC 的分布式自动梯度框架,可用于模型并行训练等应用程序。简而言之,应用程序可以在 RPC 上发送和接收梯度记录张量。在正向传播过程中,我们记录何时将梯度记录张量通过 RPC 发送,并在反向传播过程中使用这些信息,使用 RPC 执行分布式反向传播。有关更多详细信息,请参阅 分布式自动梯度设计。
- torch.distributed.autograd.backward(context_id: int, roots: List[Tensor], retain_graph=False) None ¶
使用提供的根启动分布式反向传播。这目前实现了 FAST 模式算法,该算法假设在所有工作器上相同的分布式自动梯度上下文中发送的所有 RPC 消息在反向传播期间将成为自动梯度图的一部分。
我们使用提供的根来发现自动梯度图并计算适当的依赖关系。此方法阻塞,直到完成整个自动梯度计算。
我们将在每个节点上的适当
torch.distributed.autograd.context
中累积梯度。将根据在调用torch.distributed.autograd.backward()
时传入的context_id
来查找要使用的自动梯度上下文。如果不存在与给定 ID 对应的有效自动梯度上下文,我们将抛出错误。您可以使用get_gradients()
API 检索累积的梯度。- 参数
- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> pred = model.forward() >>> loss = loss_func(pred, loss) >>> dist_autograd.backward(context_id, loss)
- class torch.distributed.autograd.context[source]¶
上下文对象,用于在使用分布式自动梯度时包装正向和反向传递。在
with
语句中生成的context_id
对于唯一标识所有工作器上的分布式反向传递是必需的。每个工作器存储与该context_id
关联的元数据,这是正确执行分布式自动梯度传递所必需的。- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum() >>> dist_autograd.backward(context_id, [loss])
- torch.distributed.autograd.get_gradients(context_id: int) Dict[Tensor, Tensor] ¶
检索一个从张量到与该张量相关的梯度的映射,该梯度在分布式自动梯度反向传递期间在与给定
context_id
对应的提供的上下文中累积。- 参数
context_id (int) – 我们应该检索梯度的自动梯度上下文 ID。
- 返回值
一个映射,其中键是张量,值是该张量的关联梯度。
- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = t1 + t2 >>> dist_autograd.backward(context_id, [loss.sum()]) >>> grads = dist_autograd.get_gradients(context_id) >>> print(grads[t1]) >>> print(grads[t2])
有关 RPC 自动梯度的更多信息
分布式优化器¶
有关分布式优化器的文档,请参阅 torch.distributed.optim 页面。
设计说明¶
分布式自动梯度设计说明介绍了基于 RPC 的分布式自动梯度框架的设计,该框架对于模型并行训练等应用程序非常有用。
RRef 设计说明介绍了框架中用于引用远程工作器上的值的 RRef (Remote REFerence) 协议的设计。
教程¶
RPC 教程介绍了 RPC 框架,提供了一些使用 torch.distributed.rpc API 的示例应用程序,并演示了如何使用 探查器 来探查基于 RPC 的工作负载。
将分布式数据并行与分布式 RPC 框架结合使用 (也介绍了 RemoteModule)