• 文档 >
  • 分布式通信包 - torch.distributed
快捷方式

分布式通信包 - torch.distributed

注意

请参考 PyTorch 分布式概述,简要了解与分布式训练相关的所有功能。

后端

torch.distributed 支持三种内置后端,每种后端具有不同的功能。下表显示了哪些功能可用于 CPU/CUDA 张量。只有当用于构建 PyTorch 的实现支持 MPI 时,MPI 才支持 CUDA。

后端

gloo

mpi

nccl

设备

CPU

GPU

CPU

GPU

CPU

GPU

发送

?

接收

?

广播

?

all_reduce

?

reduce

?

all_gather

?

gather

?

scatter

?

reduce_scatter

all_to_all

?

barrier

?

PyTorch 自带的后端

PyTorch 分布式包支持 Linux (稳定版)、MacOS (稳定版) 和 Windows (原型版)。默认情况下,对于 Linux,Gloo 和 NCCL 后端已构建并包含在 PyTorch 分布式包中 (NCCL 仅在使用 CUDA 构建时)。MPI 是一个可选后端,只有在您从源代码构建 PyTorch 时才能包含。(例如,在安装了 MPI 的主机上构建 PyTorch。)

注意

从 PyTorch v1.8 开始,Windows 支持除 NCCL 之外的所有集合通信后端。如果 init_method 参数 init_process_group() 指向文件,则它必须遵循以下模式

  • 本地文件系统,init_method="file:///d:/tmp/some_file"

  • 共享文件系统,init_method="file://////{machine_name}/{share_folder_name}/some_file"

与 Linux 平台相同,您可以通过设置环境变量 MASTER_ADDR 和 MASTER_PORT 来启用 TcpStore。

使用哪个后端?

过去,我们经常被问到:“我应该使用哪个后端?”。

  • 经验法则

    • 对于分布式 GPU 训练,使用 NCCL 后端

    • 对于分布式 CPU 训练,使用 Gloo 后端。

  • 具有 InfiniBand 互连的 GPU 主机

    • 使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的后端。

  • 具有以太网互连的 GPU 主机

    • 使用 NCCL,因为它目前提供最佳的分布式 GPU 训练性能,特别是对于多进程单节点或多节点分布式训练。如果您在使用 NCCL 时遇到任何问题,请使用 Gloo 作为备选方案。(请注意,对于 GPU,Gloo 目前比 NCCL 运行得慢。)

  • 具有 InfiniBand 互连的 CPU 主机

    • 如果您的 InfiniBand 启用了 IP over IB,请使用 Gloo,否则,请改用 MPI。我们计划在即将发布的版本中为 Gloo 添加 InfiniBand 支持。

  • 具有以太网互连的 CPU 主机

    • 使用 Gloo,除非您有使用 MPI 的特殊理由。

常用环境变量

选择要使用的网络接口

默认情况下,NCCL 和 Gloo 后端都将尝试查找要使用的正确网络接口。如果自动检测到的接口不正确,您可以使用以下环境变量覆盖它 (适用于各自的后端)

  • NCCL_SOCKET_IFNAME,例如 export NCCL_SOCKET_IFNAME=eth0

  • GLOO_SOCKET_IFNAME,例如 export GLOO_SOCKET_IFNAME=eth0

如果您正在使用 Gloo 后端,则可以通过逗号分隔来指定多个接口,如下所示:export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3。后端将以轮询方式跨这些接口分派操作。所有进程都必须在此变量中指定相同数量的接口,这一点至关重要。

其他 NCCL 环境变量

调试 - 如果 NCCL 失败,您可以设置 NCCL_DEBUG=INFO 以打印显式警告消息以及基本的 NCCL 初始化信息。

您还可以使用 NCCL_DEBUG_SUBSYS 来获取有关 NCCL 特定方面的更多详细信息。例如,NCCL_DEBUG_SUBSYS=COLL 将打印集合调用的日志,这在调试挂起时可能很有用,尤其是那些由集合类型或消息大小不匹配引起的挂起。如果拓扑检测失败,则设置 NCCL_DEBUG_SUBSYS=GRAPH 以检查详细的检测结果并在需要 NCCL 团队的进一步帮助时保存为参考将很有帮助。

性能调优 - NCCL 基于其拓扑检测执行自动调优,以节省用户的调优工作。在某些基于套接字的系统上,用户仍然可以尝试调优 NCCL_SOCKET_NTHREADSNCCL_NSOCKS_PERTHREAD 以增加套接字网络带宽。NCCL 已为某些云提供商 (例如 AWS 或 GCP) 预先调优了这两个环境变量。

有关 NCCL 环境变量的完整列表,请参阅 NVIDIA NCCL 官方文档

基础知识

torch.distributed 包为运行在一台或多台机器上的多个计算节点之间的多进程并行性提供 PyTorch 支持和通信原语。类 torch.nn.parallel.DistributedDataParallel() 基于此功能构建,以提供同步分布式训练,作为任何 PyTorch 模型的包装器。这与 多进程包 - torch.multiprocessingtorch.nn.DataParallel() 提供的并行性类型不同,因为它支持多台网络连接的机器,并且用户必须为每个进程显式启动主训练脚本的单独副本。

在单机同步情况下,torch.distributedtorch.nn.parallel.DistributedDataParallel() 包装器可能仍然比其他数据并行方法 (包括 torch.nn.DataParallel()) 具有优势

  • 每个进程都维护自己的优化器,并在每次迭代时执行完整的优化步骤。虽然这可能看起来是多余的,因为梯度已经聚集在一起并在进程之间平均,因此对于每个进程都是相同的,但这意味着不需要参数广播步骤,从而减少了在节点之间传输张量所花费的时间。

  • 每个进程都包含一个独立的 Python 解释器,从而消除了从单个 Python 进程驱动多个执行线程、模型副本或 GPU 带来的额外解释器开销和 “GIL 抖动”。这对于大量使用 Python 运行时的模型尤其重要,包括具有循环层或许多小组件的模型。

初始化

在调用任何其他方法之前,需要使用 torch.distributed.init_process_group()torch.distributed.device_mesh.init_device_mesh() 函数初始化该包。两者都会阻塞,直到所有进程都加入。

警告

初始化不是线程安全的。进程组创建应从单个线程执行,以防止跨 ranks 的 ‘UUID’ 分配不一致,并防止初始化期间可能导致挂起的竞争。

torch.distributed.is_available()[source][source]

如果分布式包可用,则返回 True

否则,torch.distributed 不会公开任何其他 API。目前,torch.distributed 在 Linux、MacOS 和 Windows 上可用。从源代码构建 PyTorch 时,设置 USE_DISTRIBUTED=1 以启用它。目前,Linux 和 Windows 的默认值为 USE_DISTRIBUTED=1,MacOS 的默认值为 USE_DISTRIBUTED=0

返回类型

bool

torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source][source]

初始化默认的分布式进程组。

这也将初始化分布式包。

初始化进程组有两种主要方法
  1. 显式指定 storerankworld_size

  2. 指定 init_method (URL 字符串),它指示在哪里/如何发现对等方。可选地指定 rankworld_size,或在 URL 中编码所有必需的参数并省略它们。

如果两者都未指定,则 init_method 假定为 “env://”。

参数
  • backend (strBackend, 可选) – 要使用的后端。根据构建时配置,有效值包括 mpiglooncclucc 或由第三方插件注册的后端。从 2.6 开始,如果未提供 backend,则 c10d 将使用为 device_id kwarg (如果提供) 指示的设备类型注册的后端。目前已知的默认注册是:nccl 用于 cudagloo 用于 cpu。如果既未提供 backend 也未提供 device_id,则 c10d 将检测运行时机器上的加速器,并使用为检测到的加速器 (或 cpu) 注册的后端。此字段可以作为小写字符串 (例如,"gloo") 给出,也可以通过 Backend 属性 (例如,Backend.GLOO) 访问。如果在使用 nccl 后端时每台机器使用多个进程,则每个进程都必须具有对其使用的每个 GPU 的独占访问权限,因为在进程之间共享 GPU 可能会导致死锁或 NCCL 无效使用。ucc 后端是实验性的。

  • init_method (str, 可选) – URL,指定如何初始化进程组。如果未指定 init_methodstore,则默认为 “env://”。与 store 互斥。

  • world_size (int, 可选) – 参与作业的进程数。如果指定了 store,则为必需。

  • rank (int, 可选) – 当前进程的 Rank (它应该是一个介于 0 和 world_size-1 之间的数字)。如果指定了 store,则为必需。

  • store (Store, 可选) – 所有工作进程都可访问的键/值存储,用于交换连接/地址信息。与 init_method 互斥。

  • timeout (timedelta, 可选) – 针对进程组执行的操作的超时时间。NCCL 的默认值为 10 分钟,其他后端的默认值为 30 分钟。这是在此持续时间之后,集合将被异步中止,并且进程将崩溃。这样做是因为 CUDA 执行是异步的,并且继续执行用户代码不再安全,因为失败的异步 NCCL 操作可能会导致后续的 CUDA 操作在损坏的数据上运行。当设置 TORCH_NCCL_BLOCKING_WAIT 时,进程将阻塞并等待此超时。

  • group_name (str, 可选, 已弃用) – 组名。此参数将被忽略

  • pg_options (ProcessGroupOptions, 可选) – 进程组选项,指定在构建特定进程组期间需要传入哪些其他选项。到目前为止,我们唯一支持的选项是 nccl 后端的 ProcessGroupNCCL.Options,可以指定 is_high_priority_stream,以便 nccl 后端可以在有计算内核等待时获取高优先级 cuda 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t

  • device_id (torch.device, 可选) – 一个单一、特定的设备,用于将此进程 “绑定” 到该设备,从而实现特定于后端的优化。目前,这有两个影响,仅在 NCCL 下:通信器会立即形成 (立即调用 ncclCommInit* 而不是正常的惰性调用),并且子组将在可能的情况下使用 ncclCommSplit,以避免不必要的组创建开销。如果您想尽早知道 NCCL 初始化错误,也可以使用此字段。

注意

要启用 backend == Backend.MPI,需要在支持 MPI 的系统上从源代码构建 PyTorch。

注意

对多个后端的支持是实验性的。目前,当未指定后端时,将创建 gloonccl 后端。 gloo 后端将用于 CPU 张量的集合,而 nccl 后端将用于 CUDA 张量的集合。可以通过传入格式为 “<device_type>:<backend_name>,<device_type>:<backend_name>” 的字符串来指定自定义后端,例如 “cpu:gloo,cuda:custom_backend”。

torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None)[source][source]

基于 device_typemesh_shapemesh_dim_names 参数初始化 DeviceMesh

这将创建一个具有 n 维数组布局的 DeviceMesh,其中 nmesh_shape 的长度。如果提供了 mesh_dim_names,则每个维度都标记为 mesh_dim_names[i]

注意

init_device_mesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/ranks 上运行。确保 mesh_shape (描述设备布局的 nD 数组的维度) 在所有 ranks 中都是相同的。不一致的 mesh_shape 可能会导致挂起。

注意

如果未找到进程组,则 init_device_mesh 将在后台初始化分布式通信所需的分布式进程组。

参数
  • device_type (str) – Mesh 的设备类型。当前支持:“cpu”、“cuda/cuda-like”。不允许传入带有 GPU 索引的设备类型,例如 “cuda:0”。

  • mesh_shape (Tuple[int]) – 定义描述设备布局的多维数组维度的元组。

  • mesh_dim_names (Tuple[str], 可选) – 要分配给描述设备布局的多维数组的每个维度的 mesh 维度名称的元组。其长度必须与 mesh_shape 的长度匹配。mesh_dim_names 中的每个字符串都必须是唯一的。

返回

表示设备布局的 DeviceMesh 对象。

返回类型

DeviceMesh

示例:
>>> from torch.distributed.device_mesh import init_device_mesh
>>>
>>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,))
>>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
torch.distributed.is_initialized()[source][source]

检查是否已初始化默认进程组。

返回类型

bool

torch.distributed.is_mpi_available()[source][source]

检查 MPI 后端是否可用。

返回类型

bool

torch.distributed.is_nccl_available()[source][source]

检查 NCCL 后端是否可用。

返回类型

bool

torch.distributed.is_gloo_available()[source][source]

检查 Gloo 后端是否可用。

返回类型

bool

torch.distributed.distributed_c10d.is_xccl_available()[source][source]

检查 XCCL 后端是否可用。

返回类型

bool

torch.distributed.is_torchelastic_launched()[source][source]

检查此进程是否使用 torch.distributed.elastic (又名 torchelastic) 启动。

TORCHELASTIC_RUN_ID 环境变量的存在被用作确定当前进程是否使用 torchelastic 启动的代理。这是一个合理的代理,因为 TORCHELASTIC_RUN_ID 映射到 rendezvous id,后者始终是一个非空值,指示用于对等方发现的作业 id。

返回类型

bool


目前支持三种初始化方法

TCP 初始化

有两种使用 TCP 进行初始化的方法,这两种方法都需要一个所有进程都可访问的网络地址和一个所需的 world_size。第一种方法需要指定属于 rank 0 进程的地址。此初始化方法要求所有进程都手动指定 ranks。

请注意,最新的分布式包不再支持多播地址。group_name 也已被弃用。

import torch.distributed as dist

# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
                        rank=args.rank, world_size=4)

共享文件系统初始化

另一种初始化方法是使用一个文件系统,该文件系统在组中的所有机器之间共享和可见,以及所需的 world_size。URL 应以 file:// 开头,并包含共享文件系统上 (在现有目录中) 不存在文件的路径。文件系统初始化将自动创建该文件 (如果该文件不存在),但不会删除该文件。因此,您有责任确保在下次在同一文件路径/名称上调用 init_process_group() 之前清理该文件。

请注意,最新的分布式包不再支持自动 rank 分配,group_name 也已被弃用。

警告

此方法假定文件系统支持使用 fcntl 进行锁定 - 大多数本地系统和 NFS 都支持它。

警告

此方法将始终创建文件,并尽最大努力在程序结束时清理和删除该文件。换句话说,每次使用文件初始化方法进行初始化时,都需要一个全新的空文件才能成功初始化。如果再次使用先前初始化(碰巧未被清理)使用的同一文件,则这是意外行为,并且通常会导致死锁和故障。因此,即使此方法将尽最大努力清理文件,但如果自动删除碰巧不成功,您也有责任确保在训练结束时删除该文件,以防止下次再次重复使用同一文件。如果您计划在同一文件名上多次调用 init_process_group(),这一点尤其重要。换句话说,如果文件未被删除/清理,并且您在该文件上再次调用 init_process_group(),则可能会出现故障。这里的经验法则是,确保每次调用 init_process_group() 时,文件都不存在或为空。

import torch.distributed as dist

# rank should always be specified
dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile',
                        world_size=4, rank=args.rank)

环境变量初始化

此方法将从环境变量中读取配置,从而允许用户完全自定义信息的获取方式。需要设置的变量是

  • MASTER_PORT - 必需;必须是 rank 0 机器上的空闲端口

  • MASTER_ADDR - 必需(rank 0 除外);rank 0 节点的地址

  • WORLD_SIZE - 必需;可以在此处或在 init 函数调用中设置

  • RANK - 必需;可以在此处或在 init 函数调用中设置

rank 0 的机器将用于设置所有连接。

这是默认方法,意味着不必指定 init_method(或者可以是 env://)。

初始化后

一旦运行了 torch.distributed.init_process_group(),就可以使用以下函数。要检查进程组是否已初始化,请使用 torch.distributed.is_initialized()

class torch.distributed.Backend(name)[source][source]

用于后端的类似枚举的类。

可用后端:GLOO、NCCL、UCC、MPI、XCCL 和其他已注册的后端。

此类的取值是小写字符串,例如,"gloo"。它们可以作为属性访问,例如,Backend.NCCL

此类可以直接调用以解析字符串,例如,Backend(backend_str) 将检查 backend_str 是否有效,如果有效,则返回解析后的小写字符串。它也接受大写字符串,例如,Backend("GLOO") 返回 "gloo"

注意

条目 Backend.UNDEFINED 存在,但仅用作某些字段的初始值。用户既不应直接使用它,也不应假定它的存在。

classmethod register_backend(name, func, extended_api=False, devices=None)[source][source]

使用给定的名称和实例化函数注册新的后端。

此类方法供第三方 ProcessGroup 扩展注册新的后端使用。

参数
  • name (str) – ProcessGroup 扩展的后端名称。它应与 init_process_group() 中的名称匹配。

  • func (function) – 实例化后端的函数处理程序。该函数应在后端扩展中实现,并接受四个参数,包括 storerankworld_sizetimeout

  • extended_api (bool, 可选) – 后端是否支持扩展参数结构。默认值:False。如果设置为 True,后端将获得 c10d::DistributedBackendOptions 的实例,以及后端实现定义的过程组选项对象。

  • device (strlist of str, 可选) – 此后端支持的设备类型,例如 “cpu”、“cuda” 等。如果为 None,则假定同时支持 “cpu” 和 “cuda”

注意

对第三方后端的支持是实验性的,可能会发生变化。

torch.distributed.get_backend(group=None)[source][source]

返回给定进程组的后端。

参数

group (ProcessGroup, 可选) – 要操作的进程组。默认值是常规主进程组。如果指定了另一个特定组,则调用进程必须是 group 的一部分。

返回

给定进程组的后端,以小写字符串形式表示。

返回类型

后端

torch.distributed.get_rank(group=None)[source][source]

返回当前进程在提供的 group 中的 rank,否则返回默认值。

Rank 是分配给分布式进程组中每个进程的唯一标识符。它们始终是从 0 到 world_size 的连续整数。

参数

group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

返回

进程组的 rank,如果不是组的一部分,则为 -1

返回类型

int

torch.distributed.get_world_size(group=None)[source][source]

返回当前进程组中的进程数。

参数

group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

返回

进程组的 world size,如果不是组的一部分,则为 -1

返回类型

int

关闭

重要的是在退出时通过调用 destroy_process_group() 来清理资源。

要遵循的最简单模式是在训练脚本中不再需要通信的点(通常在 main() 的末尾附近),通过对 group 参数使用默认值 None 调用 destroy_process_group() 来销毁每个进程组和后端。每次训练器进程应调用一次,而不是在外部进程启动器级别调用。

如果在超时时间内 pg 中的所有 rank 未调用 destroy_process_group(),尤其是在应用程序中存在多个进程组(例如用于 N-D 并行)时,则可能会在退出时挂起。这是因为 ProcessGroupNCCL 的析构函数会调用 ncclCommAbort,这必须集体调用,但如果由 python 的 GC 调用,则调用 ProcessGroupNCCL 析构函数的顺序是不确定的。调用 destroy_process_group() 通过确保 ncclCommAbort 在 rank 之间以一致的顺序调用来提供帮助,并避免在 ProcessGroupNCCL 的析构函数期间调用 ncclCommAbort。

重新初始化

destroy_process_group 也可用于销毁单个进程组。一种用例可能是容错训练,其中进程组可能会被销毁,然后在运行时初始化一个新的进程组。在这种情况下,至关重要的是,在调用销毁之后和随后初始化之前,使用 torch.distributed 原语以外的某些方法同步训练器进程。由于实现此同步的难度,此行为当前不受支持/未经测试,并且被认为是已知问题。如果此用例阻止了您,请提交 github 问题或 RFC。


默认情况下,集合运算在默认组(也称为 world)上运行,并且需要所有进程进入分布式函数调用。但是,某些工作负载可以从更细粒度的通信中获益。这就是分布式组发挥作用的地方。new_group() 函数可用于创建新组,其中包含所有进程的任意子集。它返回一个不透明的组句柄,该句柄可以作为 group 参数提供给所有集合运算(集合运算是用于以某些众所周知的编程模式交换信息的分布式函数)。

torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[source][source]

创建一个新的分布式组。

此函数要求主组中的所有进程(即作为分布式作业一部分的所有进程)都进入此函数,即使它们不是该组的成员。此外,应在所有进程中以相同的顺序创建组。

警告

安全并发使用:当使用具有 NCCL 后端的多个进程组时,用户必须确保跨 rank 的集合运算具有全局一致的执行顺序。

如果进程内的多个线程发出集合运算,则必须进行显式同步以确保一致的排序。

当使用 torch.distributed 通信 API 的异步变体时,将返回一个 work 对象,并且通信内核在单独的 CUDA 流上排队,从而允许通信和计算重叠。一旦在一个进程组上发出一个或多个异步操作,则必须通过在使用另一个进程组之前调用 work.wait() 将它们与其他 CUDA 流同步。

有关更多详细信息,请参阅 并发使用多个 NCCL 通信器

参数
  • ranks (list[int]) – 组成员的 rank 列表。如果为 None,则将设置为所有 rank。默认值为 None

  • timeout (timedelta, 可选) – 有关详细信息和默认值,请参阅 init_process_group

  • backend (strBackend, 可选) – 要使用的后端。根据构建时配置,有效值为 gloonccl。默认情况下,使用与全局组相同的后端。此字段应以小写字符串形式给出(例如,"gloo"),也可以通过 Backend 属性访问(例如,Backend.GLOO)。如果传入 None,则将使用与默认进程组对应的后端。默认值为 None

  • pg_options (ProcessGroupOptions, 可选) – 进程组选项,用于指定在构造特定进程组期间需要传入的其他选项。例如,对于 nccl 后端,可以指定 is_high_priority_stream,以便进程组可以拾取高优先级 cuda 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t

  • use_local_synchronization (bool, 可选) – 在进程组创建结束时执行组本地屏障。这与非成员 rank 不需要调用 API 并且不加入屏障的不同之处在于。

  • group_desc (str, 可选) – 用于描述进程组的字符串。

  • device_id (torch.device, 可选) – 要将此进程“绑定”到的单个特定设备,如果给定此字段,则 new_group 调用将尝试立即为该设备初始化通信后端。

返回

可以提供给集合运算调用的分布式组的句柄,或者如果 rank 不是 ranks 的一部分,则为 GroupMember.NON_GROUP_MEMBER。

注意:use_local_synchronization 不适用于 MPI。

注意:虽然 use_local_synchronization=True 在较大的集群和小型进程组中可能会显着加快速度,但必须小心,因为它会更改集群行为,因为非成员 rank 不会加入组屏障 ()。

注意:use_local_synchronization=True 可能会导致死锁,当每个 rank 创建多个重叠的进程组时。为避免这种情况,请确保所有 rank 遵循相同的全局创建顺序。

torch.distributed.get_group_rank(group, global_rank)[source][source]

将全局 rank 转换为组 rank。

global_rank 必须是 group 的一部分,否则会引发 RuntimeError。

参数
  • group (ProcessGroup) – 要查找相对 rank 的 ProcessGroup。

  • global_rank (int) – 要查询的全局 rank。

返回

global_rank 相对于 group 的组 rank

返回类型

int

注意:在默认进程组上调用此函数会返回标识

torch.distributed.get_global_rank(group, group_rank)[source][source]

将组 rank 转换为全局 rank。

group_rank 必须是 group 的一部分,否则会引发 RuntimeError。

参数
  • group (ProcessGroup) – 要从中查找全局 rank 的 ProcessGroup。

  • group_rank (int) – 要查询的组 rank。

返回

group_rank 相对于 group 的全局 rank

返回类型

int

注意:在默认进程组上调用此函数会返回标识

torch.distributed.get_process_group_ranks(group)[source][source]

获取与 group 关联的所有 rank。

参数

group (ProcessGroup) – 要从中获取所有 rank 的 ProcessGroup。

返回

按组 rank 排序的全局 rank 列表。

返回类型

List[int]

DeviceMesh

DeviceMesh 是一个更高级别的抽象,用于管理进程组(或 NCCL 通信器)。它允许用户轻松创建节点间和节点内进程组,而无需担心如何为不同的子进程组正确设置 rank,并且它有助于轻松管理这些分布式进程组。init_device_mesh() 函数可用于创建新的 DeviceMesh,并使用描述设备拓扑的网格形状。

class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, _init_backend=True)[source][source]

DeviceMesh 表示设备的网格,其中设备的布局可以表示为 n 维数组,并且 n 维数组的每个值都是默认进程组 rank 的全局 ID。

DeviceMesh 可用于描述集群中设备的布局,并充当集群内设备列表之间通信的代理。

DeviceMesh 可以用作上下文管理器。

注意

DeviceMesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/rank 上运行。因此,用户需要确保 mesh 数组(描述设备布局)在所有 rank 中都相同。不一致的 mesh 会导致静默挂起。

参数
  • device_type (str) – 网格的设备类型。当前支持:“cpu”、“cuda/类 cuda”。

  • mesh (ndarray) – 多维数组或整数张量,用于描述设备的布局,其中 ID 是默认进程组的全局 ID。

返回

表示设备布局的 DeviceMesh 对象。

返回类型

DeviceMesh

以下程序以 SPMD 方式在每个进程/rank 上运行。在此示例中,我们有 2 个主机,每个主机有 4 个 GPU。在网格的第一个维度上进行归约将跨列(0, 4)、…和(3, 7)进行归约,在网格的第二个维度上进行归约将跨行(0, 1, 2, 3)和(4, 5, 6, 7)进行归约。

示例:
>>> from torch.distributed.device_mesh import DeviceMesh
>>>
>>> # Initialize device mesh as (2, 4) to represent the topology
>>> # of cross-host(dim 0), and within-host (dim 1).
>>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
static from_group(group, device_type, mesh=None, *, mesh_dim_names=None)[source][source]

使用来自现有 ProcessGroupdevice_type 构造 DeviceMesh

构造的设备网格的维度数等于传递的组数。如果传递了多个组,则需要 mesh 参数。

返回类型

DeviceMesh

get_all_groups()[source][source]

返回所有网格维度的 ProcessGroup 列表。

返回

ProcessGroup 对象的列表。

返回类型

List[ProcessGroup]

get_coordinate()[source][source]

返回此 rank 相对于网格所有维度的相对索引。如果此 rank 不是网格的一部分,则返回 None。

返回类型

Optional[List[int]]

get_group(mesh_dim=None)[source][source]

返回 mesh_dim 指定的单个 ProcessGroup,或者,如果未指定 mesh_dim 且 DeviceMesh 是一维的,则返回网格中唯一的 ProcessGroup。

参数
  • mesh_dim (str/python:int, 可选) – 它可以是网格维度的名称或索引

  • None. (网格维度的默认值为) –

返回

ProcessGroup 对象。

返回类型

ProcessGroup

get_local_rank(mesh_dim=None)[source][source]

返回 DeviceMesh 给定 mesh_dim 的本地 rank。

参数
  • mesh_dim (str/python:int, 可选) – 它可以是网格维度的名称或索引

  • None. (网格维度的默认值为) –

返回

一个表示本地 rank 的整数。

返回类型

int

以下程序以 SPMD 方式在每个进程/rank 上运行。在此示例中,我们有 2 个主机,每个主机有 4 个 GPU。在 rank 0、1、2、3 上调用 mesh_2d.get_local_rank(mesh_dim=0) 将返回 0。在 rank 4、5、6、7 上调用 mesh_2d.get_local_rank(mesh_dim=0) 将返回 1。在 rank 0、4 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 0。在 rank 1、5 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 1。在 rank 2、6 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 2。在 rank 3、7 上调用 mesh_2d.get_local_rank(mesh_dim=1) 将返回 3。

示例:
>>> from torch.distributed.device_mesh import DeviceMesh
>>>
>>> # Initialize device mesh as (2, 4) to represent the topology
>>> # of cross-host(dim 0), and within-host (dim 1).
>>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
get_rank()[source][source]

返回当前的全局 rank。

返回类型

int

点对点通信

torch.distributed.send(tensor, dst=None, group=None, tag=0, group_dst=None)[source][source]

同步发送张量。

警告

tag 不支持 NCCL 后端。

参数
  • tensor (Tensor) – 要发送的张量。

  • dst (int) – 全局进程组上的目标 rank(与 group 参数无关)。目标 rank 不应与当前进程的 rank 相同。

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • tag (int, 可选) – 用于将发送与远程接收匹配的标签

  • group_dst (int, 可选) – group 上的目标 rank。指定 dstgroup_dst 都是无效的。

torch.distributed.recv(tensor, src=None, group=None, tag=0, group_src=None)[source][source]

同步接收张量。

警告

tag 不支持 NCCL 后端。

参数
  • tensor (Tensor) – 用于填充接收数据的张量。

  • src (int, 可选) – 全局进程组上的源 rank(与 group 参数无关)。如果未指定,将从任何进程接收。

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • tag (int, 可选) – 用于将接收与远程发送匹配的标签

  • group_src (int, 可选) – group 上的源 rank。指定 srcgroup_src 都是无效的。

返回

如果不在组中,则发送者 rank 为 -1

返回类型

int

isend()irecv() 在使用时返回分布式请求对象。一般来说,此对象的类型是未指定的,因为它们永远不应手动创建,但它们保证支持两种方法

  • is_completed() - 如果操作已完成,则返回 True

  • wait() - 将阻塞进程直到操作完成。is_completed() 保证在它返回后返回 True。

torch.distributed.isend(tensor, dst=None, group=None, tag=0, group_dst=None)[source][source]

异步发送张量。

警告

在请求完成之前修改 tensor 会导致未定义的行为。

警告

tag 不支持 NCCL 后端。

与阻塞的 send 不同,isend 允许 src == dst rank,即发送给自己。

参数
  • tensor (Tensor) – 要发送的张量。

  • dst (int) – 全局进程组上的目标 rank(与 group 参数无关)

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • tag (int, 可选) – 用于将发送与远程接收匹配的标签

  • group_dst (int, 可选) – group 上的目标 rank。指定 dstgroup_dst 都是无效的

返回

分布式请求对象。如果不在组中,则为 None

返回类型

Optional[Work]

torch.distributed.irecv(tensor, src=None, group=None, tag=0, group_src=None)[source][source]

异步接收张量。

警告

tag 不支持 NCCL 后端。

与阻塞的 recv 不同,irecv 允许 src == dst rank,即从自己接收。

参数
  • tensor (Tensor) – 用于填充接收数据的张量。

  • src (int, 可选) – 全局进程组上的源 rank(与 group 参数无关)。如果未指定,将从任何进程接收。

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • tag (int, 可选) – 用于将接收与远程发送匹配的标签

  • group_src (int, 可选) – group 上的源 rank。指定 srcgroup_src 都是无效的。

返回

分布式请求对象。如果不在组中,则为 None

返回类型

Optional[Work]

torch.distributed.send_object_list(object_list, dst=None, group=None, device=None, group_dst=None)[source][source]

同步发送 object_list 中的可 pickle 对象。

类似于 send(),但可以传入 Python 对象。请注意,object_list 中的所有对象都必须是可 pickle 的才能发送。

参数
  • object_list (List[Any]) – 要发送的输入对象列表。每个对象都必须是可 pickle 的。接收者必须提供大小相等的列表。

  • dst (int) – 要将 object_list 发送到的目标 rank。目标 rank 基于全局进程组(与 group 参数无关)

  • group (Optional[ProcessGroup]) – (ProcessGroup, 可选): 要在其上工作的进程组。如果为 None,则将使用默认进程组。默认为 None

  • device (torch.device, optional) – 如果不为 None,则对象将被序列化并转换为张量,这些张量在发送之前会被移动到 device。默认为 None

  • group_dst (int, 可选) – group 上的目标 rank。必须指定 dstgroup_dst 中的一个,但不能同时指定两者

返回

None.

注意

对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任确保将其设置为使每个 rank 都有一个单独的 GPU,通过 torch.cuda.set_device()

警告

send_object_list() 隐式使用 pickle 模块,该模块已知是不安全的。有可能构造恶意 pickle 数据,这些数据将在反 pickle 期间执行任意代码。仅对您信任的数据调用此函数。

警告

使用 GPU 张量调用 send_object_list() 不受良好支持且效率低下,因为它会产生 GPU -> CPU 传输,因为张量将被 pickle 化。请考虑改用 send()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>>     dist.send_object_list(objects, dst=1, device=device)
>>> else:
>>>     objects = [None, None, None]
>>>     dist.recv_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed.recv_object_list(object_list, src=None, group=None, device=None, group_src=None)[source][source]

同步接收 object_list 中的可 pickle 对象。

类似于 recv(),但可以接收 Python 对象。

参数
  • object_list (List[Any]) – 要接收到的对象列表。必须提供大小等于要发送的列表大小的列表。

  • src (int, 可选) – 从其接收 object_list 的源 rank。源 rank 基于全局进程组(与 group 参数无关)如果设置为 None,将从任何 rank 接收。默认为 None

  • group (Optional[ProcessGroup]) – (ProcessGroup, 可选): 要在其上工作的进程组。如果为 None,则将使用默认进程组。默认为 None

  • device (torch.device, optional) – 如果不为 None,则在此设备上接收。默认为 None

  • group_src (int, 可选) – group 上的源 rank。指定 srcgroup_src 都是无效的。

返回

发送者 rank。如果 rank 不在组中,则为 -1。如果 rank 在组中,则 object_list 将包含来自 src rank 的已发送对象。

注意

对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任确保将其设置为使每个 rank 都有一个单独的 GPU,通过 torch.cuda.set_device()

警告

recv_object_list() 隐式使用 pickle 模块,该模块已知是不安全的。有可能构造恶意 pickle 数据,这些数据将在反 pickle 期间执行任意代码。仅对您信任的数据调用此函数。

警告

使用 GPU 张量调用 recv_object_list() 不受良好支持且效率低下,因为它会产生 GPU -> CPU 传输,因为张量将被 pickle 化。请考虑改用 recv()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>>     dist.send_object_list(objects, dst=1, device=device)
>>> else:
>>>     objects = [None, None, None]
>>>     dist.recv_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed.batch_isend_irecv(p2p_op_list)[source][source]

异步发送或接收一批张量,并返回请求列表。

处理 p2p_op_list 中的每个操作,并返回相应的请求。目前支持 NCCL、Gloo 和 UCC 后端。

参数

p2p_op_list (List[P2POp]) – 点对点操作列表(每个运算符的类型为 torch.distributed.P2POp)。列表中 isend/irecv 的顺序很重要,它需要与远程端的相应 isend/irecv 匹配。

返回

通过调用 op_list 中的相应 op 返回的分布式请求对象列表。

返回类型

List[Work]

示例

>>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank
>>> recv_tensor = torch.randn(2, dtype=torch.float32)
>>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1)%world_size)
>>> recv_op = dist.P2POp(dist.irecv, recv_tensor, (rank - 1 + world_size)%world_size)
>>> reqs = batch_isend_irecv([send_op, recv_op])
>>> for req in reqs:
>>>     req.wait()
>>> recv_tensor
tensor([2, 3])     # Rank 0
tensor([0, 1])     # Rank 1

注意

请注意,当此 API 与 NCCL PG 后端一起使用时,用户必须使用 torch.cuda.set_device 设置当前的 GPU 设备,否则会导致意外的挂起问题。

此外,如果此 API 是传递给 dist.P2POpgroup 中的第一个集合调用,则 group 的所有 rank 都必须参与此 API 调用;否则,行为未定义。如果此 API 调用不是 group 中的第一个集合调用,则允许仅涉及 group 的 rank 子集的批量 P2P 操作。

class torch.distributed.P2POp(op, tensor, peer=None, group=None, tag=0, group_peer=None)[source][source]

一个为 batch_isend_irecv 构建点对点操作的类。

此类构建 P2P 操作的类型、通信缓冲区、对等 rank、进程组和标签。此类的实例将传递给 batch_isend_irecv 以进行点对点通信。

参数
  • op (Callable) – 用于向对等进程发送数据或从对等进程接收数据的函数。op 的类型为 torch.distributed.isendtorch.distributed.irecv

  • tensor (Tensor) – 要发送或接收的张量。

  • peer (int, 可选) – 目标或源 rank。

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • tag (int, 可选) – 用于将发送与接收匹配的标签。

  • group_peer (int, 可选) – 目标或源 rank。

同步和异步集合操作

每个集合操作函数都支持以下两种操作类型,具体取决于传递给集合的 async_op 标志的设置

同步操作 - 默认模式,当 async_op 设置为 False 时。当函数返回时,保证执行集合操作。对于 CUDA 操作,不保证 CUDA 操作已完成,因为 CUDA 操作是异步的。对于 CPU 集合,任何进一步利用集合调用输出的函数调用都将按预期运行。对于 CUDA 集合,在同一 CUDA 流上利用输出的函数调用将按预期运行。用户必须注意在不同流下运行时进行同步。有关 CUDA 语义(例如流同步)的详细信息,请参阅 CUDA 语义。请参阅下面的脚本,以查看 CPU 和 CUDA 操作的这些语义差异的示例。

异步操作 - 当 async_op 设置为 True 时。集合操作函数返回一个分布式请求对象。一般来说,您不需要手动创建它,并且它保证支持两种方法

  • is_completed() - 对于 CPU 集合,如果完成则返回 True。对于 CUDA 操作,如果操作已成功排队到 CUDA 流中,并且可以在默认流上使用输出而无需进一步同步,则返回 True

  • wait() - 对于 CPU 集合,将阻塞进程直到操作完成。对于 CUDA 集合,将阻塞直到操作已成功排队到 CUDA 流中,并且可以在默认流上使用输出而无需进一步同步。

  • get_future() - 返回 torch._C.Future 对象。NCCL 支持,GLOO 和 MPI 上的大多数操作也支持,点对点操作除外。注意:随着我们继续采用 Futures 并合并 API,get_future() 调用可能会变得冗余。

示例

以下代码可用作 CUDA 操作在使用分布式集合时的语义参考。它显示了在不同 CUDA 流上使用集合输出时显式同步的必要性

# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
    s.wait_stream(torch.cuda.default_stream())
    output.add_(100)
if rank == 0:
    # if the explicit call to wait_stream was omitted, the output below will be
    # non-deterministically 1 or 101, depending on whether the allreduce overwrote
    # the value after the add completed.
    print(output)

集合函数

torch.distributed.broadcast(tensor, src=None, group=None, async_op=False, group_src=None)[source][source]

将张量广播到整个组。

tensor 在参与集合的所有进程中必须具有相同数量的元素。

参数
  • tensor (Tensor) – 如果 src 是当前进程的 rank,则为要发送的数据,否则为用于保存接收数据的张量。

  • src (int) – 全局进程组上的源 rank(与 group 参数无关)。

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • async_op (bool, 可选) – 此操作是否应为异步操作

  • group_src (int) – group 上的源 rank。必须指定 group_srcsrc 中的一个,但不能同时指定两者。

返回

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不在组中,则为 None

torch.distributed.broadcast_object_list(object_list, src=None, group=None, device=None, group_src=None)[source][source]

object_list 中的可 pickle 对象广播到整个组。

类似于 broadcast(),但可以传入 Python 对象。请注意,object_list 中的所有对象都必须是可 pickle 的才能广播。

参数
  • object_list (List[Any]) – 要广播的输入对象列表。每个对象都必须是可 pickle 的。只有 src rank 上的对象将被广播,但每个 rank 都必须提供大小相等的列表。

  • src (int) – 从其广播 object_list 的源 rank。源 rank 基于全局进程组(与 group 参数无关)

  • group (Optional[ProcessGroup]) – (ProcessGroup, 可选): 要在其上工作的进程组。如果为 None,则将使用默认进程组。默认为 None

  • device (torch.device, optional) – 如果不为 None,则对象将被序列化并转换为张量,这些张量在广播之前会被移动到 device。默认为 None

  • group_src (int) – group 上的源 rank。不得指定 group_srcsrc 中的一个,但不能同时指定两者。

返回

None。如果 rank 在组中,则 object_list 将包含来自 src rank 的广播对象。

注意

对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任确保将其设置为使每个 rank 都有一个单独的 GPU,通过 torch.cuda.set_device()

注意

请注意,此 API 与 broadcast() 集合略有不同,因为它不提供 async_op 句柄,因此将是一个阻塞调用。

警告

broadcast_object_list() 隐式使用 pickle 模块,该模块已知是不安全的。有可能构造恶意 pickle 数据,这些数据将在反 pickle 期间执行任意代码。仅对您信任的数据调用此函数。

警告

使用 GPU 张量调用 broadcast_object_list() 不受良好支持且效率低下,因为它会产生 GPU -> CPU 传输,因为张量将被 pickle 化。请考虑改用 broadcast()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 3.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>> else:
>>>     objects = [None, None, None]
>>> # Assumes backend is not NCCL
>>> device = torch.device("cpu")
>>> dist.broadcast_object_list(objects, src=0, device=device)
>>> objects
['foo', 12, {1: 2}]
torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source][source]

跨所有机器归约张量数据,以便所有机器都获得最终结果。

调用 tensor 后,所有进程中的 tensor 将在位级别上完全相同。

支持复数张量。

参数
  • tensor (Tensor) – 集体通信的输入和输出。此函数为原地操作。

  • op (可选) – torch.distributed.ReduceOp 枚举类型中的一个值。指定用于元素级归约的操作。

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • async_op (bool, 可选) – 此操作是否应为异步操作

返回

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不在组中,则为 None

示例

>>> # All tensors below are of torch.int64 type.
>>> # We have 2 process groups, 2 ranks.
>>> device = torch.device(f'cuda:{rank}')
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4, 6], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # All tensors below are of torch.cfloat type.
>>> # We have 2 process groups, 2 ranks.
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_reduce(tensor, op=ReduceOp.SUM)
>>> tensor
tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0
tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
torch.distributed.reduce(tensor, dst=None, op=<RedOpType.SUM: 0>, group=None, async_op=False, group_dst=None)[source][source]

跨所有机器归约张量数据。

只有 rank 为 dst 的进程会接收到最终结果。

参数
  • tensor (Tensor) – 集体通信的输入和输出。此函数为原地操作。

  • dst (int) – 全局进程组上的目标 rank(与 group 参数无关)

  • op (可选) – torch.distributed.ReduceOp 枚举类型中的一个值。指定用于元素级归约的操作。

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • async_op (bool, 可选) – 此操作是否应为异步操作

  • group_dst (int) – group 上的目标 rank。必须指定 group_dstdst 中的一个,但不能同时指定两者。

返回

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不在组中,则为 None

torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source][source]

在列表中收集来自整个组的张量。

支持复数和大小不均的张量。

参数
  • tensor_list (list[Tensor]) – 输出列表。它应该包含正确大小的张量,用于集体通信的输出。支持大小不均的张量。

  • tensor (Tensor) – 从当前进程广播的张量。

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • async_op (bool, 可选) – 此操作是否应为异步操作

返回

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不在组中,则为 None

示例

>>> # All tensors below are of torch.int64 dtype.
>>> # We have 2 process groups, 2 ranks.
>>> device = torch.device(f'cuda:{rank}')
>>> tensor_list = [torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2)]
>>> tensor_list
[tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0
[tensor([0, 0], device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1
>>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0
[tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1
>>> # All tensors below are of torch.cfloat dtype.
>>> # We have 2 process groups, 2 ranks.
>>> tensor_list = [torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2)]
>>> tensor_list
[tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0
[tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1
>>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j)
>>> tensor
tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0
tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1
>>> dist.all_gather(tensor_list, tensor)
>>> tensor_list
[tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0
[tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1
torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[source][source]

从所有 rank 收集张量,并将它们放入单个输出张量中。

此函数要求每个进程上的所有张量大小相同。

参数
  • output_tensor (Tensor) – 输出张量,用于容纳来自所有 rank 的张量元素。它必须具有正确的大小,具有以下形式之一:(i) 沿主维度的所有输入张量的拼接;有关“拼接”的定义,请参见 torch.cat();(ii) 沿主维度的所有输入张量的堆叠;有关“堆叠”的定义,请参见 torch.stack()。以下示例可能更好地解释支持的输出形式。

  • input_tensor (Tensor) – 要从当前 rank 收集的张量。与 all_gather API 不同,此 API 中的输入张量在所有 rank 中必须具有相同的大小。

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • async_op (bool, 可选) – 此操作是否应为异步操作

返回

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不在组中,则为 None

示例

>>> # All tensors below are of torch.int64 dtype and on CUDA devices.
>>> # We have two ranks.
>>> device = torch.device(f'cuda:{rank}')
>>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank
>>> tensor_in
tensor([1, 2], device='cuda:0') # Rank 0
tensor([3, 4], device='cuda:1') # Rank 1
>>> # Output in concatenation form
>>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device)
>>> dist.all_gather_into_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([1, 2, 3, 4], device='cuda:0') # Rank 0
tensor([1, 2, 3, 4], device='cuda:1') # Rank 1
>>> # Output in stack form
>>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device)
>>> dist.all_gather_into_tensor(tensor_out2, tensor_in)
>>> tensor_out2
tensor([[1, 2],
        [3, 4]], device='cuda:0') # Rank 0
tensor([[1, 2],
        [3, 4]], device='cuda:1') # Rank 1

警告

Gloo 后端不支持此 API。

torch.distributed.all_gather_object(object_list, obj, group=None)[source][source]

将来自整个组的可 pickle 对象收集到一个列表中。

类似于 all_gather(),但可以传入 Python 对象。请注意,对象必须是可 pickle 的才能被收集。

参数
  • object_list (list[Any]) – 输出列表。它的大小应与此集体通信组的大小相同,并将包含输出。

  • obj (Any) – 要从当前进程广播的可 pickle 的 Python 对象。

  • group (ProcessGroup, optional) – 要在其上工作的进程组。如果为 None,则将使用默认进程组。默认为 None

返回

无。如果调用 rank 是此组的一部分,则集体通信的输出将填充到输入 object_list 中。如果调用 rank 不是组的一部分,则传入的 object_list 将保持不变。

注意

请注意,此 API 与 all_gather() 集体通信略有不同,因为它不提供 async_op 句柄,因此将是一个阻塞调用。

注意

对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任确保将其设置为每个 rank 都有一个单独的 GPU,通过 torch.cuda.set_device()

警告

all_gather_object() 隐式使用 pickle 模块,已知该模块是不安全的。可以构造恶意 pickle 数据,在反 pickle 化期间执行任意代码。仅对信任的数据调用此函数。

警告

使用 GPU 张量调用 all_gather_object() 的支持不是很好,效率低下,因为它会产生 GPU -> CPU 的传输,因为张量将被 pickle 化。请考虑改用 all_gather()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes world_size of 3.
>>> gather_objects = ["foo", 12, {1: 2}] # any picklable object
>>> output = [None for _ in gather_objects]
>>> dist.all_gather_object(output, gather_objects[dist.get_rank()])
>>> output
['foo', 12, {1: 2}]
torch.distributed.gather(tensor, gather_list=None, dst=None, group=None, async_op=False, group_dst=None)[source][source]

在单个进程中收集张量列表。

此函数要求每个进程上的所有张量大小相同。

参数
  • tensor (Tensor) – 输入张量。

  • gather_list (list[Tensor], optional) – 用于收集数据的适当大小、相同大小的张量列表(默认为 None,必须在目标 rank 上指定)

  • dst (int, optional) – 全局进程组上的目标 rank(与 group 参数无关)。(如果 dstgroup_dst 均为 None,则默认为全局 rank 0)

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • async_op (bool, 可选) – 此操作是否应为异步操作

  • group_dst (int, 可选) – group 上的目标 rank。指定 dstgroup_dst 都是无效的

返回

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不在组中,则为 None

注意

请注意,gather_list 中的所有张量必须具有相同的大小。

示例:
>>> # We have 2 process groups, 2 ranks.
>>> tensor_size = 2
>>> device = torch.device(f'cuda:{rank}')
>>> tensor = torch.ones(tensor_size, device=device) + rank
>>> if dist.get_rank() == 0:
>>>     gather_list = [torch.zeros_like(tensor, device=device) for i in range(2)]
>>> else:
>>>     gather_list = None
>>> dist.gather(tensor, gather_list, dst=0)
>>> # Rank 0 gets gathered data.
>>> gather_list
[tensor([1., 1.], device='cuda:0'), tensor([2., 2.], device='cuda:0')] # Rank 0
None                                                                   # Rank 1
torch.distributed.gather_object(obj, object_gather_list=None, dst=None, group=None, group_dst=None)[source][source]

在单个进程中收集来自整个组的可 pickle 对象。

类似于 gather(),但可以传入 Python 对象。请注意,对象必须是可 pickle 的才能被收集。

参数
  • obj (Any) – 输入对象。必须是可 pickle 的。

  • object_gather_list (list[Any]) – 输出列表。在 dst rank 上,它的大小应与此集体通信组的大小相同,并将包含输出。在非 dst rank 上必须为 None。(默认为 None

  • dst (int, optional) – 全局进程组上的目标 rank(与 group 参数无关)。(如果 dstgroup_dst 均为 None,则默认为全局 rank 0)

  • group (Optional[ProcessGroup]) – (ProcessGroup, 可选): 要在其上工作的进程组。如果为 None,则将使用默认进程组。默认为 None

  • group_dst (int, 可选) – group 上的目标 rank。指定 dstgroup_dst 都是无效的

返回

无。在 dst rank 上,object_gather_list 将包含集体通信的输出。

注意

请注意,此 API 与 gather 集体通信略有不同,因为它不提供 async_op 句柄,因此将是一个阻塞调用。

注意

对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由 torch.cuda.current_device() 给出,用户有责任确保将其设置为每个 rank 都有一个单独的 GPU,通过 torch.cuda.set_device()

警告

gather_object() 隐式使用 pickle 模块,已知该模块是不安全的。可以构造恶意 pickle 数据,在反 pickle 化期间执行任意代码。仅对信任的数据调用此函数。

警告

使用 GPU 张量调用 gather_object() 的支持不是很好,效率低下,因为它会产生 GPU -> CPU 的传输,因为张量将被 pickle 化。请考虑改用 gather()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> # Assumes world_size of 3.
>>> gather_objects = ["foo", 12, {1: 2}] # any picklable object
>>> output = [None for _ in gather_objects]
>>> dist.gather_object(
...     gather_objects[dist.get_rank()],
...     output if dist.get_rank() == 0 else None,
...     dst=0
... )
>>> # On rank 0
>>> output
['foo', 12, {1: 2}]
torch.distributed.scatter(tensor, scatter_list=None, src=None, group=None, async_op=False, group_src=None)[source][source]

将张量列表分发到组中的所有进程。

每个进程将接收到一个张量,并将其数据存储在 tensor 参数中。

支持复数张量。

参数
  • tensor (Tensor) – 输出张量。

  • scatter_list (list[Tensor]) – 要分发的张量列表(默认为 None,必须在源 rank 上指定)

  • src (int) – 全局进程组上的源 rank(与 group 参数无关)。(如果 srcgroup_src 均为 None,则默认为全局 rank 0)

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • async_op (bool, 可选) – 此操作是否应为异步操作

  • group_src (int, optional) – group 上的源 rank。不能同时指定 srcgroup_src

返回

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不在组中,则为 None

注意

请注意,scatter_list 中的所有张量必须具有相同的大小。

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> tensor_size = 2
>>> device = torch.device(f'cuda:{rank}')
>>> output_tensor = torch.zeros(tensor_size, device=device)
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 2.
>>>     # Only tensors, all of which must be the same size.
>>>     t_ones = torch.ones(tensor_size, device=device)
>>>     t_fives = torch.ones(tensor_size, device=device) * 5
>>>     scatter_list = [t_ones, t_fives]
>>> else:
>>>     scatter_list = None
>>> dist.scatter(output_tensor, scatter_list, src=0)
>>> # Rank i gets scatter_list[i].
>>> output_tensor
tensor([1., 1.], device='cuda:0') # Rank 0
tensor([5., 5.], device='cuda:1') # Rank 1
torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list=None, src=None, group=None, group_src=None)[source][source]

scatter_object_input_list 中的可 pickle 对象分发到整个组。

类似于 scatter(),但可以传入 Python 对象。在每个 rank 上,分发的对象将存储为 scatter_object_output_list 的第一个元素。请注意,scatter_object_input_list 中的所有对象都必须是可 pickle 的才能被分发。

参数
  • scatter_object_output_list (List[Any]) – 非空列表,其第一个元素将存储分发到此 rank 的对象。

  • scatter_object_input_list (List[Any], optional) – 要分发的输入对象列表。每个对象必须是可 pickle 的。只有 src rank 上的对象将被分发,非 src rank 的参数可以为 None

  • src (int) – 从其分发 scatter_object_input_list 的源 rank。源 rank 基于全局进程组(与 group 参数无关)。(如果 srcgroup_src 均为 None,则默认为全局 rank 0)

  • group (Optional[ProcessGroup]) – (ProcessGroup, 可选): 要在其上工作的进程组。如果为 None,则将使用默认进程组。默认为 None

  • group_src (int, optional) – group 上的源 rank。不能同时指定 srcgroup_src

返回

None。如果 rank 是组的一部分,则 scatter_object_output_list 的第一个元素将设置为分发到此 rank 的对象。

注意

请注意,此 API 与 scatter 集体通信略有不同,因为它不提供 async_op 句柄,因此将是一个阻塞调用。

警告

scatter_object_list() 隐式使用 pickle 模块,已知该模块是不安全的。可以构造恶意 pickle 数据,在反 pickle 化期间执行任意代码。仅对信任的数据调用此函数。

警告

使用 GPU 张量调用 scatter_object_list() 的支持不是很好,效率低下,因为它会产生 GPU -> CPU 的传输,因为张量将被 pickle 化。请考虑改用 scatter()

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() == 0:
>>>     # Assumes world_size of 3.
>>>     objects = ["foo", 12, {1: 2}] # any picklable object
>>> else:
>>>     # Can be any list on non-src ranks, elements are not used.
>>>     objects = [None, None, None]
>>> output_list = [None]
>>> dist.scatter_object_list(output_list, objects, src=0)
>>> # Rank i gets objects[i]. For example, on rank 2:
>>> output_list
[{1: 2}]
torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source][source]

先归约,然后将张量列表分发到组中的所有进程。

参数
  • output (Tensor) – 输出张量。

  • input_list (list[Tensor]) – 要归约和分发的张量列表。

  • op (可选) – torch.distributed.ReduceOp 枚举类型中的一个值。指定用于元素级归约的操作。

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作。

返回

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不是组的一部分,则为 None。

torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source][source]

先归约,然后将张量分发到组中的所有 rank。

参数
  • output (Tensor) – 输出张量。它在所有 rank 中应具有相同的大小。

  • input (Tensor) – 要归约和分发的输入张量。其大小应为输出张量大小乘以世界大小。输入张量可以具有以下形状之一:(i) 沿主维度的输出张量的拼接,或 (ii) 沿主维度的输出张量的堆叠。有关“拼接”的定义,请参见 torch.cat()。有关“堆叠”的定义,请参见 torch.stack()

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作。

返回

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不是组的一部分,则为 None。

示例

>>> # All tensors below are of torch.int64 dtype and on CUDA devices.
>>> # We have two ranks.
>>> device = torch.device(f'cuda:{rank}')
>>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device)
>>> # Input in concatenation form
>>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device)
>>> tensor_in
tensor([0, 1, 2, 3], device='cuda:0') # Rank 0
tensor([0, 1, 2, 3], device='cuda:1') # Rank 1
>>> dist.reduce_scatter_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([0, 2], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1
>>> # Input in stack form
>>> tensor_in = torch.reshape(tensor_in, (world_size, 2))
>>> tensor_in
tensor([[0, 1],
        [2, 3]], device='cuda:0') # Rank 0
tensor([[0, 1],
        [2, 3]], device='cuda:1') # Rank 1
>>> dist.reduce_scatter_tensor(tensor_out, tensor_in)
>>> tensor_out
tensor([0, 2], device='cuda:0') # Rank 0
tensor([4, 6], device='cuda:1') # Rank 1

警告

Gloo 后端不支持此 API。

torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[source][source]

分割输入张量,然后将分割列表分发到组中的所有进程。

稍后,从组中所有进程接收到的张量将被拼接,并作为单个输出张量返回。

支持复数张量。

参数
  • output (Tensor) – 收集的拼接输出张量。

  • input (Tensor) – 要分发的输入张量。

  • output_split_sizes – (list[Int], optional): 如果指定为 None 或空,则为维度 0 的输出分割大小,output 张量的维度 0 必须能被 world_size 整除。

  • input_split_sizes – (list[Int], optional): 如果指定为 None 或空,则为维度 0 的输入分割大小,input 张量的维度 0 必须能被 world_size 整除。

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作。

返回

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不是组的一部分,则为 None。

警告

all_to_all_single 是实验性的,可能会发生变化。

示例

>>> input = torch.arange(4) + rank * 4
>>> input
tensor([0, 1, 2, 3])     # Rank 0
tensor([4, 5, 6, 7])     # Rank 1
tensor([8, 9, 10, 11])   # Rank 2
tensor([12, 13, 14, 15]) # Rank 3
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([0, 4, 8, 12])    # Rank 0
tensor([1, 5, 9, 13])    # Rank 1
tensor([2, 6, 10, 14])   # Rank 2
tensor([3, 7, 11, 15])   # Rank 3
>>> # Essentially, it is similar to following operation:
>>> scatter_list = list(input.chunk(world_size))
>>> gather_list  = list(output.chunk(world_size))
>>> for i in range(world_size):
>>>     dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> # Another example with uneven split
>>> input
tensor([0, 1, 2, 3, 4, 5])                                       # Rank 0
tensor([10, 11, 12, 13, 14, 15, 16, 17, 18])                     # Rank 1
tensor([20, 21, 22, 23, 24])                                     # Rank 2
tensor([30, 31, 32, 33, 34, 35, 36])                             # Rank 3
>>> input_splits
[2, 2, 1, 1]                                                     # Rank 0
[3, 2, 2, 2]                                                     # Rank 1
[2, 1, 1, 1]                                                     # Rank 2
[2, 2, 2, 1]                                                     # Rank 3
>>> output_splits
[2, 3, 2, 2]                                                     # Rank 0
[2, 2, 1, 2]                                                     # Rank 1
[1, 2, 1, 2]                                                     # Rank 2
[1, 2, 1, 1]                                                     # Rank 3
>>> output = ...
>>> dist.all_to_all_single(output, input, output_splits, input_splits)
>>> output
tensor([ 0,  1, 10, 11, 12, 20, 21, 30, 31])                     # Rank 0
tensor([ 2,  3, 13, 14, 22, 32, 33])                             # Rank 1
tensor([ 4, 15, 16, 23, 34, 35])                                 # Rank 2
tensor([ 5, 17, 18, 24, 36])                                     # Rank 3
>>> # Another example with tensors of torch.cfloat type.
>>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j)
>>> input
tensor([1+1j, 2+2j, 3+3j, 4+4j])                                # Rank 0
tensor([5+5j, 6+6j, 7+7j, 8+8j])                                # Rank 1
tensor([9+9j, 10+10j, 11+11j, 12+12j])                          # Rank 2
tensor([13+13j, 14+14j, 15+15j, 16+16j])                        # Rank 3
>>> output = torch.empty([4], dtype=torch.int64)
>>> dist.all_to_all_single(output, input)
>>> output
tensor([1+1j, 5+5j, 9+9j, 13+13j])                              # Rank 0
tensor([2+2j, 6+6j, 10+10j, 14+14j])                            # Rank 1
tensor([3+3j, 7+7j, 11+11j, 15+15j])                            # Rank 2
tensor([4+4j, 8+8j, 12+12j, 16+16j])                            # Rank 3
torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[source][source]

将输入张量列表分发到组中的所有进程,并在输出列表中返回收集的张量列表。

支持复数张量。

参数
  • output_tensor_list (list[Tensor]) – 要收集的张量列表,每个 rank 一个。

  • input_tensor_list (list[Tensor]) – 要分发的张量列表,每个 rank 一个。

  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • async_op (bool, optional) – 此操作是否应为异步操作。

返回

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不是组的一部分,则为 None。

警告

all_to_all 是实验性的,可能会发生变化。

示例

>>> input = torch.arange(4) + rank * 4
>>> input = list(input.chunk(4))
>>> input
[tensor([0]), tensor([1]), tensor([2]), tensor([3])]     # Rank 0
[tensor([4]), tensor([5]), tensor([6]), tensor([7])]     # Rank 1
[tensor([8]), tensor([9]), tensor([10]), tensor([11])]   # Rank 2
[tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3
>>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
>>> dist.all_to_all(output, input)
>>> output
[tensor([0]), tensor([4]), tensor([8]), tensor([12])]    # Rank 0
[tensor([1]), tensor([5]), tensor([9]), tensor([13])]    # Rank 1
[tensor([2]), tensor([6]), tensor([10]), tensor([14])]   # Rank 2
[tensor([3]), tensor([7]), tensor([11]), tensor([15])]   # Rank 3
>>> # Essentially, it is similar to following operation:
>>> scatter_list = input
>>> gather_list  = output
>>> for i in range(world_size):
>>>     dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i)
>>> input
tensor([0, 1, 2, 3, 4, 5])                                       # Rank 0
tensor([10, 11, 12, 13, 14, 15, 16, 17, 18])                     # Rank 1
tensor([20, 21, 22, 23, 24])                                     # Rank 2
tensor([30, 31, 32, 33, 34, 35, 36])                             # Rank 3
>>> input_splits
[2, 2, 1, 1]                                                     # Rank 0
[3, 2, 2, 2]                                                     # Rank 1
[2, 1, 1, 1]                                                     # Rank 2
[2, 2, 2, 1]                                                     # Rank 3
>>> output_splits
[2, 3, 2, 2]                                                     # Rank 0
[2, 2, 1, 2]                                                     # Rank 1
[1, 2, 1, 2]                                                     # Rank 2
[1, 2, 1, 1]                                                     # Rank 3
>>> input = list(input.split(input_splits))
>>> input
[tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])]                   # Rank 0
[tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1
[tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])]                 # Rank 2
[tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])]         # Rank 3
>>> output = ...
>>> dist.all_to_all(output, input)
>>> output
[tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])]   # Rank 0
[tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])]           # Rank 1
[tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])]              # Rank 2
[tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])]                  # Rank 3
>>> # Another example with tensors of torch.cfloat type.
>>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j)
>>> input = list(input.chunk(4))
>>> input
[tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])]            # Rank 0
[tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])]            # Rank 1
[tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])]      # Rank 2
[tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])]    # Rank 3
>>> output = list(torch.empty([4], dtype=torch.int64).chunk(4))
>>> dist.all_to_all(output, input)
>>> output
[tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])]          # Rank 0
[tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])]        # Rank 1
[tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])]        # Rank 2
[tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])]        # Rank 3
torch.distributed.barrier(group=None, async_op=False, device_ids=None)[source][source]

同步所有进程。

此集体通信会阻塞进程,直到整个组进入此函数,如果 async_op 为 False,或者如果异步工作句柄在 wait() 上被调用。

参数
  • group (ProcessGroup, 可选) – 要操作的进程组。如果为 None,则将使用默认进程组。

  • async_op (bool, 可选) – 此操作是否应为异步操作

  • device_ids ([int], optional) – 设备/GPU ID 列表。

返回

如果 async_op 设置为 True,则为异步工作句柄。如果不是 async_op 或不在组中,则为 None

注意

ProcessGroupNCCL 现在会阻塞 CPU 线程,直到屏障集体通信完成。

torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[source][source]

同步进程,类似于 torch.distributed.barrier,但考虑了可配置的超时。

它可以报告在提供的超时时间内未通过此屏障的 rank。具体来说,对于非零 rank,将阻塞直到处理来自 rank 0 的发送/接收。Rank 0 将阻塞,直到处理来自其他 rank 的所有发送/接收,并将报告未及时响应的 rank 的失败。请注意,如果一个 rank 未到达 monitored_barrier(例如,由于挂起),则所有其他 rank 将在 monitored_barrier 中失败。

此集体通信将阻塞组中的所有进程/rank,直到整个组成功退出该函数,使其可用于调试和同步。但是,它可能会对性能产生影响,应仅用于调试或需要在主机端完全同步点的场景。出于调试目的,可以在应用程序的集体通信调用之前插入此屏障,以检查是否有任何 rank 不同步。

注意

请注意,此集体通信仅在 GLOO 后端中受支持。

参数
  • group (ProcessGroup, optional) – 要在其上工作的进程组。如果 None,则将使用默认进程组。

  • timeout (datetime.timedelta, optional) – monitored_barrier 的超时。如果 None,则将使用默认进程组超时。

  • wait_all_ranks (bool, optional) – 是否收集所有失败的 rank。默认情况下,这是 False,rank 0 上的 monitored_barrier 将在遇到的第一个失败 rank 时抛出异常,以便快速失败。通过设置 wait_all_ranks=Truemonitored_barrier 将收集所有失败的 rank,并抛出一个错误,其中包含有关所有失败 rank 的信息。

返回

None.

示例:
>>> # Note: Process group initialization omitted on each rank.
>>> import torch.distributed as dist
>>> if dist.get_rank() != 1:
>>>     dist.monitored_barrier() # Raises exception indicating that
>>> # rank 1 did not call into monitored_barrier.
>>> # Example with wait_all_ranks=True
>>> if dist.get_rank() == 0:
>>>     dist.monitored_barrier(wait_all_ranks=True) # Raises exception
>>> # indicating that ranks 1, 2, ... world_size - 1 did not call into
>>> # monitored_barrier.
class torch.distributed.Work

Work 对象表示 PyTorch 分布式包中待处理的异步操作的句柄。它由非阻塞集体通信操作返回,例如 dist.all_reduce(tensor, async_op=True)

boxed(self: torch._C._distributed_c10d.Work) object
exception(self: torch._C._distributed_c10d.Work) std::__exception_ptr::exception_ptr
get_future(self: torch._C._distributed_c10d.Work) torch.Future
返回

Work 完成相关联的 torch.futures.Future 对象。 例如,可以通过 fut = process_group.allreduce(tensors).get_future() 检索 future 对象。

示例:

以下是一个简单的 allreduce DDP 通信钩子的示例,该示例使用 get_future` API 来检索与 ``allreduce 完成相关联的 Future

>>> def allreduce(process_group: dist.ProcessGroup, bucket: dist.GradBucket): -> torch.futures.Future
>>>     group_to_use = process_group if process_group is not None else torch.distributed.group.WORLD
>>>     tensor = bucket.buffer().div_(group_to_use.size())
>>>     return torch.distributed.all_reduce(tensor, group=group_to_use, async_op=True).get_future()
>>> ddp_model.register_comm_hook(state=None, hook=allreduce)

警告

get_future API 支持 NCCL,以及部分 GLOO 和 MPI 后端(不支持点对点操作,如 send/recv),并将返回 torch.futures.Future

在上面的示例中,allreduce 工作将在 GPU 上使用 NCCL 后端完成,fut.wait() 将在同步适当的 NCCL 流与 PyTorch 的当前设备流后返回,以确保我们可以进行异步 CUDA 执行,并且它不会等待整个操作在 GPU 上完成。 请注意,CUDAFuture 不支持 TORCH_NCCL_BLOCKING_WAIT 标志或 NCCL 的 barrier()。 此外,如果通过 fut.then() 添加了回调函数,它将等待 WorkNCCL 的 NCCL 流与 ProcessGroupNCCL 的专用回调流同步,并在回调流上运行回调后内联调用回调。 fut.then() 将返回另一个 CUDAFuture,其中包含回调的返回值和一个记录回调流的 CUDAEvent

  1. 对于 CPU 工作,当工作完成且 value() 张量准备就绪时,fut.done() 返回 true。

  2. 对于 GPU 工作,fut.done() 仅在操作已入队时返回 true。

  3. 对于混合 CPU-GPU 工作(例如,使用 GLOO 发送 GPU 张量),当张量已到达各自的节点时,fut.done() 返回 true,但不必在各自的 GPU 上同步(类似于 GPU 工作)。

get_future_result(self: torch._C._distributed_c10d.Work) torch.Future
返回

int 类型的 torch.futures.Future 对象,它映射到 WorkResult 的枚举类型。 例如,可以通过 fut = process_group.allreduce(tensor).get_future_result() 检索 future 对象。

示例:

用户可以使用 fut.wait() 阻塞等待工作完成,并通过 fut.value() 获取 WorkResult。 此外,用户可以使用 fut.then(call_back_func) 注册一个回调函数,以便在工作完成时调用,而不会阻塞当前线程。

警告

get_future_result API 支持 NCCL

is_completed(self: torch._C._distributed_c10d.Work) bool
is_success(self: torch._C._distributed_c10d.Work) bool
result(self: torch._C._distributed_c10d.Work) list[torch.Tensor]
source_rank(self: torch._C._distributed_c10d.Work) int
synchronize(self: torch._C._distributed_c10d.Work) None
static unbox(arg0: object) torch._C._distributed_c10d.Work
wait(self: torch._C._distributed_c10d.Work, timeout: datetime.timedelta = datetime.timedelta(0)) bool
返回

true/false.

示例:
try

work.wait(timeout)

except

# some handling

警告

在正常情况下,用户无需设置超时。 调用 wait() 与调用 synchronize() 相同:让当前流阻塞等待 NCCL 工作完成。 但是,如果设置了超时,它将阻塞 CPU 线程,直到 NCCL 工作完成或超时。 如果超时,将抛出异常。

class torch.distributed.ReduceOp

用于可用规约操作的类似枚举的类:SUMPRODUCTMINMAXBANDBORBXORPREMUL_SUM

使用 NCCL 后端时,BANDBORBXOR 规约不可用。

AVG 在跨 ranks 求和之前将值除以 world size。AVG 仅适用于 NCCL 后端,且仅适用于 NCCL 2.10 或更高版本。

PREMUL_SUM 在本地将输入乘以给定的标量,然后再进行规约。PREMUL_SUM 仅适用于 NCCL 后端,且仅适用于 NCCL 2.11 或更高版本。 用户应使用 torch.distributed._make_nccl_premul_sum

此外,复杂张量不支持 MAXMINPRODUCT

可以通过属性访问此类的value,例如,ReduceOp.SUM。 它们用于指定规约集合的策略,例如 reduce()

此类不支持 __members__ 属性。

class torch.distributed.reduce_op

已弃用的用于规约操作的类似枚举的类:SUMPRODUCTMINMAX

建议改用 ReduceOp

分布式键值存储

分布式包附带一个分布式键值存储,可用于在组中的进程之间共享信息,以及在 torch.distributed.init_process_group() 中初始化分布式包(通过显式创建存储作为指定 init_method 的替代方法。) 键值存储有 3 种选择:TCPStoreFileStoreHashStore

class torch.distributed.Store

所有存储实现的基类,例如 PyTorch 分布式提供的 3 种存储实现:(TCPStoreFileStoreHashStore)。

__init__(self: torch._C._distributed_c10d.Store) None
add(self: torch._C._distributed_c10d.Store, arg0: str, arg1: int) int

首次调用给定 key 的 add 会在存储中创建一个与 key 关联的计数器,初始化为 amount。 后续使用相同 key 调用 add 会将计数器递增指定的 amount。 使用已通过 set() 在存储中设置的键调用 add() 将导致异常。

参数
  • key (str) – 存储中要递增其计数器的键。

  • amount (int) – 计数器将递增的数量。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.add("first_key", 1)
>>> store.add("first_key", 6)
>>> # Should return 7
>>> store.get("first_key")
append(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None

根据提供的 keyvalue 将键值对追加到存储中。 如果存储中不存在 key,则将创建它。

参数
  • key (str) – 要追加到存储的键。

  • value (str) – 与要添加到存储的 key 关联的值。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.append("first_key", "po")
>>> store.append("first_key", "tato")
>>> # Should return "potato"
>>> store.get("first_key")
check(self: torch._C._distributed_c10d.Store, arg0: list[str]) bool

调用以检查给定的 keys 列表是否在存储中存储了值。 在正常情况下,此调用会立即返回,但仍然会遇到一些边缘死锁情况,例如,在 TCPStore 被销毁后调用 check。 使用要检查是否存储在存储中的键列表调用 check()

参数

keys (lisr[str]) – 查询是否存储在存储中的键。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.add("first_key", 1)
>>> # Should return 7
>>> store.check(["first_key"])
compare_set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str, arg2: str) bytes

根据提供的 key 将键值对插入到存储中,并在插入之前执行 expected_valuedesired_value 之间的比较。 只有当 keyexpected_value 已经存在于存储中,或者 expected_value 是空字符串时,才会设置 desired_value

参数
  • key (str) – 要在存储中检查的键。

  • expected_value (str) – 与在插入前要检查的 key 关联的值。

  • desired_value (str) – 与要添加到存储的 key 关联的值。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("key", "first_value")
>>> store.compare_set("key", "first_value", "second_value")
>>> # Should return "second_value"
>>> store.get("key")
delete_key(self: torch._C._distributed_c10d.Store, arg0: str) bool

从存储中删除与 key 关联的键值对。 如果成功删除键,则返回 true,否则返回 false

警告

delete_key API 仅受 TCPStoreHashStore 支持。 将此 API 与 FileStore 一起使用将导致异常。

参数

key (str) – 要从存储中删除的键

返回

如果 key 已删除,则为 True,否则为 False

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, HashStore can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key")
>>> # This should return true
>>> store.delete_key("first_key")
>>> # This should return false
>>> store.delete_key("bad_key")
get(self: torch._C._distributed_c10d.Store, arg0: str) bytes

检索存储中与给定 key 关联的值。 如果存储中不存在 key,则该函数将等待 timeout(在初始化存储时定义),然后抛出异常。

参数

key (str) – 该函数将返回与此键关联的值。

返回

如果 key 在存储中,则为与 key 关联的值。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # Should return "first_value"
>>> store.get("first_key")
has_extended_api(self: torch._C._distributed_c10d.Store) bool

如果存储支持扩展操作,则返回 true。

multi_get(self: torch._C._distributed_c10d.Store, arg0: list[str]) list[bytes]

检索 keys 中的所有值。 如果 keys 中的任何键在存储中不存在,则该函数将等待 timeout

参数

keys (List[str]) – 要从存储中检索的键。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "po")
>>> store.set("second_key", "tato")
>>> # Should return [b"po", b"tato"]
>>> store.multi_get(["first_key", "second_key"])
multi_set(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: list[str]) None

根据提供的 keysvalues 将键值对列表插入到存储中

参数
  • keys (List[str]) – 要插入的键。

  • values (List[str]) – 要插入的值。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.multi_set(["first_key", "second_key"], ["po", "tato"])
>>> # Should return b"po"
>>> store.get("first_key")
num_keys(self: torch._C._distributed_c10d.Store) int

返回存储中设置的键的数量。 请注意,此数字通常比 set()add() 添加的键的数量大 1,因为一个键用于协调使用存储的所有 worker。

警告

TCPStore 一起使用时,num_keys 返回写入底层文件的键的数量。 如果存储被销毁,并且使用同一文件创建了另一个存储,则原始键将被保留。

返回

存储中存在的键的数量。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # This should return 2
>>> store.num_keys()
set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None

根据提供的 keyvalue 将键值对插入到存储中。 如果 key 已经存在于存储中,它将使用新提供的 value 覆盖旧值。

参数
  • key (str) – 要添加到存储的键。

  • value (str) – 与要添加到存储的 key 关联的值。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set("first_key", "first_value")
>>> # Should return "first_value"
>>> store.get("first_key")
set_timeout(self: torch._C._distributed_c10d.Store, arg0: datetime.timedelta) None

设置存储的默认超时。 此超时在初始化期间以及 wait()get() 中使用。

参数

timeout (timedelta) – 要在存储中设置的超时。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> store.set_timeout(timedelta(seconds=10))
>>> # This will throw an exception after 10 seconds
>>> store.wait(["bad_key"])
property timeout

获取存储的超时。

wait(*args, **kwargs)

重载函数。

  1. wait(self: torch._C._distributed_c10d.Store, arg0: list[str]) -> None

等待将 keys 中的每个键添加到存储中。如果并非所有键都在 timeout(在存储初始化期间设置)之前设置,则 wait 将抛出异常。

参数

keys (list) – 等待在存储中设置的键列表。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> # This will throw an exception after 30 seconds
>>> store.wait(["bad_key"])
  1. wait(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: datetime.timedelta) -> None

等待将 keys 中的每个键添加到存储中,如果键在提供的 timeout 之前未设置,则抛出异常。

参数
  • keys (list) – 等待在存储中设置的键列表。

  • timeout (timedelta) – 等待键被添加的时间,如果在超时时间内未添加则抛出异常。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Using TCPStore as an example, other store types can also be used
>>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30))
>>> # This will throw an exception after 10 seconds
>>> store.wait(["bad_key"], timedelta(seconds=10))
class torch.distributed.TCPStore

一个基于 TCP 的分布式键值存储实现。服务器存储保存数据,而客户端存储可以通过 TCP 连接到服务器存储并执行诸如 set() (插入键值对), get() (检索键值对) 等操作。应该始终初始化一个服务器存储,因为客户端存储将等待服务器建立连接。

参数
  • host_name (str) – 服务器存储应在其上运行的主机名或 IP 地址。

  • port (int) – 服务器存储应在其上监听传入请求的端口。

  • world_size (int, optional) – 存储用户的总数(客户端数量 + 服务器的 1)。默认为 None(None 表示存储用户的数量不固定)。

  • is_master (bool, optional) – 初始化服务器存储时为 True,客户端存储时为 False。默认为 False。

  • timeout (timedelta, optional) – 存储在初始化期间以及用于诸如 get()wait() 等方法的超时时间。默认为 timedelta(seconds=300)

  • wait_for_workers (bool, optional) – 是否等待所有 worker 连接到服务器存储。仅当 world_size 是固定值时才适用。默认为 True。

  • multi_tenant (bool, optional) – 如果为 True,则当前进程中具有相同主机/端口的所有 TCPStore 实例将使用相同的底层 TCPServer。默认为 False。

  • master_listen_fd (int, optional) – 如果指定,则底层 TCPServer 将监听此文件描述符,该描述符必须是已绑定到 port 的套接字。在某些情况下,这对于避免端口分配竞争很有用。默认为 None(意味着服务器创建一个新的套接字并尝试将其绑定到 port)。

  • use_libuv (bool, optional) – 如果为 True,则对 TCPServer 后端使用 libuv。默认为 True。

示例:
>>> import torch.distributed as dist
>>> from datetime import timedelta
>>> # Run on process 1 (server)
>>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30))
>>> # Run on process 2 (client)
>>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False)
>>> # Use any of the store methods from either the client or server after initialization
>>> server_store.set("first_key", "first_value")
>>> client_store.get("first_key")
__init__(self: torch._C._distributed_c10d.TCPStore, host_name: str, port: int, world_size: Optional[int] = None, is_master: bool = False, timeout: datetime.timedelta = datetime.timedelta(seconds=300), wait_for_workers: bool = True, multi_tenant: bool = False, master_listen_fd: Optional[int] = None, use_libuv: bool = True) None

创建一个新的 TCPStore。

property host

获取存储监听请求的主机名。

property libuvBackend

如果正在使用 libuv 后端,则返回 True。

property port

获取存储监听请求的端口号。

class torch.distributed.HashStore

一个基于底层哈希映射的线程安全存储实现。此存储可以在同一进程内使用(例如,由其他线程),但不能跨进程使用。

示例:
>>> import torch.distributed as dist
>>> store = dist.HashStore()
>>> # store can be used from other threads
>>> # Use any of the store methods after initialization
>>> store.set("first_key", "first_value")
__init__(self: torch._C._distributed_c10d.HashStore) None

创建一个新的 HashStore。

class torch.distributed.FileStore

一个存储实现,它使用文件来存储底层的键值对。

参数
  • file_name (str) – 用于存储键值对的文件的路径

  • world_size (int, optional) – 使用存储的进程总数。默认为 -1(负值表示存储用户的数量不固定)。

示例:
>>> import torch.distributed as dist
>>> store1 = dist.FileStore("/tmp/filestore", 2)
>>> store2 = dist.FileStore("/tmp/filestore", 2)
>>> # Use any of the store methods from either the client or server after initialization
>>> store1.set("first_key", "first_value")
>>> store2.get("first_key")
__init__(self: torch._C._distributed_c10d.FileStore, file_name: str, world_size: int = -1) None

创建一个新的 FileStore。

property path

获取 FileStore 用于存储键值对的文件的路径。

class torch.distributed.PrefixStore

任何 3 种键值存储(TCPStoreFileStoreHashStore)的包装器,它为插入到存储中的每个键添加前缀。

参数
  • prefix (str) – 前缀字符串,在插入到存储之前,它会被添加到每个键的前面。

  • store (torch.distributed.store) – 构成底层键值存储的存储对象。

__init__(self: torch._C._distributed_c10d.PrefixStore, prefix: str, store: torch._C._distributed_c10d.Store) None

创建一个新的 PrefixStore。

property underlying_store

获取 PrefixStore 包装的底层存储对象。

集体通信性能分析

请注意,您可以使用 torch.profiler (推荐,仅在 1.8.1 之后可用)或 torch.autograd.profiler 来分析此处提到的集体通信和点对点通信 API 的性能。所有开箱即用的后端(glooncclmpi)都受支持,集体通信使用情况将按预期在性能分析输出/跟踪中呈现。分析您的代码与分析任何常规的 torch 算子相同

import torch
import torch.distributed as dist
with torch.profiler():
    tensor = torch.randn(20, 10)
    dist.all_reduce(tensor)

有关性能分析器功能的完整概述,请参阅 性能分析器文档

多 GPU 集体函数

警告

多 GPU 函数(代表每个 CPU 线程多个 GPU)已被弃用。截至今日,PyTorch Distributed 的首选编程模型是每个线程一个设备,本文档中的 API 就是例证。如果您是后端开发人员,并且想要支持每个线程多个设备,请联系 PyTorch Distributed 的维护人员。

第三方后端

除了内置的 GLOO/MPI/NCCL 后端之外,PyTorch distributed 还通过运行时注册机制支持第三方后端。有关如何通过 C++ 扩展开发第三方后端的参考,请参阅 教程 - 自定义 C++ 和 CUDA 扩展 以及 test/cpp_extensions/cpp_c10d_extension.cpp。第三方后端的性能由其自身的实现决定。

新的后端派生自 c10d::ProcessGroup,并在导入时通过 torch.distributed.Backend.register_backend() 注册后端名称和实例化接口。

当手动导入此后端并使用相应的后端名称调用 torch.distributed.init_process_group() 时,torch.distributed 包将在新的后端上运行。

警告

对第三方后端的支持是实验性的,可能会发生变化。

启动实用程序

torch.distributed 包还在 torch.distributed.launch 中提供了一个启动实用程序。此辅助实用程序可用于为分布式训练启动每个节点的多个进程。

模块 torch.distributed.launch

torch.distributed.launch 是一个模块,它在每个训练节点上生成多个分布式训练进程。

警告

此模块将被弃用,转而使用 torchrun

该实用程序可用于单节点分布式训练,其中将生成每个节点一个或多个进程。该实用程序可用于 CPU 训练或 GPU 训练。如果该实用程序用于 GPU 训练,则每个分布式进程将在单个 GPU 上运行。这可以实现显著改进的单节点训练性能。它也可以用于多节点分布式训练,通过在每个节点上生成多个进程,同样可以显著改进多节点分布式训练性能。这对于具有直接 GPU 支持的多个 Infiniband 接口的系统尤其有利,因为它们都可以用于聚合通信带宽。

在单节点分布式训练或多节点分布式训练这两种情况下,此实用程序都将启动给定数量的进程/节点 (--nproc-per-node)。如果用于 GPU 训练,则此数字需要小于或等于当前系统上的 GPU 数量 (nproc_per_node),并且每个进程将在单个 GPU 上运行,GPU 范围为 *GPU 0 到 GPU (nproc_per_node - 1)*。

如何使用此模块

  1. 单节点多进程分布式训练

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
           arguments of your training script)
  1. 多节点多进程分布式训练:(例如,两个节点)

节点 1:(IP:192.168.1.1,并且具有空闲端口:1234)

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
           --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)

节点 2

python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
           --nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
           --master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
           and all other arguments of your training script)
  1. 要查找此模块提供的可选参数

python -m torch.distributed.launch --help

重要提示

1. 此实用程序和多进程分布式(单节点或多节点)GPU 训练目前仅在使用 NCCL 分布式后端时才能实现最佳性能。因此,NCCL 后端是推荐用于 GPU 训练的后端。

2. 在您的训练程序中,您必须解析命令行参数:--local-rank=LOCAL_PROCESS_RANK,它将由此模块提供。如果您的训练程序使用 GPU,则应确保您的代码仅在 LOCAL_PROCESS_RANK 的 GPU 设备上运行。这可以通过以下方式完成

解析 local_rank 参数

>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()

使用以下任一方法将您的设备设置为 local rank

>>> torch.cuda.set_device(args.local_rank)  # before your code runs

>>> with torch.cuda.device(args.local_rank):
>>>    # your code to run
>>>    ...

在版本 2.0.0 中更改: 启动器会将 --local-rank=<rank> 参数传递给您的脚本。从 PyTorch 2.0.0 开始,带破折号的 --local-rank 优先于以前使用的带下划线的 --local_rank

为了向后兼容,用户可能需要在其参数解析代码中处理这两种情况。这意味着在参数解析器中同时包含 "--local-rank""--local_rank"。如果仅提供 "--local_rank",启动器将触发错误:“error: unrecognized arguments: –local-rank=<rank>”。对于仅支持 PyTorch 2.0.0+ 的训练代码,包含 "--local-rank" 应该就足够了。

3. 在您的训练程序中,您应该在开始时调用以下函数来启动分布式后端。强烈建议使用 init_method=env://。其他 init 方法(例如 tcp://)可能有效,但 env:// 是此模块官方支持的方法。

>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>>                                      init_method='env://')

4. 在您的训练程序中,您可以选择使用常规的分布式函数或使用 torch.nn.parallel.DistributedDataParallel() 模块。如果您的训练程序使用 GPU 进行训练,并且您想使用 torch.nn.parallel.DistributedDataParallel() 模块,以下是如何配置它。

>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>>                                                   device_ids=[args.local_rank],
>>>                                                   output_device=args.local_rank)

请确保将 device_ids 参数设置为您的代码将要运行的唯一 GPU 设备 ID。这通常是进程的本地 rank。换句话说,device_ids 需要是 [args.local_rank],并且 output_device 需要是 args.local_rank 才能使用此实用程序

5. 将 local_rank 传递给子进程的另一种方法是通过环境变量 LOCAL_RANK。当您使用 --use-env=True 启动脚本时,将启用此行为。您必须调整上面的子进程示例,以将 args.local_rank 替换为 os.environ['LOCAL_RANK'];当您指定此标志时,启动器将不会传递 --local-rank

警告

local_rank 不是全局唯一的:它仅在每台机器上的每个进程中是唯一的。因此,不要使用它来决定您是否应该写入网络文件系统等。有关如果您不正确地执行此操作可能导致问题的示例,请参阅 https://github.com/pytorch/pytorch/issues/12042

Spawn 实用程序

多进程包 - torch.multiprocessing 包还在 torch.multiprocessing.spawn() 中提供了一个 spawn 函数。此辅助函数可用于生成多个进程。它的工作原理是传入您要运行的函数,并生成 N 个进程来运行它。这也可以用于多进程分布式训练。

有关如何使用它的参考,请参阅 PyTorch 示例 - ImageNet 实现

请注意,此函数需要 Python 3.4 或更高版本。

调试 torch.distributed 应用程序

由于难以理解的挂起、崩溃或跨 rank 的不一致行为,调试分布式应用程序可能具有挑战性。torch.distributed 提供了一套工具,以自助方式帮助调试训练应用程序

Python 断点

在分布式环境中使用 python 的调试器非常方便,但由于它不是开箱即用的,因此许多人根本不使用它。PyTorch 提供了围绕 pdb 的自定义包装器,可以简化此过程。

torch.distributed.breakpoint 使此过程变得容易。在内部,它以两种方式自定义了 pdb 的断点行为,但在其他方面表现得像普通的 pdb。 1. 仅在一个 rank(由用户指定)上附加调试器。 2. 通过使用 torch.distributed.barrier() 确保所有其他 rank 停止,一旦调试的 rank 发出 continue,它将释放。 3. 重新路由来自子进程的 stdin,使其连接到您的终端。

要使用它,只需在所有 rank 上发出 torch.distributed.breakpoint(rank),在每种情况下都使用相同的 rank 值。

监控的 Barrier

从 v1.10 开始,torch.distributed.monitored_barrier() 作为 torch.distributed.barrier() 的替代方案存在,当崩溃时会提供有关哪个 rank 可能存在故障的有用信息,即并非所有 rank 都在提供的超时时间内调用 torch.distributed.monitored_barrier()torch.distributed.monitored_barrier() 使用 send/recv 通信原语在类似于确认的过程中实现主机端 barrier,从而允许 rank 0 报告哪个或哪些 rank 未能及时确认 barrier。例如,考虑以下函数,其中 rank 1 未能调用 torch.distributed.monitored_barrier()(在实践中,这可能是由于应用程序错误或先前集体通信中的挂起)

import os
from datetime import timedelta

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    # monitored barrier requires gloo process group to perform host-side sync.
    group_gloo = dist.new_group(backend="gloo")
    if rank not in [1]:
        dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    mp.spawn(worker, nprocs=2, args=())

以下错误消息在 rank 0 上生成,允许用户确定哪个或哪些 rank 可能存在故障并进一步调查

RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
 Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594

TORCH_DISTRIBUTED_DEBUG

使用 TORCH_CPP_LOG_LEVEL=INFO 环境变量时,可以使用 TORCH_DISTRIBUTED_DEBUG 来触发额外的有用日志和集体同步检查,以确保所有 rank 都已适当同步。TORCH_DISTRIBUTED_DEBUG 可以设置为 OFF (默认)、INFODETAIL,具体取决于所需的调试级别。请注意,最详细的选项 DETAIL 可能会影响应用程序性能,因此仅应在调试问题时使用。

设置 TORCH_DISTRIBUTED_DEBUG=INFO 将在使用 torch.nn.parallel.DistributedDataParallel() 训练的模型初始化时,生成额外的调试日志;而 TORCH_DISTRIBUTED_DEBUG=DETAIL 还将记录运行时性能统计信息,并在选定的迭代次数中进行记录。这些运行时统计信息包括前向时间、反向时间、梯度通信时间等数据。例如,给定以下应用程序

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


class TwoLinLayerNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.a = torch.nn.Linear(10, 10, bias=False)
        self.b = torch.nn.Linear(10, 1, bias=False)

    def forward(self, x):
        a = self.a(x)
        b = self.b(x)
        return (a, b)


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    print("init model")
    model = TwoLinLayerNet().cuda()
    print("init ddp")
    ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])

    inp = torch.randn(10, 10).cuda()
    print("train")

    for _ in range(20):
        output = ddp_model(inp)
        loss = output[0] + output[1]
        loss.sum().backward()


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ[
        "TORCH_DISTRIBUTED_DEBUG"
    ] = "DETAIL"  # set to DETAIL for runtime logging.
    mp.spawn(worker, nprocs=2, args=())

以下日志在初始化时呈现

I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO

以下日志在运行时呈现(当 TORCH_DISTRIBUTED_DEBUG=DETAIL 设置时)

I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 40838608
 Avg backward compute time: 5983335
Avg backward comm. time: 4326421
 Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
 Avg forward compute time: 42850427
 Avg backward compute time: 3885553
Avg backward comm. time: 2357981
 Avg backward comm/comp overlap time: 2234674

此外,由于模型中未使用的参数,TORCH_DISTRIBUTED_DEBUG=INFO 增强了 torch.nn.parallel.DistributedDataParallel() 中的崩溃日志记录。目前,如果前向传播中可能存在未使用的参数,则必须将 find_unused_parameters=True 传递到 torch.nn.parallel.DistributedDataParallel() 初始化中,并且从 v1.10 版本开始,所有模型输出都必须在损失计算中使用,因为 torch.nn.parallel.DistributedDataParallel() 不支持反向传播中未使用的参数。这些约束具有挑战性,尤其是对于较大的模型,因此当因错误而崩溃时,torch.nn.parallel.DistributedDataParallel() 将记录所有未使用的参数的完整限定名称。例如,在上述应用程序中,如果我们修改 loss 以改为计算为 loss = output[1],则 TwoLinLayerNet.a 在反向传播中未接收到梯度,因此导致 DDP 失败。在崩溃时,用户会收到有关未使用参数的信息,这对于大型模型来说可能难以手动查找

RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
 the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0

设置 TORCH_DISTRIBUTED_DEBUG=DETAIL 将在用户直接或间接发出的每个 collective 调用(例如 DDP allreduce)上触发额外的 一致性和同步检查。这是通过创建一个包装器进程组来完成的,该包装器进程组包装了 torch.distributed.init_process_group()torch.distributed.new_group() API 返回的所有进程组。因此,这些 API 将返回一个包装器进程组,该进程组可以像常规进程组一样使用,但在将 collective 调度到基础进程组之前执行一致性检查。目前,这些检查包括 torch.distributed.monitored_barrier(),它确保所有 rank 完成其未完成的 collective 调用,并报告卡住的 rank。接下来,通过确保所有 collective 函数匹配并使用一致的张量形状调用来检查 collective 本身的一致性。如果不是这种情况,则在应用程序崩溃时会包含详细的错误报告,而不是挂起或信息量不足的错误消息。例如,考虑以下函数,该函数在 torch.distributed.all_reduce() 中具有不匹配的输入形状

import torch
import torch.distributed as dist
import torch.multiprocessing as mp


def worker(rank):
    dist.init_process_group("nccl", rank=rank, world_size=2)
    torch.cuda.set_device(rank)
    tensor = torch.randn(10 if rank == 0 else 20).cuda()
    dist.all_reduce(tensor)
    torch.cuda.synchronize(device=rank)


if __name__ == "__main__":
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29501"
    os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
    os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
    mp.spawn(worker, nprocs=2, args=())

使用 NCCL 后端,这样的应用程序可能会导致挂起,这在非平凡的场景中可能难以找到根本原因。如果用户启用 TORCH_DISTRIBUTED_DEBUG=DETAIL 并重新运行应用程序,则以下错误消息将揭示根本原因

work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes:  10
[ torch.LongTensor{1} ]

注意

为了在运行时对调试级别进行细粒度控制,还可以使用函数 torch.distributed.set_debug_level()torch.distributed.set_debug_level_from_env()torch.distributed.get_debug_level()

此外,TORCH_DISTRIBUTED_DEBUG=DETAIL 可以与 TORCH_SHOW_CPP_STACKTRACES=1 结合使用,以便在检测到 collective 不同步时记录整个调用堆栈。这些 collective 不同步检查将适用于所有使用 c10d collective 调用,并由使用 torch.distributed.init_process_group()torch.distributed.new_group() API 创建的进程组支持的应用程序。

日志记录

除了通过 torch.distributed.monitored_barrier()TORCH_DISTRIBUTED_DEBUG 提供的显式调试支持外,torch.distributed 的底层 C++ 库还在各个级别输出日志消息。这些消息有助于了解分布式训练作业的执行状态,并排除网络连接失败等问题。下表显示了如何通过 TORCH_CPP_LOG_LEVELTORCH_DISTRIBUTED_DEBUG 环境变量的组合来调整日志级别。

TORCH_CPP_LOG_LEVEL

TORCH_DISTRIBUTED_DEBUG

生效日志级别

ERROR

忽略

错误

WARNING

忽略

警告

INFO

忽略

信息

INFO

INFO

Debug

INFO

DETAIL

Trace (又名 All)

分布式组件引发从 RuntimeError 派生的自定义异常类型

  • torch.distributed.DistError: 这是所有分布式异常的基本类型。

  • torch.distributed.DistBackendError: 当发生后端特定的错误时,将引发此异常。例如,如果使用 NCCL 后端,并且用户尝试使用 NCCL 库不可用的 GPU。

  • torch.distributed.DistNetworkError: 当网络库遇到错误时(例如:连接被对端重置),将引发此异常

  • torch.distributed.DistStoreError: 当 Store 遇到错误时(例如:TCPStore 超时),将引发此异常

class torch.distributed.DistError

当分布式库中发生错误时引发的异常

class torch.distributed.DistBackendError

当分布式中发生后端错误时引发的异常

class torch.distributed.DistNetworkError

当分布式中发生网络错误时引发的异常

class torch.distributed.DistStoreError

当分布式存储中发生错误时引发的异常

如果您正在运行单节点训练,则可以方便地以交互方式断点脚本。我们提供了一种方便地断点单个 rank 的方法

torch.distributed.breakpoint(rank=0, skip=0)[source][source]

设置断点,但仅在单个 rank 上设置。所有其他 rank 将等待您完成断点后再继续。

参数
  • rank (int) – 要在其上断点的 rank。默认值:0

  • skip (int) – 跳过对此断点的首次 skip 调用。默认值:0

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并获得您的问题解答

查看资源