分布式 RPC 框架¶
分布式 RPC 框架通过一组原语为多机模型训练提供了机制,以实现远程通信,并提供更高级别的 API 来自动区分跨多台机器拆分的模型。
警告
RPC 包中的 API 是稳定的。有多个正在进行的工作项目旨在提高性能和错误处理能力,这些将在未来的版本中发布。
警告
PyTorch 1.9 引入了 CUDA 支持,目前仍是 测试版 功能。并非 RPC 包中的所有功能都已兼容 CUDA 支持,因此不建议使用这些功能。这些不支持的功能包括:RRef、JIT 兼容性、分布式 autograd 和分布式 optimizer 以及性能分析。这些不足之处将在未来的版本中解决。
注意
请参阅 PyTorch 分布式概述 以简要了解与分布式训练相关的所有功能。
基础知识¶
分布式 RPC 框架使得远程运行函数变得容易,支持引用远程对象而无需复制实际数据,并提供 autograd 和 optimizer API 以透明地跨 RPC 边界运行反向传播和更新参数。这些功能可以分为四组 API。
远程过程调用 (RPC) 支持在指定的目标 worker 上运行带有所提供参数的函数,并获取返回值或创建对返回值的引用。主要有三个 RPC API:
rpc_sync()
(同步)、rpc_async()
(异步) 和remote()
(异步并返回对远程返回值的引用)。如果用户代码在没有返回值的情况下无法继续执行,则使用同步 API。否则,使用异步 API 获取一个 Future,并在调用者需要返回值时等待该 Future。remote()
API 在需要远程创建某个对象但永远不需要将其获取到调用者时非常有用。想象一个场景:一个驱动进程正在设置参数服务器和 trainer。驱动程序可以在参数服务器上创建一个 embedding 表,然后与 trainer 共享对该 embedding 表的引用,但自身永远不会在本地使用该 embedding 表。在这种情况下,rpc_sync()
和rpc_async()
就不再合适了,因为它们总是意味着返回值将立即或在将来返回到调用者。远程引用 (RRef) 充当本地或远程对象的分布式共享指针。它可以与其他 worker 共享,引用计数将透明地处理。每个 RRef 只有一个所有者,对象仅存在于该所有者上。持有 RRef 的非所有者 worker 可以通过显式请求从所有者处获取对象的副本。当 worker 需要访问某个数据对象,但自身既不是创建者(即
remote()
的调用者)也不是该对象的所有者时,这很有用。分布式 optimizer,我们将在下面讨论,是此类用例的一个示例。分布式 Autograd 将参与前向传播的所有 worker 上的本地 autograd 引擎连接起来,并在反向传播期间自动联系它们来计算梯度。如果前向传播需要跨越多个机器进行,例如分布式模型并行训练、参数服务器训练等,这将特别有帮助。有了这项功能,用户代码不再需要担心如何在 RPC 边界上传送梯度以及以何种顺序启动本地 autograd 引擎,当前向传播中存在嵌套和相互依赖的 RPC 调用时,这可能会变得相当复杂。
分布式 Optimizer 的构造函数接受一个
Optimizer()
(例如,SGD()
、Adagrad()
等) 和参数 RRef 列表,在每个不同的 RRef 所有者上创建一个Optimizer()
实例,并在运行step()
时相应地更新参数。当您进行分布式前向和反向传播时,参数和梯度将分散到多个 worker 上,因此需要在每个参与的 worker 上都有一个 optimizer。分布式 Optimizer 将所有这些本地 optimizer 包装在一起,并提供了一个简洁的构造函数和step()
API。
RPC¶
在使用 RPC 和分布式 autograd 原语之前,必须先进行初始化。要初始化 RPC 框架,我们需要使用 init_rpc()
,这将初始化 RPC 框架、RRef 框架和分布式 autograd。
- torch.distributed.rpc.init_rpc(name, backend=None, rank=-1, world_size=None, rpc_backend_options=None)[source][source]¶
初始化 RPC 原语,例如本地 RPC 代理和分布式 autograd,这会立即使当前进程准备好发送和接收 RPC。
- 参数
name (str) – 此节点的全局唯一名称。(例如,
Trainer3
、ParameterServer2
、Master
、Worker1
) 名称只能包含数字、字母、下划线、冒号和/或短划线,且必须短于 128 个字符。backend (BackendType, 可选的) – RPC 后端实现的类型。支持的值为
BackendType.TENSORPIPE
(默认值)。有关更多信息,请参阅 Backends。rank (int) – 此节点的全局唯一 ID/级别。
world_size (int) – 组中的 worker 数量。
rpc_backend_options (RpcBackendOptions, 可选的) – 传递给 RpcAgent 构造函数的选项。它必须是
RpcBackendOptions
的特定于代理的子类,并包含特定于代理的初始化配置。默认情况下,对于所有代理,它将默认超时设置为 60 秒,并使用init_method = "env://"
初始化底层进程组来执行 rendezvous,这意味着环境变量MASTER_ADDR
和MASTER_PORT
需要正确设置。有关更多信息,请参阅 Backends 并找到哪些选项可用。
以下 API 允许用户远程执行函数以及创建对远程数据对象的引用 (RRef)。在这些 API 中,当传递一个 Tensor
作为参数或返回值时,目标 worker 将尝试创建一个具有相同元数据(即 shape
、stride
等)的 Tensor
。我们有意不允许传输 CUDA Tensor,因为如果源 worker 和目标 worker 上的设备列表不匹配,可能会导致崩溃。在这种情况下,应用程序总是可以在调用者端显式地将输入 Tensor 移动到 CPU,并在需要时在被调用者端将其移动到期望的设备上。
警告
RPC 中的 TorchScript 支持是原型功能,可能会更改。自 v1.5.0 起,torch.distributed.rpc
支持将 TorchScript 函数作为 RPC 目标函数调用,这将有助于提高被调用者端的并行性,因为执行 TorchScript 函数不需要 GIL。
- torch.distributed.rpc.rpc_sync(to, func, args=None, kwargs=None, timeout=-1.0)[source][source]¶
发起阻塞 RPC 调用,以在 worker
to
上运行函数func
。RPC 消息与 Python 代码的执行并行发送和接收。此方法是线程安全的。- 参数
to (str 或 WorkerInfo 或 int) – 目标 worker 的名称/级别/
WorkerInfo
。func (可调用对象) – 可调用函数,例如 Python 可调用对象、内置算子 (例如
add()
) 和注解的 TorchScript 函数。args (tuple) –
func
调用的参数元组。kwargs (dict) – 是
func
调用的关键字参数字典。timeout (float, 可选的) – 此 RPC 的超时秒数。如果 RPC 未在此时间内完成,则将抛出表示超时的异常。值为 0 表示无限超时,即永远不会抛出超时错误。如果未提供,则使用在初始化期间或通过
_set_rpc_timeout
设置的默认值。
- 返回
返回使用
args
和kwargs
运行func
的结果。
- 示例:
确保在两个 worker 上正确设置了
MASTER_ADDR
和MASTER_PORT
。有关更多详细信息,请参阅init_process_group()
API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
以下是使用 RPC 运行 TorchScript 函数的示例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> ret = rpc.rpc_sync("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.rpc_async(to, func, args=None, kwargs=None, timeout=-1.0)[source][source]¶
发起非阻塞 RPC 调用,以在 worker
to
上运行函数func
。RPC 消息与 Python 代码的执行并行发送和接收。此方法是线程安全的。此方法将立即返回一个Future
,可以等待(await)的对象。- 参数
to (str 或 WorkerInfo 或 int) – 目标 worker 的名称/级别/
WorkerInfo
。func (可调用对象) – 可调用函数,例如 Python 可调用对象、内置算子 (例如
add()
) 和注解的 TorchScript 函数。args (tuple) –
func
调用的参数元组。kwargs (dict) – 是
func
调用的关键字参数字典。timeout (float, 可选的) – 此 RPC 的超时秒数。如果 RPC 未在此时间内完成,则将抛出表示超时的异常。值为 0 表示无限超时,即永远不会抛出超时错误。如果未提供,则使用在初始化期间或通过
_set_rpc_timeout
设置的默认值。
- 返回
返回一个
Future
对象,可以等待。完成后,可以从Future
对象中检索使用args
和kwargs
对func
的调用的返回值。
警告
不支持将 GPU Tensor 用作
func
的参数或返回值,因为我们不支持通过网络发送 GPU Tensor。您需要在将 GPU Tensor 用作func
的参数或返回值之前,将其显式复制到 CPU。警告
rpc_async
API 在通过网络发送参数 Tensor 的存储之前不会进行复制,这可以由不同的线程完成,具体取决于 RPC 后端类型。调用者应确保这些 Tensor 的内容保持完整,直到返回的Future
完成。- 示例:
确保在两个 worker 上正确设置了
MASTER_ADDR
和MASTER_PORT
。有关更多详细信息,请参阅init_process_group()
API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut1 = rpc.rpc_async("worker1", torch.add, args=(torch.ones(2), 3)) >>> fut2 = rpc.rpc_async("worker1", min, args=(1, 2)) >>> result = fut1.wait() + fut2.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
以下是使用 RPC 运行 TorchScript 函数的示例。
>>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar)
>>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> fut = rpc.rpc_async("worker1", my_script_add, args=(torch.ones(2), 3)) >>> ret = fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.remote(to, func, args=None, kwargs=None, timeout=-1.0)[source][source]¶
发起远程调用,以在 worker
to
上运行func
并立即返回对结果值的RRef
。Workerto
将是返回的RRef
的所有者,而调用remote
的 worker 是用户。所有者管理其RRef
的全局引用计数,并且仅当全局没有对它的活跃引用时,所有者RRef
才会被销毁。- 参数
to (str 或 WorkerInfo 或 int) – 目标 worker 的名称/级别/
WorkerInfo
。func (可调用对象) – 可调用函数,例如 Python 可调用对象、内置算子 (例如
add()
) 和注解的 TorchScript 函数。args (tuple) –
func
调用的参数元组。kwargs (dict) – 是
func
调用的关键字参数字典。timeout (float, 可选的) – 此远程调用的超时秒数。如果在 worker
to
上创建此RRef
未在此 worker 上于此超时内成功处理,则下次尝试使用 RRef(例如to_here()
)时,将抛出表示此失败的超时异常。值为 0 表示无限超时,即永远不会抛出超时错误。如果未提供,则使用在初始化期间或通过_set_rpc_timeout
设置的默认值。
- 返回
指向结果值的用户
RRef
实例。使用阻塞 APItorch.distributed.rpc.RRef.to_here()
在本地检索结果值。
警告
remote
API 在通过网络发送参数 Tensor 的存储之前不会进行复制,这可以由不同的线程完成,具体取决于 RPC 后端类型。调用者应确保这些 Tensor 的内容保持完整,直到返回的 RRef 被所有者确认,这可以使用torch.distributed.rpc.RRef.confirmed_by_owner()
API 检查。警告
对于
remote
API 的诸如超时之类的错误,是尽力而为地处理的。这意味着当由remote
启动的远程调用失败时,例如出现超时错误,我们采取尽力而为的方法来处理错误。这意味着错误会在异步基础上处理并设置到结果 RRef 上。如果在处理这些错误之前应用程序尚未使用该 RRef(例如to_here()
),那么将来使用该RRef
将会适当地抛出错误。但是,用户应用程序有可能在错误处理完成之前使用RRef
。在这种情况下,错误可能不会被抛出,因为它们尚未被处理。示例
Make sure that ``MASTER_ADDR`` and ``MASTER_PORT`` are set properly on both workers. Refer to :meth:`~torch.distributed.init_process_group` API for more details. For example, export MASTER_ADDR=localhost export MASTER_PORT=5678 Then run the following code in two different processes: >>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >>> x = rref1.to_here() + rref2.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown() Below is an example of running a TorchScript function using RPC. >>> # On both workers: >>> @torch.jit.script >>> def my_script_add(tensor: torch.Tensor, scalar: int): >>> return torch.add(tensor, scalar) >>> # On worker 0: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> rref = rpc.remote("worker1", my_script_add, args=(torch.ones(2), 3)) >>> rref.to_here() >>> rpc.shutdown() >>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
- torch.distributed.rpc.get_worker_info(worker_name=None)[source][source]¶
获取给定 worker 名称的
WorkerInfo
。使用此WorkerInfo
可避免在每次调用时传递开销大的字符串。- 参数
worker_name (str) – worker 的字符串名称。如果为
None
,则返回当前 worker 的 ID。(默认值None
)- 返回
给定
worker_name
的WorkerInfo
实例,或当前 worker 的WorkerInfo
(如果worker_name
为None
)。
- torch.distributed.rpc.shutdown(graceful=True, timeout=0)[source][source]¶
执行 RPC 代理的关闭,然后销毁 RPC 代理。这将阻止本地代理接受未完成的请求,并通过终止所有 RPC 线程来关闭 RPC 框架。如果
graceful=True
,这将阻塞,直到所有本地和远程 RPC 进程到达此方法并等待所有未完成的工作完成。否则,如果graceful=False
,这是本地关闭,它不会等待其他 RPC 进程到达此方法。警告
对于由
rpc_async()
返回的Future
对象,不应在shutdown()
之后调用future.wait()
。- 参数
graceful (bool) – 是否执行优雅关闭。如果为 True,这将 1) 等待直到没有针对
UserRRefs
的待处理系统消息并删除它们;2) 阻塞直到所有本地和远程 RPC 进程都到达此方法并等待所有未完成的工作完成。
- 示例:
确保在两个 worker 上正确设置了
MASTER_ADDR
和MASTER_PORT
。有关更多详细信息,请参阅init_process_group()
API。例如,export MASTER_ADDR=localhost export MASTER_PORT=5678
然后在两个不同的进程中运行以下代码
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> # do some work >>> result = rpc.rpc_sync("worker1", torch.add, args=(torch.ones(1), 1)) >>> # ready to shutdown >>> rpc.shutdown()
>>> # On worker 1: >>> import torch.distributed.rpc as rpc >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> # wait for worker 0 to finish work, and then shutdown. >>> rpc.shutdown()
- 类 torch.distributed.rpc.WorkerInfo¶
一种结构,封装了系统中工作进程(worker)的信息。包含工作进程的名称和 ID。这个类不应直接构造,而是可以通过
get_worker_info()
获取其实例,结果可以传递给诸如rpc_sync()
、rpc_async()
、remote()
的函数,以避免在每次调用时复制字符串。- property id¶
全局唯一 ID,用于标识工作进程。
- property name¶
工作进程的名称。
RPC 包还提供了装饰器,允许应用程序指定给定函数在被调用方(callee)应如何处理。
- torch.distributed.rpc.functions.async_execution(fn)[source][source]¶
一个函数装饰器,表明该函数的返回值保证是一个
Future
对象,并且该函数可以在 RPC 被调用方上异步运行。更具体地说,被调用方会提取被包装函数返回的Future
,并将后续处理步骤安装为该Future
的回调。安装的回调在Future
完成时会读取其值,并将该值作为 RPC 响应发送回去。这也意味着返回的Future
仅存在于被调用方,且永远不会通过 RPC 发送。当被包装函数(fn
)的执行需要暂停和恢复时,例如由于包含rpc_async()
或等待其他信号,这个装饰器非常有用。注意
为了启用异步执行,应用程序必须将此装饰器返回的函数对象传递给 RPC API。如果 RPC 检测到此装饰器安装的属性,就知道此函数返回的是
Future
对象,并将据此进行处理。然而,这并不意味着在定义函数时此装饰器必须是最外层的。例如,当与@staticmethod
或@classmethod
结合使用时,@rpc.functions.async_execution
需要作为内层装饰器,以便目标函数能被识别为静态方法或类方法。这个目标函数仍然可以异步执行,因为访问时,静态或类方法会保留@rpc.functions.async_execution
安装的属性。- 示例:
返回的
Future
对象可以来自rpc_async()
、then()
或Future
构造函数。下面的例子展示了直接使用then()
返回的Future
。>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @rpc.functions.async_execution >>> def async_add_chained(to, x, y, z): >>> # This function runs on "worker1" and returns immediately when >>> # the callback is installed through the `then(cb)` API. In the >>> # mean time, the `rpc_async` to "worker2" can run concurrently. >>> # When the return value of that `rpc_async` arrives at >>> # "worker1", "worker1" will run the lambda function accordingly >>> # and set the value for the previously returned `Future`, which >>> # will then trigger RPC to send the result back to "worker0". >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add_chained, >>> args=("worker2", torch.ones(2), 1, 1) >>> ) >>> print(ret) # prints tensor([3., 3.])
当与 TorchScript 装饰器结合使用时,此装饰器必须是最外层的。
>>> from torch import Tensor >>> from torch.futures import Future >>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> @torch.jit.script >>> def script_add(x: Tensor, y: Tensor) -> Tensor: >>> return x + y >>> >>> @rpc.functions.async_execution >>> @torch.jit.script >>> def async_add(to: str, x: Tensor, y: Tensor) -> Future[Tensor]: >>> return rpc.rpc_async(to, script_add, (x, y)) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> async_add, >>> args=("worker2", torch.ones(2), 1) >>> ) >>> print(ret) # prints tensor([2., 2.])
当与静态方法或类方法结合使用时,此装饰器必须是内层的。
>>> from torch.distributed import rpc >>> >>> # omitting setup and shutdown RPC >>> >>> # On all workers >>> class AsyncExecutionClass: >>> >>> @staticmethod >>> @rpc.functions.async_execution >>> def static_async_add(to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> @classmethod >>> @rpc.functions.async_execution >>> def class_async_add(cls, to, x, y, z): >>> ret_fut = torch.futures.Future() >>> rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: ret_fut.set_result(fut.wait() + z) >>> ) >>> return ret_fut >>> >>> @rpc.functions.async_execution >>> def bound_async_add(self, to, x, y, z): >>> return rpc.rpc_async(to, torch.add, args=(x, y)).then( >>> lambda fut: fut.wait() + z >>> ) >>> >>> # On worker0 >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.static_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.]) >>> >>> ret = rpc.rpc_sync( >>> "worker1", >>> AsyncExecutionClass.class_async_add, >>> args=("worker2", torch.ones(2), 1, 2) >>> ) >>> print(ret) # prints tensor([4., 4.])
此装饰器也适用于 RRef 助手方法,即 .
torch.distributed.rpc.RRef.rpc_sync()
、torch.distributed.rpc.RRef.rpc_async()
和torch.distributed.rpc.RRef.remote()
。>>> from torch.distributed import rpc >>> >>> # reuse the AsyncExecutionClass class above >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_sync().static_async_add("worker2", torch.ones(2), 1, 2) >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.rpc_async().static_async_add("worker2", torch.ones(2), 1, 2).wait() >>> print(ret) # prints tensor([4., 4.]) >>> >>> rref = rpc.remote("worker1", AsyncExecutionClass) >>> ret = rref.remote().static_async_add("worker2", torch.ones(2), 1, 2).to_here() >>> print(ret) # prints tensor([4., 4.])
后端¶
RPC 模块可以利用不同的后端在节点之间执行通信。要使用的后端可以在 init_rpc()
函数中通过传递 BackendType
枚举的特定值来指定。无论使用哪种后端,RPC API 的其余部分都不会改变。每个后端还定义了 RpcBackendOptions
类的自己的子类,其一个实例也可以传递给 init_rpc()
来配置后端的行为。
- class torch.distributed.rpc.BackendType(value)¶
可用后端的枚举类。
PyTorch 内置了
BackendType.TENSORPIPE
后端。可以使用register_backend()
函数注册额外的后端。
- class torch.distributed.rpc.RpcBackendOptions¶
一个抽象结构,封装了传递给 RPC 后端的选项。此类的一个实例可以传递给
init_rpc()
以使用特定配置初始化 RPC,例如要使用的 RPC 超时时间和init_method
。- property init_method¶
指定如何初始化进程组的 URL。默认为
env://
。
- property rpc_timeout¶
一个浮点数,表示所有 RPC 使用的超时时间。如果 RPC 未在此时间范围内完成,它将以一个异常结束,表明已超时。
TensorPipe 后端¶
TensorPipe agent(默认后端)利用了 TensorPipe 库,该库提供了一种专门适用于机器学习的原生点对点通信原语,从根本上解决了 Gloo 的一些限制。与 Gloo 相比,它的优势在于异步性,这允许大量传输同时发生,每个传输以自己的速度进行,互不阻塞。它只会在需要时按需在节点对之间打开管道,当一个节点失败时,只有其相关的管道会关闭,而其他所有管道会正常工作。此外,它能够支持多种不同的传输方式(当然包括 TCP,但也包括共享内存、NVLink、InfiniBand 等),并且可以自动检测它们的可用性并协商为每个管道使用最佳的传输方式。
TensorPipe 后端已在 PyTorch v1.6 中引入,并正在积极开发中。目前,它仅支持 CPU 张量,GPU 支持即将推出。它自带一个基于 TCP 的传输方式,就像 Gloo 一样。它还能够自动将大型张量分块并通过多个 socket 和线程进行多路复用(multiplex),以实现非常高的带宽。agent 将能够自行选择最佳传输方式,无需干预。
示例
>>> import os
>>> from torch.distributed import rpc
>>> os.environ['MASTER_ADDR'] = 'localhost'
>>> os.environ['MASTER_PORT'] = '29500'
>>>
>>> rpc.init_rpc(
>>> "worker1",
>>> rank=0,
>>> world_size=2,
>>> rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
>>> num_worker_threads=8,
>>> rpc_timeout=20 # 20 second timeout
>>> )
>>> )
>>>
>>> # omitting init_rpc invocation on worker2
- class torch.distributed.rpc.TensorPipeRpcBackendOptions(*, num_worker_threads=16, rpc_timeout=60.0, init_method='env://', device_maps=None, devices=None, _transports=None, _channels=None)[source][source]¶
TensorPipeAgent
的后端选项,派生自RpcBackendOptions
。- 参数
num_worker_threads (int, optional) –
TensorPipeAgent
用于执行请求的线程池中的线程数(默认值:16)。rpc_timeout (float, optional) – RPC 请求的默认超时时间,以秒为单位(默认值:60 秒)。如果 RPC 未在此时间范围内完成,将引发异常指示其已超时。调用者可以在
rpc_sync()
和rpc_async()
中覆盖此超时时间(如有必要)。init_method (str, optional) – 用于初始化用于 rendezvous 的分布式存储的 URL。它可以接受与
init_process_group()
的同名参数接受的任何值(默认值:env://
)。device_maps (Dict[str, Dict], optional) – 从此工作进程到被调用方的设备放置映射。键是被调用工作进程的名称,值是字典(
Dict
,其值可以是int
、str
或torch.device
),该字典将此工作进程的设备映射到被调用工作进程的设备。(默认值:None
)devices (List[int, str, or
torch.device
], optional) – RPC agent 使用的所有本地 CUDA 设备。默认情况下,它将初始化为来自其自身device_maps
的所有本地设备及其 peer 的device_maps
中相应的设备。处理 CUDA RPC 请求时,agent 会正确地同步此List
中所有设备的 CUDA streams。
- property device_maps¶
设备映射位置。
- property devices¶
本地 agent 使用的所有设备。
- property init_method¶
指定如何初始化进程组的 URL。默认为
env://
。
- property num_worker_threads¶
TensorPipeAgent
用于执行请求的线程池中的线程数。
- property rpc_timeout¶
一个浮点数,表示所有 RPC 使用的超时时间。如果 RPC 未在此时间范围内完成,它将以一个异常结束,表明已超时。
- set_device_map(to, device_map)[source][source]¶
设置每个 RPC 调用方和被调用方之间的设备映射。此函数可以多次调用以增量添加设备放置配置。
- 参数
to (str) – 被调用方的名称。
device_map (Dict of int, str, or torch.device) – 从此工作进程到被调用方的设备放置映射。此映射必须是可逆的(invertible)。
示例
>>> # both workers >>> def add(x, y): >>> print(x) # tensor([1., 1.], device='cuda:1') >>> return x + y, (x + y).to(2) >>> >>> # on worker 0 >>> options = TensorPipeRpcBackendOptions( >>> num_worker_threads=8, >>> device_maps={"worker1": {0: 1}} >>> # maps worker0's cuda:0 to worker1's cuda:1 >>> ) >>> options.set_device_map("worker1", {1: 2}) >>> # maps worker0's cuda:1 to worker1's cuda:2 >>> >>> rpc.init_rpc( >>> "worker0", >>> rank=0, >>> world_size=2, >>> backend=rpc.BackendType.TENSORPIPE, >>> rpc_backend_options=options >>> ) >>> >>> x = torch.ones(2) >>> rets = rpc.rpc_sync("worker1", add, args=(x.to(0), 1)) >>> # The first argument will be moved to cuda:1 on worker1. When >>> # sending the return value back, it will follow the invert of >>> # the device map, and hence will be moved back to cuda:0 and >>> # cuda:1 on worker0 >>> print(rets[0]) # tensor([2., 2.], device='cuda:0') >>> print(rets[1]) # tensor([2., 2.], device='cuda:1')
注意
RPC 框架不会自动重试任何 rpc_sync()
、 rpc_async()
和 remote()
调用。原因在于 RPC 框架无法确定操作是否是幂等的(idempotent)以及重试是否安全。因此,处理故障并在必要时重试是应用程序的责任。RPC 通信基于 TCP,因此可能由于网络故障或间歇性网络连接问题导致故障。在这种情况下,应用程序需要通过合理的退避(backoff)来适当重试,以确保网络不会因激进的重试而过载。
RRef¶
警告
使用 CUDA 张量时,RRef 当前不受支持。
一个 RRef
(Remote REFerence,远程引用) 是对远程工作进程上某种类型 T
的值(例如 Tensor
)的引用。此句柄使被引用的远程值在其所有者上保持活动状态,但不意味着该值将来会被传输到本地工作进程。RRef 可用于多机训练,通过持有指向其他工作进程上存在的 nn.Modules 的引用,并在训练期间调用适当的函数来检索或修改它们的参数。更多详细信息请参阅 远程引用协议(Remote Reference Protocol)。
- class torch.distributed.rpc.PyRRef(RRef)¶
一个类,封装了对远程工作进程上某种类型值的引用。此句柄会使被引用的远程值在该工作进程上保持活动。当 1) 应用程序代码和本地 RRef 上下文中都没有对它的引用时,或 2) 应用程序已调用了优雅关闭时,
UserRRef
将被删除。在已删除的 RRef 上调用方法会导致未定义的行为。RRef 实现只提供尽力而为的错误检测,且应用程序在rpc.shutdown()
之后不应使用UserRRefs
。警告
RRef 只能由 RPC 模块进行序列化和反序列化。不通过 RPC 进行 RRef 的序列化和反序列化(例如,Python pickle,torch
save()
/load()
,JITsave()
/load()
等)将导致错误。- 参数
value (object) – 将由本 RRef 包装的值。
type_hint (Type, optional) – 应传递给
TorchScript
编译器作为value
的类型提示的 Python 类型。
- 示例:
以下示例为简化起见,省略了 RPC 初始化和关闭代码。有关详细信息,请参阅 RPC 文档。
使用 rpc.remote 创建 RRef
>>> import torch >>> import torch.distributed.rpc as rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> # get a copy of value from the RRef >>> x = rref.to_here()
从本地对象创建 RRef
>>> import torch >>> from torch.distributed.rpc import RRef >>> x = torch.zeros(2, 2) >>> rref = RRef(x)
与其他工作进程共享 RRef
>>> # On both worker0 and worker1: >>> def f(rref): >>> return rref.to_here() + 1
>>> # On worker0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch.distributed.rpc import RRef >>> rref = RRef(torch.zeros(2, 2)) >>> # the following RPC shares the rref with worker1, reference >>> # count is automatically updated. >>> rpc.rpc_sync("worker1", f, args=(rref,))
- backward(self: torch._C._distributed_rpc.PyRRef, dist_autograd_ctx_id: int = -1, retain_graph: bool = False) None ¶
使用 RRef 作为反向传播的根节点运行反向传播。如果提供了
dist_autograd_ctx_id
,我们将使用提供的 ctx_id 从 RRef 的所有者开始执行分布式反向传播。在这种情况下,应使用get_gradients()
来检索梯度。如果dist_autograd_ctx_id
为None
,则假定这是本地 autograd 图,并且我们只执行本地反向传播。在本地情况下,调用此 API 的节点必须是 RRef 的所有者。RRef 的值应为标量 Tensor。- 参数
- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> rref.backward(context_id)
- confirmed_by_owner(self: torch._C._distributed_rpc.PyRRef) bool ¶
返回此
RRef
是否已由所有者确认。OwnerRRef
始终返回 true,而UserRRef
仅在所有者知道此UserRRef
时返回 true。
- is_owner(self: torch._C._distributed_rpc.PyRRef) bool ¶
返回当前节点是否是此
RRef
的所有者。
- local_value(self: torch._C._distributed_rpc.PyRRef) object ¶
如果当前节点是所有者,则返回对本地值的引用。否则,抛出异常。
- owner(self: torch._C._distributed_rpc.PyRRef) torch._C._distributed_rpc.WorkerInfo ¶
返回拥有此
RRef
的节点的工作进程信息。
- owner_name(self: torch._C._distributed_rpc.PyRRef) str ¶
返回拥有此
RRef
的节点的工作进程名称。
- remote(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object ¶
创建一个助手代理,以便使用 RRef 的所有者作为目标轻松启动
remote
以在此 RRef 引用的对象上运行函数。更具体地说,rref.remote().func_name(*args, **kwargs)
与以下代码等价:>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.remote(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 参数
timeout (float, optional) –
rref.remote()
的超时时间。如果此RRef
的创建未在此超时时间内成功完成,则下次尝试使用 RRef(例如to_here
)时将引发超时错误。如果未提供,将使用默认的 RPC 超时时间。有关RRef
的特定超时语义,请参阅rpc.remote()
。
- 示例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.remote().size().to_here() # returns torch.Size([2, 2]) >>> rref.remote().view(1, 4).to_here() # returns tensor([[1., 1., 1., 1.]])
- rpc_async(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object ¶
创建一个助手代理,以便使用 RRef 的所有者作为目标轻松启动
rpc_async
以在此 RRef 引用的对象上运行函数。更具体地说,rref.rpc_async().func_name(*args, **kwargs)
与以下代码等价:>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_async(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 参数
timeout (float, optional) –
rref.rpc_async()
的超时时间。如果调用未在此时间范围内完成,将引发异常指示已超时。如果未提供此参数,将使用默认的 RPC 超时时间。
- 示例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_async().size().wait() # returns torch.Size([2, 2]) >>> rref.rpc_async().view(1, 4).wait() # returns tensor([[1., 1., 1., 1.]])
- rpc_sync(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object ¶
创建一个助手代理,以便使用 RRef 的所有者作为目标轻松启动
rpc_sync
以在此 RRef 引用的对象上运行函数。更具体地说,rref.rpc_sync().func_name(*args, **kwargs)
与以下代码等价:>>> def run(rref, func_name, args, kwargs): >>> return getattr(rref.local_value(), func_name)(*args, **kwargs) >>> >>> rpc.rpc_sync(rref.owner(), run, args=(rref, func_name, args, kwargs))
- 参数
timeout (float, 可选) –
rref.rpc_sync()
的超时时间。如果调用未在此时间范围内完成,则会引发相应的异常。如果未提供此参数,将使用默认的 RPC 超时时间。
- 示例:
>>> from torch.distributed import rpc >>> rref = rpc.remote("worker1", torch.add, args=(torch.zeros(2, 2), 1)) >>> rref.rpc_sync().size() # returns torch.Size([2, 2]) >>> rref.rpc_sync().view(1, 4) # returns tensor([[1., 1., 1., 1.]])
- to_here(self: torch._C._distributed_rpc.PyRRef, timeout: float = -1.0) object ¶
阻塞式调用,用于将 RRef 的值从所有者复制到本地节点并返回。如果当前节点是所有者,则返回本地值的引用。
- 参数
timeout (float, 可选) –
to_here
的超时时间。如果调用未在此时间范围内完成,则会引发相应的异常。如果未提供此参数,将使用默认的 RPC 超时时间 (60s)。
RemoteModule¶
警告
使用 CUDA 张量时,RemoteModule 目前不受支持
RemoteModule
是一种在不同进程上远程创建 nn.Module 的简单方法。实际模块位于远程主机上,但本地主机拥有该模块的句柄,可以像调用常规 nn.Module 一样调用此模块。但是,调用会涉及对远程端的 RPC 调用,并且如果需要,可以通过 RemoteModule 支持的附加 API 异步执行。
- class torch.distributed.nn.api.remote_module.RemoteModule(*args, **kwargs)[source][source]¶
RemoteModule 实例只能在 RPC 初始化后创建。
它在指定的远程节点上创建一个用户指定的模块。它的行为类似于常规的
nn.Module
,区别在于forward
方法在远程节点上执行。它负责自动微分记录,以确保反向传播将梯度传回相应的远程模块。它根据
module_cls
的forward
方法签名生成两个方法:forward_async
和forward
。forward_async
异步运行并返回一个 Future。forward_async
和forward
的参数与module_cls
返回的模块的forward
方法的参数相同。例如,如果
module_cls
返回一个nn.Linear
实例,其forward
方法签名为:def forward(input: Tensor) -> Tensor:
,则生成的RemoteModule
将有以下两个方法签名:def forward(input: Tensor) -> Tensor:
def forward_async(input: Tensor) -> Future[Tensor]:
- 参数
remote_device (str) – 希望放置此模块的目标工作进程上的设备。格式应为“<workername>/<device>”,其中设备字段可以解析为 torch.device 类型。例如,“trainer0/cpu”、“trainer0”、“ps0/cuda:0”。此外,设备字段可以是可选的,默认值为“cpu”。
module_cls (nn.Module) –
用于远程创建模块的类。例如,
>>> class MyModule(nn.Module): >>> def forward(input): >>> return input + 1 >>> >>> module_cls = MyModule
args (Sequence, 可选) – 传递给
module_cls
的参数。kwargs (Dict, 可选) – 传递给
module_cls
的关键字参数。
- 返回
一个远程模块实例,它封装了由用户提供的
module_cls
创建的Module
,它有一个阻塞的forward
方法和一个异步的forward_async
方法,该方法返回在远程端用户提供的模块上进行的forward
调用的 future。
- 示例:
在两个不同的进程中运行以下代码
>>> # On worker 0: >>> import torch >>> import torch.distributed.rpc as rpc >>> from torch import nn, Tensor >>> from torch.distributed.nn.api.remote_module import RemoteModule >>> >>> rpc.init_rpc("worker0", rank=0, world_size=2) >>> remote_linear_module = RemoteModule( >>> "worker1/cpu", nn.Linear, args=(20, 30), >>> ) >>> input = torch.randn(128, 20) >>> ret_fut = remote_linear_module.forward_async(input) >>> ret = ret_fut.wait() >>> rpc.shutdown()
>>> # On worker 1: >>> import torch >>> import torch.distributed.rpc as rpc >>> >>> rpc.init_rpc("worker1", rank=1, world_size=2) >>> rpc.shutdown()
此外,可以在此教程中找到一个与 DistributedDataParallel (DDP) 结合的更实际的示例。
- remote_parameters(recurse=True)[source]¶
返回指向远程模块参数的
RRef
列表。这通常可以与
DistributedOptimizer
结合使用。- 参数
recurse (bool) – 如果为 True,则返回远程模块及其所有子模块的参数。否则,仅返回作为远程模块直接成员的参数。
- 返回
指向远程模块参数的
RRef
列表 (List[RRef[nn.Parameter]]
)。- 返回类型
list[torch.distributed.rpc.api.RRef[torch.nn.parameter.Parameter]]
分布式 Autograd 框架¶
警告
使用 CUDA 张量时,分布式自动微分目前不受支持
此模块提供了一个基于 RPC 的分布式自动微分框架,可用于模型并行训练等应用。简而言之,应用程序可以通过 RPC 发送和接收记录梯度的张量。在前向传播中,我们记录何时通过 RPC 发送记录梯度的张量,并在反向传播期间使用此信息通过 RPC 执行分布式反向传播。有关更多详细信息,请参见分布式自动微分设计。
- torch.distributed.autograd.backward(context_id: int, roots: List[Tensor], retain_graph=False) None ¶
使用提供的 roots 启动分布式反向传播。这当前实现了FAST 模式算法,该算法假设在所有工作进程的同一分布式自动微分上下文中发送的所有 RPC 消息在反向传播期间都是自动微分图的一部分。
我们使用提供的 roots 来发现自动微分图并计算适当的依赖关系。此方法会阻塞,直到整个自动微分计算完成。
我们将在每个节点上相应的
torch.distributed.autograd.context
中累积梯度。用于自动微分的上下文是根据调用torch.distributed.autograd.backward()
时传入的context_id
来查找的。如果没有对应于给定 ID 的有效自动微分上下文,我们将抛出错误。您可以使用get_gradients()
API 检索累积的梯度。- 参数
- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> pred = model.forward() >>> loss = loss_func(pred, loss) >>> dist_autograd.backward(context_id, loss)
- class torch.distributed.autograd.context[source][source]¶
在使用分布式自动微分时,用于包装前向和反向传播的上下文对象。
with
语句中生成的context_id
是在所有工作进程上唯一标识分布式反向传播所必需的。每个工作进程存储与此context_id
关联的元数据,这是正确执行分布式自动微分传递所必需的。- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = rpc.rpc_sync("worker1", torch.add, args=(t1, t2)).sum() >>> dist_autograd.backward(context_id, [loss])
- torch.distributed.autograd.get_gradients(context_id: int) Dict[Tensor, Tensor] ¶
检索一个映射表,其中键是 Tensor,值是在提供的上下文中针对给定
context_id
累积的该 Tensor 的相应梯度,作为分布式自动微分反向传播的一部分。- 参数
context_id (int) – 应检索梯度的自动微分上下文 ID。
- 返回
一个映射表,其中键是 Tensor,值是该 Tensor 关联的梯度。
- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> with dist_autograd.context() as context_id: >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> loss = t1 + t2 >>> dist_autograd.backward(context_id, [loss.sum()]) >>> grads = dist_autograd.get_gradients(context_id) >>> print(grads[t1]) >>> print(grads[t2])
有关 RPC 自动微分的更多信息
分布式优化器¶
有关分布式优化器的文档,请参见torch.distributed.optim 页面。
设计说明¶
分布式自动微分设计说明涵盖了基于 RPC 的分布式自动微分框架的设计,该框架适用于模型并行训练等应用。
RRef 设计说明涵盖了框架用于引用远程工作进程上值的 RRef(远程引用)协议的设计。
教程¶
RPC 教程向用户介绍了 RPC 框架,提供了几个使用 torch.distributed.rpc API 的示例应用程序,并演示了如何使用分析器 (profiler) 来分析基于 RPC 的工作负载。