分布式 RPC 框架¶
分布式 RPC 框架提供了一套用于多机模型训练的机制,通过一组用于远程通信的原语以及一个更高级别的 API 来自动微分跨多个机器拆分的模型。
警告
RPC 包中的 API 稳定。目前有许多正在进行的工作项来改进性能和错误处理,这些改进将在将来的版本中发布。
警告
CUDA 支持是在 PyTorch 1.9 中引入的,目前仍处于测试版阶段。RPC 包并非所有功能都与 CUDA 支持兼容,因此不建议使用这些功能。这些不受支持的功能包括:RRef、JIT 兼容性、dist autograd 和 dist 优化器以及性能分析。这些缺点将在未来的版本中得到解决。
注意
请参考 PyTorch 分布式概述,以简要介绍与分布式训练相关的所有功能。
基础知识¶
分布式 RPC 框架简化了远程运行函数的过程,支持在不复制实际数据的情况下引用远程对象,并提供自动微分和优化器 API,以透明地在 RPC 边界上执行反向传播和更新参数。这些功能可以分为四组 API。
远程过程调用 (RPC) 支持使用给定的参数在指定的目的地工作程序上运行函数,并获取返回值或创建对返回值的引用。有三个主要的 RPC API:
rpc_sync()
(同步)、rpc_async()
(异步)和remote()
(异步,并返回对远程返回值的引用)。如果用户代码在没有返回值的情况下无法继续,则使用同步 API。否则,使用异步 API 获取 future,并在需要返回值时在调用方等待 future。当需求是在远程创建某些东西,而无需将其获取到调用方时,remote()
API 非常有用。想象一下,驱动程序进程正在设置参数服务器和训练器。驱动程序可以在参数服务器上创建嵌入表,然后将对嵌入表的引用共享给训练器,但自身不会在本地使用嵌入表。在这种情况下,rpc_sync()
和rpc_async()
就不再合适了,因为它们总是意味着返回值将立即或在未来返回给调用方。远程引用 (RRef) 用作指向本地或远程对象的分布式共享指针。它可以与其他工作程序共享,并且引用计数将被透明地处理。每个 RRef 只有一个所有者,并且该对象仅存在于该所有者上。持有 RRef 的非所有者工作程序可以通过显式请求从所有者获取对象的副本。当工作程序需要访问某些数据对象,但自身既不是创建者(
remote()
的调用方)也不是对象的拥有者时,这很有用。分布式优化器,正如我们将在下面讨论的那样,就是这种用例的一个例子。分布式 Autograd 将所有参与正向传播的工作程序上的本地 autograd 引擎拼接在一起,并在反向传播期间自动联系它们以计算梯度。这在正向传播需要跨越多台机器进行时特别有用,例如在进行分布式模型并行训练、参数服务器训练等时。有了这个功能,用户代码不再需要担心如何跨 RPC 边界发送梯度以及应该以何种顺序启动本地 autograd 引擎,因为当正向传播中存在嵌套和相互依赖的 RPC 调用时,这可能会变得非常复杂。
分布式优化器 的构造函数接受一个
Optimizer()
(例如,SGD()
,Adagrad()
等)和一个参数 RRef 列表,在每个不同的 RRef 所有者上创建一个Optimizer()
实例,并在运行step()
时相应地更新参数。当您有分布式的前向和反向传递时,参数和梯度将散布在多个工作器上,因此需要在每个参与的工作器上有一个优化器。分布式优化器将所有这些本地优化器包装成一个,并提供简洁的构造函数和step()
API。
RPC¶
在使用 RPC 和分布式自动微分原语之前,必须进行初始化。要初始化 RPC 框架,我们需要使用 init_rpc()
,它将初始化 RPC 框架、RRef 框架和分布式自动微分。
- torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[source]¶
初始化 RPC 原语,例如本地 RPC 代理和分布式自动微分,这会立即使当前进程准备好发送和接收 RPC。
- 参数
name (str) – 此节点的全局唯一名称。(例如,
Trainer3
,ParameterServer2
,Master
,Worker1
)名称只能包含数字、字母、下划线、冒号和/或连字符,并且必须小于 128 个字符。backend (BackendType, optional) – RPC 后端实现的类型。支持的值是
BackendType.TENSORPIPE
(默认)。有关更多信息,请参见 Backends。rank (int) – 此节点的全局唯一 ID/排名。
world_size (int) – 组中的工作器数量。
rpc_backend_options (RpcBackendOptions, optional) – 传递给 RpcAgent 构造函数的选项。它必须是
RpcBackendOptions
的代理特定子类,并且包含代理特定的初始化配置。默认情况下,对于所有代理,它将默认超时设置为 60 秒,并使用使用init_method = "env://"
初始化的基础进程组执行 rendezvous,这意味着需要正确设置环境变量MASTER_ADDR
和MASTER_PORT
。有关更多信息,请参见 Backends 查找哪些选项可用。
以下 API 允许用户远程执行函数以及创建对远程数据对象的引用(RRef)。在这些 API 中,当将 Tensor
作为参数或返回值传递时,目标工作器将尝试创建具有相同元数据(即形状、步幅等)的 Tensor
。我们有意禁止传输 CUDA 张量,因为如果源和目标工作器上的设备列表不匹配,它可能会崩溃。在这种情况下,应用程序始终可以显式地将输入张量移动到调用方的 CPU 上,并在需要时将它移动到被调用方的所需设备上。
警告
RPC 中的 TorchScript 支持是一个原型功能,可能会发生变化。从 v1.5.0 开始,torch.distributed.rpc
支持将 TorchScript 函数作为 RPC 目标函数调用,这将有助于提高被调用方侧面的并行性,因为执行 TorchScript 函数不需要 GIL。
- torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[source]¶
对工作器
to
上运行函数func
进行阻塞 RPC 调用。RPC 消息与 Python 代码的执行并行发送和接收。此方法是线程安全的。- 参数
to (str or WorkerInfo or int) – 目标工作器的名称/排名/
WorkerInfo
。func (Callable) – 可调用函数,例如 Python 可调用函数、内置运算符(例如
add()
)和带注释的 TorchScript 函数。args (tuple) –
func
调用的参数元组。kwargs (dict) – 是
func
调用的关键字参数字典。timeout (float, optional) – 此 RPC 的超时时间(以秒为单位)。如果 RPC 在此时间内未完成,则将引发一个指示它已超时的异常。值为 0 表示无限超时,即永远不会引发超时错误。如果未提供,则使用在初始化期间或使用
_set_rpc_timeout
设置的默认值。
- 返回值
返回使用
args
和kwargs
运行func
的结果。
- 示例:
确保在两个工作器上都正确设置了
MASTER_ADDR
和MASTER_PORT
。有关更多详细信息,请参阅init_process_group()
API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
下面是使用 RPC 运行 TorchScript 函数的示例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[source]¶
对工作器
to
上运行函数func
进行非阻塞 RPC 调用。RPC 消息与 Python 代码的执行并行发送和接收。此方法是线程安全的。此方法将立即返回一个Future
,可以对其进行等待。- 参数
to (str or WorkerInfo or int) – 目标工作器的名称/排名/
WorkerInfo
。func (Callable) – 可调用函数,例如 Python 可调用函数、内置运算符(例如
add()
)和带注释的 TorchScript 函数。args (tuple) –
func
调用的参数元组。kwargs (dict) – 是
func
调用的关键字参数字典。timeout (float, optional) – 此 RPC 的超时时间(以秒为单位)。如果 RPC 在此时间内未完成,则将引发一个指示它已超时的异常。值为 0 表示无限超时,即永远不会引发超时错误。如果未提供,则使用在初始化期间或使用
_set_rpc_timeout
设置的默认值。
- 返回值
返回一个
Future
对象,可以对其进行等待。完成时,可以从Future
对象中检索在args
和kwargs
上运行func
的返回值。
警告
由于不支持通过网络传输 GPU 张量,因此不支持使用 GPU 张量作为
func
的参数或返回值。您需要显式地将 GPU 张量复制到 CPU,然后才能将其用作func
的参数或返回值。警告
rpc_async
API 不会在通过网络发送参数张量之前复制其存储空间,这可能会由不同的线程完成,具体取决于 RPC 后端类型。调用者应确保这些张量的内容在返回的Future
完成之前保持完整。- 示例:
确保在两个工作器上都正确设置了
MASTER_ADDR
和MASTER_PORT
。有关更多详细信息,请参阅init_process_group()
API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3)) >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2)) >>> result = fut1.wait() + fut2.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
下面是使用 RPC 运行 TorchScript 函数的示例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3)) >>> ret = fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[source]¶
在工作节点
to
上远程调用运行func
,并立即返回指向结果值的RRef
。工作节点to
将是返回的RRef
的所有者,而调用remote
的工作节点是用户。所有者管理其RRef
的全局引用计数,并且只有在全局范围内没有对它的有效引用时,所有者RRef
才会被销毁。- 参数
to (str or WorkerInfo or int) – 目标工作器的名称/排名/
WorkerInfo
。func (Callable) – 可调用函数,例如 Python 可调用函数、内置运算符(例如
add()
)和带注释的 TorchScript 函数。args (tuple) –
func
调用的参数元组。kwargs (dict) – 是
func
调用的关键字参数字典。timeout (float, optional) – 此远程调用的超时时间(以秒为单位)。如果在工作节点
to
上创建此RRef
未能在此超时时间内在此工作节点上成功处理,那么下次尝试使用 RRef(例如to_here()
)时,将引发超时错误,指示此错误。值为 0 表示无限超时,即永远不会引发超时错误。如果没有提供,则使用初始化时或使用_set_rpc_timeout
设置的默认值。
- 返回值
指向结果值的 用户
RRef
实例。使用阻塞式 APItorch.distributed.rpc.RRef.to_here()
在本地检索结果值。
警告
remote
API 不会在通过网络发送参数张量之前复制其存储空间,这可能会由不同的线程完成,具体取决于 RPC 后端类型。调用者应确保这些张量的内容在所有者确认返回的 RRef 之前保持完整,这可以使用torch.distributed.rpc.RRef.confirmed_by_owner()
API 进行检查。警告
如
remote
API 的超时等错误以尽力而为的方式处理。这意味着当由remote
启动的远程调用失败时,例如出现超时错误,我们会采取尽力而为的错误处理方法。这意味着错误会异步处理并设置在结果 RRef 上。如果用户应用程序在处理之前尚未使用 RRef(例如to_here
或 fork 调用),那么将来使用RRef
将会适当地引发错误。但是,用户应用程序可能在处理错误之前使用RRef
。在这种情况下,错误可能不会被引发,因为它们尚未被处理。示例
Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly on both workers. Refer to :meth:`~torch.distributed.init_process_group` API for more details. For example, export MASTER_ADDR=localhost export MASTER_PORT=5678 Then run the following code in two different processes: >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >>> x = rref1.to_here() + rref2.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() Below is an example of running a TorchScript function using RPC. >>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) >>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rref.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.get_worker_info(worker_name=None)[source]¶
获取给定工作节点名称的
WorkerInfo
。使用此WorkerInfo
以避免在每次调用时传递一个代价高昂的字符串。- 参数
worker_name (str) – 工作节点的字符串名称。如果为
None
,则返回当前工作节点的 ID。(默认值为None
)- 返回值
给定
worker_name
的WorkerInfo
实例,或如果worker_name
为None
,则为当前工作节点的WorkerInfo
。
- torch.distributed.rpc.shutdown(graceful=True, timeout=0)[source]¶
执行 RPC 代理的关闭,然后销毁 RPC 代理。这会停止本地代理接受未完成的请求,并通过终止所有 RPC 线程来关闭 RPC 框架。如果
graceful=True
,这将阻塞,直到所有本地和远程 RPC 进程都到达此方法并等待所有未完成的工作完成。否则,如果graceful=False
,这是一个本地关闭,并且不会等待其他 RPC 进程到达此方法。警告
对于由
Future
对象返回的rpc_async()
,future.wait()
不应在shutdown()
之后调用。- 参数
graceful (bool) – 是否执行优雅关闭。如果为 True,这将 1) 等待直到没有
UserRRefs
的待处理系统消息并删除它们;2) 阻塞,直到所有本地和远程 RPC 进程都到达此方法并等待所有未完成的工作完成。
- 示例:
确保在两个工作器上都正确设置了
MASTER_ADDR
和MASTER_PORT
。有关更多详细信息,请参阅init_process_group()
API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> # do some work >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1)) >>> # ready to shutdown >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> # wait for worker 0 to finish work, and then shutdown. >>> rpc.shutdown()
- class torch.distributed.rpc.WorkerInfo¶
封装系统中工作节点信息的结构。包含工作节点的名称和 ID。不应直接构造此类,而是可以通过
get_worker_info()
检索实例,并将结果传递给rpc_sync()
、rpc_async()
、remote()
等函数,以避免在每次调用时复制字符串。- property id¶
全局唯一 ID,用于标识工作节点。
- property name¶
工作节点的名称。
RPC 包还提供装饰器,允许应用程序指定在被调用方侧如何处理给定函数。
- torch.distributed.rpc.functions.async_execution(fn)[source]¶
用于函数的装饰器,表示该函数的返回值保证为
Future
对象,并且该函数可以在 RPC 被调用方上异步运行。更具体地说,被调用方会提取由包装函数返回的Future
,并将后续的处理步骤安装为该Future
的回调。安装的回调会在完成时从Future
读取值,并将该值作为 RPC 响应发送回。这也意味着返回的Future
仅存在于被调用方侧,并且永远不会通过 RPC 发送。当包装函数 (fn
) 的执行需要暂停和恢复时,此装饰器很有用,例如,包含rpc_async()
或等待其他信号。注意
为了启用异步执行,应用程序必须将此装饰器返回的函数对象传递给 RPC API。如果 RPC 检测到此装饰器安装的属性,它就知道此函数返回一个
Future
对象,并会相应地进行处理。但是,这并不意味着此装饰器必须是定义函数时的最外层装饰器。例如,当与@staticmethod
或@classmethod
结合使用时,@rpc.functions.async_execution
需要是内部装饰器,以允许目标函数被识别为静态或类函数。此目标函数仍然可以异步执行,因为当访问时,静态或类方法会保留@rpc.functions.async_execution
安装的属性。- 示例:
返回的
Future
对象可以来自rpc_async()
、then()
或Future
构造函数。下面的示例演示了直接使用Future
,该对象由then()
返回。>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @rpc.functions.async_execution >>> def async_add_chained(to, x, y, z): >>> # This function runs on "worker1" and returns immediately when >>> # the callback is installed through the `then(cb)` API. In the >>> # mean time, the `rpc_async` to "worker2" can run concurrently. >>> # When the return value of that `rpc_async` arrives at >>> # "worker1", "worker1" will run the lambda function accordingly >>> # and set the value for the previously returned `Future`, which >>> # will then trigger RPC to send the result back to "worker0". >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add_chained, >>> args=("worker2", torch.ones(2), 1, 1) >>> ) >>> print(ret) # prints tensor([3., 3.])
当与 TorchScript 装饰器结合使用时,此装饰器必须是最外层的装饰器。
>>> from torch import Tensor >>> from torch.futures import Future >>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @torch.jit.script >>> def script_add(x: Tensor, y: Tensor) -> Tensor: >>> return x + y >>> >>> @rpc.functions.async_execution >>> @torch.jit.script >>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]: >>> return rpc.rpc_async(to, script_add, (x, y)) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add, >>> args=("worker2", torch.ones(2), 1) >>> ) >>> print(ret) # prints tensor([2., 2.])
当与静态或类方法结合使用时,此装饰器必须是内部装饰器。
>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> class AsyncExecutionClass: >>> >>> @staticmethod >>> @rpc.functions.async_execution >>> def static_async_add(to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> @classmethod >>> @rpc.functions.async_execution >>> def class_async_add(cls, to, x, y, z): >>> ret_fut = torch.futures.Future() >>> rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: ret_fut.set_result(fut.wait() + z) >>> ) >>> return ret_fut >>> >>> @rpc.functions.async_execution >>> def bound_async_add(self, to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.static_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.]) >>> >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.class_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.])
此装饰器也适用于 RRef 助手,即 .
torch.distributed.rpc.RRef.rpc_sync()
、torch.distributed.rpc.RRef.rpc_async()
和torch.distributed.rpc.RRef.remote()
。>>> from torch.distributed import rpc >>> >>> # reuse the AsyncExecutionClass class above >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2) >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait() >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here() >>> print(ret) # prints tensor([4., 4.])
后端¶
RPC 模块可以利用不同的后端来执行节点之间的通信。要使用的后端可以在 init_rpc()
函数中指定,方法是传递 BackendType
枚举的某个值。无论使用哪种后端,RPC API 的其余部分都不会改变。每个后端还定义了其自己的 RpcBackendOptions
类子类,可以将该类子类的一个实例传递给 init_rpc()
来配置后端的行为。
- class torch.distributed.rpc.BackendType(value)¶
可用后端的枚举类。
PyTorch 附带内置的
BackendType.TENSORPIPE
后端。可以使用register_backend()
函数注册其他后端。
- class torch.distributed.rpc.RpcBackendOptions¶
一个抽象结构,封装传递到 RPC 后端的选项。可以将此类的实例传递到
init_rpc()
中,以便使用特定配置初始化 RPC,例如要使用的 RPC 超时和init_method
。- property init_method¶
指定如何初始化进程组的 URL。默认值为
env://
- property rpc_timeout¶
一个浮点数,指示所有 RPC 使用的超时时间。如果 RPC 在此时间范围内未完成,它将使用指示超时异常完成。
TensorPipe 后端¶
TensorPipe 代理(默认)利用 TensorPipe 库,该库提供了一种专门适用于机器学习的原生点对点通信原语,从根本上解决了 Gloo 的一些限制。与 Gloo 相比,它的优势在于它是异步的,这允许大量传输同时发生,每个传输以自己的速度进行,互不阻塞。它只会在需要时按需在节点对之间打开管道,并且当一个节点失败时,只有与它相连的管道才会关闭,而所有其他管道将继续正常工作。此外,它能够支持多种不同的传输(当然还有 TCP,还有共享内存、NVLink、InfiniBand 等),并且可以自动检测它们的可用性并协商用于每个管道的最佳传输。
TensorPipe 后端是在 PyTorch v1.6 中引入的,并且正在积极开发中。目前,它只支持 CPU 张量,GPU 支持即将推出。它附带了基于 TCP 的传输,就像 Gloo 一样。它还能够自动将大型张量切片和复用到多个套接字和线程上,以实现非常高的带宽。代理将能够自行选择最佳传输,无需任何干预。
示例
>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>> "worker1",
>>> rank=0,
>>> world_size=2,
>>> rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>> num_worker_threads=8,
>>> rpc_timeout=20 # 20 second timeout
>>> )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
- class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[source]¶
用于
TensorPipeAgent
的后端选项,派生自RpcBackendOptions
。- 参数
num_worker_threads (int, optional) –
TensorPipeAgent
用于执行请求的线程池中的线程数(默认值:16)。rpc_timeout (float, optional) – RPC 请求的默认超时时间,以秒为单位(默认值:60 秒)。如果 RPC 在此时间范围内未完成,则将引发指示该情况的异常。调用者可以在必要时在
rpc_sync()
和rpc_async()
中为单个 RPC 覆盖此超时时间。init_method (str, optional) – 用于初始化用于会合的分布式存储的 URL。它接受
init_process_group()
的相同参数的任何值(默认值:env://
)。device_maps (Dict[str, Dict], optional) – 从此工作器到被调用者的设备放置映射。键是被调用者的工作器名称,值是字典 (
Dict
ofint
,str
, ortorch.device
),它将此工作器的设备映射到被调用者的工作器的设备。(默认值:None
)devices (List[int, str, or
torch.device
], optional) – RPC 代理使用的所有本地 CUDA 设备。默认情况下,它将被初始化为来自其自身device_maps
的所有本地设备及其对等体device_maps
中的相应设备。在处理 CUDA RPC 请求时,代理将适当地同步此List
中所有设备的 CUDA 流。
- property device_maps¶
设备映射位置。
- property devices¶
本地代理使用的所有设备。
- property init_method¶
指定如何初始化进程组的 URL。默认值为
env://
- property num_worker_threads¶
TensorPipeAgent
用于执行请求的线程池中的线程数。
- property rpc_timeout¶
一个浮点数,指示所有 RPC 使用的超时时间。如果 RPC 在此时间范围内未完成,它将使用指示超时异常完成。
- set_device_map(to, device_map)[source]¶
设置每个 RPC 调用者和被调用者对之间的设备映射。此函数可以多次调用以增量添加设备放置配置。
- 参数
to (str) – 被调用者名称。
device_map (Dict of int, str, or torch.device) – 从此工作器到被调用者的设备放置映射。此映射必须是可逆的。
示例
>>> # both workers >>> def add(x, y): >>> print(x) # tensor([1., 1.], device='cuda:1') >>> return x + y, (x + y).to(2) >>> >>> # on worker 0 >>> options = TensorPipeRpcBackendOptions( >>> num_worker_threads=8, >>> device_maps={"worker1": {0: 1}} >>> # maps worker0's cuda:0 to worker1's cuda:1 >>> ) >>> options.set_device_map("worker1", {1: 2}) >>> # maps worker0's cuda:1 to worker1's cuda:2 >>> >>> rpc.init_rpc( >>> "worker0", >>> rank=0, >>> world_size=2, >>> backend=rpc.BackendType.TENSORPIPE, >>> rpc_backend_options=options >>> ) >>> >>> x = torch.ones(2) >>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1)) >>> # The first argument will be moved to cuda:1 on worker1. When >>> # sending the return value back, it will follow the invert of >>> # the device map, and hence will be moved back to cuda:0 and >>> # cuda:1 on worker0 >>> print(rets[0]) # tensor([2., 2.], device='cuda:0') >>> print(rets[1]) # tensor([2., 2.], device='cuda:1')
- set_devices(devices)[source]¶
设置 TensorPipe RPC 代理使用的本地设备。在处理 CUDA RPC 请求时,TensorPipe RPC 代理将为
List
中的所有设备正确同步 CUDA 流。- 参数
devices (List of int, str, or torch.device) – TensorPipe RPC 代理使用的本地设备。
注意
RPC 框架不会自动重试任何 rpc_sync()
、rpc_async()
和 remote()
调用。原因是 RPC 框架无法确定操作是否幂等以及是否可以安全重试。因此,应用程序有责任处理故障,并在必要时重试。RPC 通信基于 TCP,因此可能会因网络故障或间歇性网络连接问题而发生故障。在这种情况下,应用程序需要以合理的回退策略适当地重试,以确保网络不会因过于激进的重试而不堪重负。
RRef¶
警告
在使用 CUDA 张量时,目前不支持 RRef
一个 RRef
(远程引用)是对远程工作器上某种类型 T
(例如 Tensor
)的值的引用。此句柄使被引用的远程值在所有者上保持活动状态,但这并不意味着该值将在将来传输到本地工作器。RRef 可用于多机器训练,方法是保存对其他工作器上 nn.Modules 的引用,并在训练期间调用相应的函数来检索或修改其参数。有关更多详细信息,请参阅 远程引用协议。
- class torch.distributed.rpc.PyRRef(RRef)¶
一个封装了对远程工作器上某种类型的值的引用的类。此句柄将使被引用的远程值在工作器上保持活动状态。当 1) 应用程序代码和本地 RRef 上下文中都没有对它的引用,或者 2) 应用程序已调用了正常关闭时,
UserRRef
将被删除。在已删除的 RRef 上调用方法会导致未定义的行为。RRef 实现仅提供尽力而为的错误检测,应用程序不应在rpc.shutdown()
之后使用UserRefs
。警告
RRef 只能通过 RPC 模块进行序列化和反序列化。在没有 RPC 的情况下序列化和反序列化 RRef(例如,Python pickle、torch
save()
/load()
、JITsave()
/load()
等)会导致错误。- 参数
value (object) – 要由此 RRef 包装的值。
type_hint (Type, optional) – 应传递给
TorchScript
编译器的 Python 类型,作为value
的类型提示。
- 示例:
以下示例为简便起见省略了 RPC 初始化和关闭代码。有关这些详细信息,请参考 RPC 文档。
使用 rpc.remote 创建 RRef
>>> import torch >>> import torch.distributed.rpc as rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> # get a copy of value from the RRef >>> x = rref.to_here()
从本地对象创建 RRef
>>> import torch >>> from torch.distributed.rpc import RRef >>> x = torch.zeros(2, 2) >>> rref = RRef(x)
与其他工作器共享 RRef
>>> # On both worker0 and worker1: >>> def f(rref): >>> return rref.to_here() + 1
>>> # On worker0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch.distributed.rpc import RRef >>> rref = RRef(torch.zeros(2, 2)) >>> # the following RPC shares the rref with worker1, reference >>> # count is automatically updated. >>> rpc.rpc_sync("worker1", f, args=(rref,))
- backward(self: torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id: int = -1, retain_graph: bool = False) None ¶
使用 RRef 作为反向传播的根节点运行反向传播。如果提供了
dist_autograd_ctx_id
,我们将使用提供的 ctx_id 从 RRef 的所有者开始执行分布式反向传播。在这种情况下,应使用get_gradients()
检索梯度。如果dist_autograd_ctx_id
为None
,则假定这是一个本地自动微分图,并且我们仅执行本地反向传播。在本地情况下,调用此 API 的节点必须是 RRef 的所有者。RRef 的值应为标量张量。- 参数
- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> rref.backward(context_id)
- confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) bool ¶
返回此
RRef
是否已由所有者确认。OwnerRRef
始终返回 true,而UserRRef
仅在所有者知道此UserRRef
时返回 true。
- is_owner(self: torch._C._distributed_rpc.PyRRef) bool ¶
返回当前节点是否为此
RRef
的所有者。
- local_value(self: torch._C._distributed_rpc.PyRRef) object ¶
如果当前节点是所有者,则返回对本地值的引用。否则,将引发异常。
- owner(self: torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo ¶
返回拥有此
RRef
的节点的 worker 信息。
- owner_name(self: torch._C._distributed_rpc.PyRRef) str ¶
返回拥有此
RRef
的节点的 worker 名称。
- remote(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object ¶
创建一个辅助代理,以便使用 RRef 所有者作为目标轻松启动
remote
,以在由此 RRef 引用的对象上运行函数。 更准确地说,rref.remote().func_name(*args, **kwargs)
等同于以下内容>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 参数
timeout (float, optional) –
rref.remote()
的超时时间。 如果在此超时时间内未成功完成此RRef
的创建,则下次尝试使用 RRef(例如to_here
)时,将引发超时。 如果未提供,将使用默认 RPC 超时时间。 请参阅rpc.remote()
以了解RRef
的特定超时语义。
- 示例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.remote().size().to_here() # returns torch.Size([2, 2]) >>> rref.remote().view(1, 4).to_here() # returns tensor([[1., 1., 1., 1.]])
- rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object ¶
创建一个辅助代理,以便使用 RRef 所有者作为目标轻松启动
rpc_async
,以在由此 RRef 引用的对象上运行函数。 更准确地说,rref.rpc_async().func_name(*args, **kwargs)
等同于以下内容>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 参数
timeout (float, optional) –
rref.rpc_async()
的超时时间。 如果调用未在此时间范围内完成,则会引发指示此情况的异常。 如果未提供此参数,将使用默认 RPC 超时时间。
- 示例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_async().size().wait() # returns torch.Size([2, 2]) >>> rref.rpc_async().view(1, 4).wait() # returns tensor([[1., 1., 1., 1.]])
- rpc_sync(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object ¶
创建一个辅助代理,以便使用 RRef 所有者作为目标轻松启动
rpc_sync
,以在由此 RRef 引用的对象上运行函数。 更准确地说,rref.rpc_sync().func_name(*args, **kwargs)
等同于以下内容>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 参数
timeout (float, optional) –
rref.rpc_sync()
的超时时间。 如果调用未在此时间范围内完成,则会引发指示此情况的异常。 如果未提供此参数,将使用默认 RPC 超时时间。
- 示例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_sync().size() # returns torch.Size([2, 2]) >>> rref.rpc_sync().view(1, 4) # returns tensor([[1., 1., 1., 1.]])
- to_here(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object ¶
阻塞调用,将 RRef 的值从所有者复制到本地节点并返回。 如果当前节点是所有者,则返回对本地值的引用。
- 参数
timeout (float, optional) –
to_here
的超时时间。 如果调用未在此时间范围内完成,则会引发指示此情况的异常。 如果未提供此参数,将使用默认 RPC 超时时间(60 秒)。
RemoteModule¶
警告
在使用 CUDA 张量时,目前不支持 RemoteModule
RemoteModule
是一种在不同进程上远程创建 nn.Module 的简便方法。 实际模块驻留在远程主机上,但本地主机拥有此模块的句柄,并类似于常规 nn.Module 调用此模块。 但是,调用需要对远程端进行 RPC 调用,并且如果需要,可以通过 RemoteModule 支持的其他 API 异步执行。
- class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[source]¶
仅在 RPC 初始化后才能创建 RemoteModule 实例。
它在指定的远程节点上创建用户指定的模块。 它类似于常规的
nn.Module
,不同之处在于forward
方法在远程节点上执行。 它负责自动梯度记录,以确保反向传播将梯度传播回相应的远程模块。它根据
module_cls
的forward
方法的签名生成两个方法forward_async
和forward
。forward_async
异步运行并返回一个 Future。forward_async
和forward
的参数与module_cls
返回的模块的forward
方法相同。例如,如果
module_cls
返回nn.Linear
的实例,该实例具有forward
方法签名:def forward(input: Tensor) -> Tensor:
,则生成的RemoteModule
将具有 2 个具有以下签名的方法def forward(input: Tensor) -> Tensor:
def forward_async(input: Tensor) -> Future[Tensor]:
- 参数
remote_device (str) – 目标 worker 上我们要放置此模块的设备。 格式应为“<workername>/<device>”,其中设备字段可以解析为 torch.device 类型。 例如,“trainer0/cpu”、“trainer0”、“ps0/cuda:0”。 此外,设备字段可以是可选的,默认值为“cpu”。
module_cls (nn.Module) –
要远程创建的模块的类。 例如,
>>> class MyModule(nn.Module): >>> def forward(input): >>> return input + 1 >>> >>> module_cls = MyModule
args (Sequence, optional) – 要传递给
module_cls
的参数。kwargs (Dict, optional) – 要传递给
module_cls
的关键字参数。
- 返回值
一个远程模块实例,它包装了由用户提供的
Module
创建的module_cls
,它有一个阻塞的forward
方法和一个异步的forward_async
方法,该方法返回远程侧用户提供的模块上forward
调用的未来。
- 示例:
在两个不同的进程中运行以下代码
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch import nn, Tensor >>> from torch.distributed.nn.api.remote_module import RemoteModule >>> >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> remote_linear_module = RemoteModule( >>> "worker1/cpu", nn.Linear, args=(20, 30), >>> ) >>> input = torch.randn(128, 20) >>> ret_fut = remote_linear_module.forward_async(input) >>> ret = ret_fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch >>> import torch.distributed.rpc as rpc >>> >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- remote_parameters(recurse=True)¶
返回一个指向远程模块参数的
RRef
列表。这通常可以与
DistributedOptimizer
结合使用。
分布式自动微分框架¶
警告
使用 CUDA 张量时,目前不支持分布式自动微分
此模块提供了一个基于 RPC 的分布式自动微分框架,可用于模型并行训练等应用。简而言之,应用程序可以通过 RPC 发送和接收梯度记录张量。在正向传递中,我们记录何时将梯度记录张量通过 RPC 发送,并在反向传递期间使用此信息通过 RPC 执行分布式反向传递。有关更多详细信息,请参阅 分布式自动微分设计。
- torch.distributed.autograd.backward(context_id: int, roots: List[Tensor], retain_graph=False) None ¶
使用提供的根启动分布式反向传递。这目前实现了 FAST 模式算法,该算法假设在同一分布式自动微分上下文中跨工作器发送的所有 RPC 消息在反向传递期间将成为自动微分图的一部分。
我们使用提供的根来发现自动微分图并计算适当的依赖关系。此方法会阻塞,直到整个自动微分计算完成。
我们在每个节点上的适当
torch.distributed.autograd.context
中累积梯度。将根据调用torch.distributed.autograd.backward()
时传入的context_id
来查找要使用的自动微分上下文。如果与给定 ID 相对应的有效自动微分上下文不存在,我们将抛出错误。您可以使用get_gradients()
API 检索累积的梯度。- 参数
- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> pred = model.forward() >>> loss = loss_func(pred, loss) >>> dist_autograd.backward(context_id, loss)
- class torch.distributed.autograd.context[source]¶
在使用分布式自动微分时,上下文对象用于包装正向和反向传递。在
with
语句中生成的context_id
对于唯一地识别所有工作器上的分布式反向传递是必需的。每个工作器都存储与该context_id
关联的元数据,这对于正确执行分布式自动微分传递是必需的。- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum() >>> dist_autograd.backward(context_id, [loss])
- torch.distributed.autograd.get_gradients(context_id: int) Dict[Tensor, Tensor] ¶
检索一个从张量到该张量的相应梯度的映射,该梯度在给定
context_id
的分布式自动微分反向传递过程中累积在提供的上下文中。- 参数
context_id (int) – 我们应该检索梯度的自动微分上下文 ID。
- 返回值
一个映射,其中键是张量,值是该张量的关联梯度。
- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = t1 + t2 >>> dist_autograd.backward(context_id, [loss.sum()]) >>> grads = dist_autograd.get_gradients(context_id) >>> print(grads[t1]) >>> print(grads[t2])
分布式优化器¶
有关分布式优化器的文档,请参阅 torch.distributed.optim 页面。
设计说明¶
分布式自动微分设计说明涵盖了基于 RPC 的分布式自动微分框架的设计,该框架对于模型并行训练等应用很有用。
RRef 设计说明涵盖了 RRef (远程引用) 协议的设计,该协议用于通过框架引用远程工作器上的值。
教程¶
RPC 教程向用户介绍了 RPC 框架,提供了几个使用 torch.distributed.rpc API 的示例应用程序,并演示了如何使用 探查器 来探查基于 RPC 的工作负载。
将分布式数据并行与分布式 RPC 框架结合使用 (也涵盖了 RemoteModule)