分布式通信包 - torch.distributed¶
注意
请参阅 PyTorch 分布式概述,简要了解所有与分布式训练相关的功能。
后端¶
torch.distributed
支持三种内置后端,每种具有不同的能力。下表显示了可用于 CPU / CUDA 张量的函数。仅当用于构建 PyTorch 的实现支持 CUDA 时,MPI 才支持 CUDA。
后端 |
|
|
|
|||
---|---|---|---|---|---|---|
设备 |
CPU |
GPU |
CPU |
GPU |
CPU |
GPU |
send |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
recv |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
broadcast |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
all_reduce |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
reduce |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
all_gather |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
gather |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
scatter |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
reduce_scatter |
✘ |
✘ |
✘ |
✘ |
✘ |
✓ |
all_to_all |
✘ |
✘ |
✓ |
? |
✘ |
✓ |
barrier |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
PyTorch 自带的后端¶
PyTorch 分布式包支持 Linux(稳定)、MacOS(稳定)和 Windows(原型)。对于 Linux,默认情况下 Gloo 和 NCCL 后端已构建并包含在 PyTorch 分布式包中(NCCL 仅在构建时包含 CUDA 时)。MPI 是一个可选后端,仅在从源代码构建 PyTorch 时才能包含。(例如,在安装了 MPI 的主机上构建 PyTorch。)
注意
从 PyTorch v1.8 开始,Windows 支持除 NCCL 外的所有集合通信后端。如果 init_method 参数的 init_process_group()
指向文件,则必须遵循以下模式
本地文件系统,
init_method="file:///d:/tmp/some_file"
共享文件系统,
init_method="file://////{machine_name}/{share_folder_name}/some_file"
与 Linux 平台相同,您可以通过设置环境变量 MASTER_ADDR 和 MASTER_PORT 来启用 TcpStore。
使用哪个后端?¶
过去,我们经常被问到:“我应该使用哪个后端?”。
经验法则
使用 NCCL 后端进行分布式 GPU 训练
使用 Gloo 后端进行分布式 CPU 训练。
具有 InfiniBand 互连的 GPU 主机
使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的后端。
具有 Ethernet 互连的 GPU 主机
使用 NCCL,因为它目前提供了最佳的分布式 GPU 训练性能,尤其是对于多进程单节点或多节点分布式训练。如果您在使用 NCCL 时遇到任何问题,请使用 Gloo 作为备选方案。(请注意,Gloo 目前在 GPU 上的运行速度比 NCCL 慢。)
具有 InfiniBand 互连的 CPU 主机
如果您的 InfiniBand 启用了 IP over IB,请使用 Gloo,否则请使用 MPI。我们计划在即将发布的版本中为 Gloo 添加 InfiniBand 支持。
具有 Ethernet 互连的 CPU 主机
使用 Gloo,除非您有特殊原因使用 MPI。
常用环境变量¶
选择要使用的网络接口¶
默认情况下,NCCL 和 Gloo 后端都会尝试找到正确的网络接口。如果自动检测到的接口不正确,您可以使用以下环境变量覆盖它(适用于相应的后端)
NCCL_SOCKET_IFNAME,例如
export NCCL_SOCKET_IFNAME=eth0
GLOO_SOCKET_IFNAME,例如
export GLOO_SOCKET_IFNAME=eth0
如果您正在使用 Gloo 后端,可以通过逗号分隔来指定多个接口,例如:export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3
。后端将以轮询方式在这些接口上分派操作。所有进程在此变量中指定相同数量的接口至关重要。
其他 NCCL 环境变量¶
调试 - 如果 NCCL 发生故障,您可以设置 NCCL_DEBUG=INFO
来打印明确的警告消息以及基本的 NCCL 初始化信息。
您还可以使用 NCCL_DEBUG_SUBSYS
来获取 NCCL 特定方面的更多详细信息。例如,NCCL_DEBUG_SUBSYS=COLL
将打印集合调用的日志,这对于调试挂起很有帮助,尤其是由集合类型或消息大小不匹配引起的挂起。如果拓扑检测失败,设置 NCCL_DEBUG_SUBSYS=GRAPH
将有助于检查详细的检测结果,并在需要 NCCL 团队进一步帮助时保存作为参考。
性能调优 - NCCL 根据其拓扑检测执行自动调优,以节省用户的调优工作。在一些基于套接字的系统上,用户可能仍会尝试调优 NCCL_SOCKET_NTHREADS
和 NCCL_NSOCKS_PERTHREAD
以增加套接字网络带宽。NCCL 已为一些云提供商(例如 AWS 或 GCP)预调优了这两个环境变量。
有关 NCCL 环境变量的完整列表,请参阅 NVIDIA NCCL 官方文档
基础知识¶
torch.distributed 包提供了 PyTorch 支持和通信原语,用于在运行在一个或多个机器上的多个计算节点之间实现多进程并行。类 torch.nn.parallel.DistributedDataParallel()
基于此功能构建,作为任何 PyTorch 模型的一个封装器,提供同步分布式训练。它与 多进程包 - torch.multiprocessing 和 torch.nn.DataParallel()
提供的并行方式不同,因为它支持多个网络连接的机器,并且用户必须为每个进程显式启动主训练脚本的单独副本。
在单机同步情况下,torch.distributed 或 torch.nn.parallel.DistributedDataParallel()
封装器相比其他数据并行方法(包括 torch.nn.DataParallel()
)仍可能具有优势
每个进程维护自己的优化器,并在每次迭代中执行完整的优化步骤。虽然这可能看起来是多余的,因为梯度已经在进程之间收集并平均,因此对于每个进程都是相同的,但这意味无需参数广播步骤,从而减少了在节点之间传输张量的时间。
每个进程包含一个独立的 Python 解释器,消除了从单个 Python 进程驱动多个执行线程、模型副本或 GPU 所带来的额外解释器开销和“GIL 争用”。这对于大量使用 Python 运行时(包括带有循环层或许多小组件的模型)的模型尤其重要。
初始化¶
在使用任何其他方法之前,需要使用 torch.distributed.init_process_group()
或 torch.distributed.device_mesh.init_device_mesh()
函数来初始化包。这两个函数都会阻塞直到所有进程都加入。
警告
初始化不是线程安全的。进程组的创建应从单个线程执行,以防止跨排名分配不一致的 'UUID',并防止初始化期间可能导致挂起的竞争条件。
- torch.distributed.is_available()[source][source]¶
如果分布式包可用,则返回
True
。否则,
torch.distributed
不会暴露任何其他 API。目前,torch.distributed
在 Linux、MacOS 和 Windows 上可用。从源代码构建 PyTorch 时,设置USE_DISTRIBUTED=1
以启用它。目前,Linux 和 Windows 的默认值为USE_DISTRIBUTED=1
,MacOS 的默认值为USE_DISTRIBUTED=0
。- 返回类型
- torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source][source]¶
初始化默认分布式进程组。
这也会初始化分布式包。
- 初始化进程组主要有两种方式
显式指定
store
,rank
, 和world_size
。指定
init_method
(一个 URL 字符串),指明在哪里/如何发现对等方。可以选择指定rank
和world_size
,或将所有必需参数编码在 URL 中并省略它们。
如果两者都未指定,则假定
init_method
为“env://”。- 参数
backend (str or Backend, optional) – 要使用的后端。根据构建时配置,有效值包括
mpi
、gloo
、nccl
、ucc
或第三方插件注册的后端。自 2.6 版本起,如果未提供backend
,c10d 将使用 device_id kwarg(如果提供)指示的设备类型注册的后端。目前已知的默认注册是:cuda
使用nccl
,cpu
使用gloo
。如果既未提供backend
也未提供device_id
,c10d 将检测运行时机器上的加速器并使用为该检测到的加速器(或cpu
)注册的后端。此字段可以作为小写字符串(例如,“gloo
”)给出,也可以通过Backend
属性(例如,Backend.GLOO
)访问。如果在使用nccl
后端时每台机器使用多个进程,则每个进程必须对其使用的每个 GPU 拥有独占访问权,因为在进程之间共享 GPU 可能导致死锁或 NCCL 无效使用。ucc
后端是实验性的。init_method (str, optional) – 指定如何初始化进程组的 URL。如果未指定
init_method
或store
,则默认为“env://”。与store
互斥。world_size (int, optional) – 参与作业的进程数。如果指定了
store
,则必需。rank (int, optional) – 当前进程的排名(应为 0 到
world_size
-1 之间的数字)。如果指定了store
,则必需。store (Store, optional) – 所有工作进程可访问的键/值存储,用于交换连接/地址信息。与
init_method
互斥。timeout (timedelta, optional) – 对进程组执行的操作的超时时间。NCCL 的默认值为 10 分钟,其他后端的默认值为 30 分钟。这是集合操作将异步中止并导致进程崩溃的持续时间。这样做是因为 CUDA 执行是异步的,并且由于失败的异步 NCCL 操作可能导致后续的 CUDA 操作在损坏的数据上运行,继续执行用户代码已不再安全。设置 TORCH_NCCL_BLOCKING_WAIT 时,进程将阻塞并等待此超时。
group_name (str, optional, deprecated) – 组名。此参数被忽略
pg_options (ProcessGroupOptions, optional) – 进程组选项,指定在构建特定进程组时需要传入的额外选项。目前,我们唯一支持的选项是针对
nccl
后端的ProcessGroupNCCL.Options
,可以指定is_high_priority_stream
,以便 nccl 后端在有计算内核等待时能够选择高优先级 CUDA 流。有关配置 nccl 的其他可用选项,请参阅 https://docs.nvda.net.cn/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tdevice_id (torch.device, optional) – 用于将此进程“绑定”到单个特定设备,从而实现后端特定优化。目前,这仅在 NCCL 下有两个影响:通信器会立即形成(立即调用
ncclCommInit*
而不是正常的延迟调用),并且子组在可能的情况下会使用ncclCommSplit
以避免不必要的组创建开销。如果想尽早了解 NCCL 初始化错误,也可以使用此字段。
注意
要启用
backend == Backend.MPI
,需要在支持 MPI 的系统上从源代码构建 PyTorch。注意
对多个后端的支持是实验性的。目前,当未指定后端时,会创建
gloo
和nccl
后端。gloo
后端将用于带有 CPU 张量的集合操作,nccl
后端将用于带有 CUDA 张量的集合操作。可以通过传入格式为“<device_type>:<backend_name>,<device_type>:<backend_name>”的字符串来指定自定义后端,例如“cpu:gloo,cuda:custom_backend”。
- torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None)[source][source]¶
根据 device_type、mesh_shape 和 mesh_dim_names 参数初始化 DeviceMesh。
这将创建一个具有 n 维数组布局的 DeviceMesh,其中 n 是 mesh_shape 的长度。如果提供了 mesh_dim_names,则每个维度都将标记为 mesh_dim_names[i]。
注意
init_device_mesh 遵循 SPMD 编程模型,意味着相同的 PyTorch Python 程序在集群中的所有进程/排名上运行。确保 mesh_shape(描述设备布局的 nD 数组的维度)在所有排名上一致。不一致的 mesh_shape 可能导致程序挂起。
注意
如果找不到进程组,init_device_mesh 将在幕后初始化分布式通信所需的进程组。
- 参数
- 返回
表示设备布局的
DeviceMesh
对象。- 返回类型
- 示例:
>>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
- torch.distributed.is_torchelastic_launched()[source][source]¶
检查此进程是否通过
torch.distributed.elastic
(也称为 torchelastic) 启动。是否存在
TORCHELASTIC_RUN_ID
环境变量被用作判断当前进程是否通过 torchelastic 启动的代理。这是一个合理的代理,因为TORCHELASTIC_RUN_ID
映射到集合 ID (rendezvous id),它总是一个非空值,用于指示作业 ID 以进行对等节点发现 (peer discovery) 用途。- 返回类型
目前支持三种初始化方法
TCP 初始化¶
使用 TCP 进行初始化有两种方式,两者都需要所有进程可访问的网络地址以及期望的 world_size
。第一种方式需要指定属于 rank 0 进程的地址。这种初始化方法要求所有进程都手动指定 rank。
请注意,最新版本的分布式包不再支持多播地址 (multicast address)。group_name
也已弃用。
import torch.distributed as dist
# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
rank=args.rank, world_size=4)
环境变量初始化¶
此方法将从环境变量中读取配置,从而允许完全自定义如何获取信息。需要设置的变量有:
MASTER_PORT
- 必需;必须是 rank 0 机器上未被占用的端口MASTER_ADDR
- 必需(rank 0 除外);rank 0 节点的地址WORLD_SIZE
- 必需;可以在此处设置,也可以在调用初始化函数时设置RANK
- 必需;可以在此处设置,也可以在调用初始化函数时设置
将使用 rank 0 的机器来建立所有连接。
这是默认方法,这意味着不必指定 init_method
(或可以设置为 env://
)。
初始化后¶
运行 torch.distributed.init_process_group()
后,可以使用以下函数。要检查进程组是否已初始化,请使用 torch.distributed.is_initialized()
。
- class torch.distributed.Backend(name)[source][source]¶
一个类似于枚举的后端类。
可用后端:GLOO, NCCL, UCC, MPI, XCCL, 和其他已注册的后端。
此类的值是小写字符串,例如
"gloo"
。可以通过属性访问它们,例如Backend.NCCL
。此类可以直接调用来解析字符串,例如
Backend(backend_str)
将检查backend_str
是否有效,如果有效,则返回解析后的小写字符串。它也接受大写字符串,例如Backend("GLOO")
返回"gloo"
。注意
条目
Backend.UNDEFINED
存在,但仅用作某些字段的初始值。用户不应直接使用它或假设其存在。- classmethod register_backend(name, func, extended_api=False, devices=None)[source][source]¶
注册一个具有给定名称和实例化函数的新后端。
此类方法供第三方
ProcessGroup
扩展用于注册新后端。- 参数
name (str) –
ProcessGroup
扩展的后端名称。应与init_process_group()
中的名称匹配。func (function) – 实例化后端的函数处理器。该函数应在后端扩展中实现,并接受四个参数,包括
store
,rank
,world_size
和timeout
。extended_api (bool, optional) – 后端是否支持扩展参数结构。默认值:
False
。如果设置为True
,后端将获得一个c10d::DistributedBackendOptions
实例以及由后端实现定义的进程组选项对象。device (str or list of str, optional) – 此后端支持的设备类型,例如 “cpu”, “cuda” 等。如果为 None,则假定同时支持 “cpu” 和 “cuda”。
注意
第三方后端支持是实验性的,可能会有所变化。
- torch.distributed.get_backend(group=None)[source][source]¶
返回给定进程组的后端。
- 参数
group (ProcessGroup, optional) – 要操作的进程组。默认是主进程组。如果指定了另一个特定组,则调用进程必须是
group
的一部分。- 返回
给定进程组的后端,返回一个全小写的字符串。
- 返回类型
关闭¶
在退出时通过调用 destroy_process_group()
清理资源非常重要。
最简单的做法是在训练脚本中不再需要通信的地方(通常在主函数的末尾)调用 destroy_process_group()
并将 group 参数保留其默认值 None,以销毁每个进程组和后端。每个训练进程都应调用一次此函数,而不是在外部进程启动器级别调用。
如果在超时时间内并非所有 rank 都调用 destroy_process_group()
,特别是当应用程序中有多个进程组(例如用于 N-D 并行)时,可能会导致退出时挂起。这是因为 ProcessGroupNCCL 的析构函数调用了 ncclCommAbort,而 ncclCommAbort 必须协同调用,但是如果由 Python 的 GC 调用 ProcessGroupNCCL 的析构函数,则调用顺序是不确定的。调用 destroy_process_group()
有助于确保 ncclCommAbort 在各个 rank 之间以一致的顺序调用,并避免在 ProcessGroupNCCL 的析构函数中调用 ncclCommAbort。
重新初始化¶
destroy_process_group 也可以用来销毁单个进程组。一个用例是容错训练,其中进程组可能在运行时被销毁然后初始化一个新的。在这种情况下,在调用 destroy 之后、随后初始化之前,使用 torch.distributed 原语以外的其他方式同步训练进程至关重要。由于实现这种同步的困难性,此行为目前不受支持/未经测试,被认为是一个已知问题。如果这是阻碍您的用例,请提交 github issue 或 RFC。
组¶
默认情况下,集合通信操作在默认组(也称为世界)上进行,并且需要所有进程都进入分布式函数调用。然而,某些工作负载可以从更细粒度的通信中受益。这时分布式组就派上用场了。new_group()
函数可以用来创建新组,其中包含所有进程的任意子集。它返回一个不透明的组句柄,可以作为 group
参数传递给所有集合通信操作(集合通信是分布式函数,用于在某些已知编程模式下交换信息)。
- torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None, device_id=None)[source][source]¶
创建一个新的分布式组。
此函数要求主组中的所有进程(即参与分布式作业的所有进程)都进入此函数,即使它们不属于该组的成员。此外,所有进程必须以相同的顺序创建组。
警告
安全的并发使用:当使用
NCCL
后端且有多个进程组时,用户必须确保在各个 rank 上集合通信的执行顺序全局一致。如果一个进程内的多个线程发出集合通信操作,则需要显式同步以确保一致的顺序。
当使用 torch.distributed 通信 API 的异步变体时,会返回一个工作对象,并且通信核被加入到单独的 CUDA 流中排队,从而允许通信和计算重叠。一旦在一个进程组上发出一个或多个异步操作,在使用另一个进程组之前,必须通过调用 work.wait() 与其他 CUDA 流进行同步。
- 参数
ranks (list[int]) – 组成员的 rank 列表。如果为
None
,将设置为所有 rank。默认值为None
。timeout (timedelta, optional) – 详情和默认值请参阅 init_process_group。
backend (str or Backend, optional) – 要使用的后端。根据构建时配置,有效值为
gloo
和nccl
。默认使用与全局组相同的后端。此字段应提供为小写字符串(例如"gloo"
),也可以通过Backend
属性访问(例如Backend.GLOO
)。如果传入None
,将使用与默认进程组对应的后端。默认值为None
。pg_options (ProcessGroupOptions, optional) – 进程组选项,指定在构造特定进程组时需要传递的其他选项。例如,对于
nccl
后端,可以指定is_high_priority_stream
,以便进程组可以选择高优先级 CUDA 流。有关可用于配置 nccl 的其他可用选项,请参阅 https://docs.nvda.net.cn/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-tuse_local_synchronization (bool, optional) – 在进程组创建结束时执行组内局部 barrier。这与全局 barrier 不同,非组成员的 rank 不需要调用此 API 也不参与 barrier。
group_desc (str, optional) – 描述进程组的字符串。
device_id (torch.device, optional) – 用于将此进程“绑定”到的单个特定设备,如果提供了此字段,new_group 调用将尝试立即为该设备初始化通信后端。
- 返回
一个分布式组的句柄,可以传递给集合通信调用,或者如果 rank 不是
ranks
的一部分,则返回 GroupMember.NON_GROUP_MEMBER。
注意:use_local_synchronization 不适用于 MPI。
注意:虽然 use_local_synchronization=True 在大型集群和小型进程组中速度显着更快,但必须小心使用,因为它改变了集群行为,非组成员的 rank 不会参与 group barrier()。
注意:use_local_synchronization=True 可能导致死锁,当每个 rank 创建多个重叠的进程组时。为了避免这种情况,请确保所有 rank 遵循相同的全局创建顺序。
- torch.distributed.get_group_rank(group, global_rank)[source][source]¶
将全局 rank 转换为组内 rank。
global_rank
必须属于group
,否则会引发 RuntimeError。- 参数
group (ProcessGroup) – 用于查找相对 rank 的进程组。
global_rank (int) – 要查询的全局 rank。
- 返回
global_rank
相对于group
的组内 rank。- 返回类型
注意:在默认进程组上调用此函数返回 identity。
DeviceMesh¶
DeviceMesh 是一种更高层次的抽象,用于管理进程组(或 NCCL 通信器)。它允许用户轻松创建节点间和节点内进程组,而无需担心如何为不同的子进程组正确设置 rank,并且有助于轻松管理这些分布式进程组。init_device_mesh()
函数可以用来创建新的 DeviceMesh,其中 mesh 形状描述了设备拓扑。
- class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, _init_backend=True)[source][source]¶
DeviceMesh 表示一个设备网格,其中设备的布局可以表示为一个 n 维数组,n 维数组的每个值是默认进程组 rank 的全局 ID。
DeviceMesh 可用于描述设备在集群中的布局,并作为集群内设备列表之间通信的代理。
DeviceMesh 可以用作上下文管理器。
注意
DeviceMesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/rank 上运行。因此,用户需要确保 mesh 数组(描述设备布局)在所有 rank 上都相同。不一致的 mesh 将导致静默挂起。
- 参数
device_type (str) – mesh 的设备类型。目前支持:“cpu”, “cuda/cuda-like”。
mesh (ndarray) – 描述设备布局的多维数组或整数 tensor,其中 ID 是默认进程组的全局 ID。
- 返回
表示设备布局的
DeviceMesh
对象。- 返回类型
以下程序在每个进程/rank 上以 SPMD 方式运行。在此示例中,我们有 2 个主机,每个主机有 4 个 GPU。对 mesh 的第一维进行 reduce 操作将在列 (0, 4)、... 和 (3, 7) 上进行 reduce,对 mesh 的第二维进行 reduce 操作将在行 (0, 1, 2, 3) 和 (4, 5, 6, 7) 上进行 reduce。
- 示例:
>>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
- static from_group(group, device_type, mesh=None, *, mesh_dim_names=None)[source][source]¶
从现有的
ProcessGroup
或现有ProcessGroup
列表构造一个具有device_type
的DeviceMesh
。构造的设备网格的维度数等于传入的组数。例如,如果传入单个进程组,则生成的 DeviceMesh 是一个 1D 网格。如果传入 2 个进程组的列表,则生成的 DeviceMesh 是一个 2D 网格。
如果传入了多个组,则
mesh
和mesh_dim_names
参数是必需的。传入的进程组的顺序决定了 mesh 的拓扑结构。例如,第一个进程组将是 DeviceMesh 的第 0 维。mesh 张量必须具有与传入的进程组数相同的维度数,并且 mesh 张量中的维度顺序必须与传入的进程组的顺序匹配。- 参数
group (ProcessGroup or list[ProcessGroup]) – 现有的 ProcessGroup 或现有 ProcessGroup 的列表。
device_type (str) – 网格的设备类型。目前支持:“cpu”、“cuda/cuda-like”。不允许传入带有 GPU 索引的设备类型,例如“cuda:0”。
mesh (torch.Tensor or ArrayLike, optional) – 描述设备布局的多维数组或整数 tensor,其中 ID 是默认进程组的全局 ID。默认值为 None。
mesh_dim_names (tuple[str], optional) – 用于为描述设备布局的多维数组的每个维度分配 mesh 维度名称的元组。其长度必须与 mesh_shape 的长度匹配。mesh_dim_names 中的每个字符串必须唯一。默认值为 None。
- 返回
表示设备布局的
DeviceMesh
对象。- 返回类型
- get_all_groups()[source][source]¶
返回所有 mesh 维度的 ProcessGroups 列表。
- 返回
一个
ProcessGroup
对象列表。- 返回类型
list[torch.distributed.distributed_c10d.ProcessGroup]
- get_group(mesh_dim=None)[source][source]¶
返回由 mesh_dim 指定的单个 ProcessGroup,或者,如果未指定 mesh_dim 且 DeviceMesh 是一维的,则返回网格中唯一的 ProcessGroup。
- 参数
mesh_dim (str/python:int, 可选的) – 可以是网格维度的名称或索引。
(默认值为 None。) –
- 返回
一个
ProcessGroup
对象。- 返回类型
ProcessGroup
- get_local_rank(mesh_dim=None)[source][source]¶
返回 DeviceMesh 在给定 mesh_dim 上的局部进程编号(local rank)。
- 参数
mesh_dim (str/python:int, 可选的) – 可以是网格维度的名称或索引。
(默认值为 None。) –
- 返回
一个整数,表示局部进程编号(local rank)。
- 返回类型
以下程序以 SPMD 方式在每个进程/进程编号上运行。在此示例中,我们有 2 个主机,每个主机有 4 个 GPU。在进程编号 0、1、2、3 上调用
mesh_2d.get_local_rank(mesh_dim=0)
将返回 0。在进程编号 4、5、6、7 上调用mesh_2d.get_local_rank(mesh_dim=0)
将返回 1。在进程编号 0、4 上调用mesh_2d.get_local_rank(mesh_dim=1)
将返回 0。在进程编号 1、5 上调用mesh_2d.get_local_rank(mesh_dim=1)
将返回 1。在进程编号 2、6 上调用mesh_2d.get_local_rank(mesh_dim=1)
将返回 2。在进程编号 3、7 上调用mesh_2d.get_local_rank(mesh_dim=1)
将返回 3。- 示例:
>>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
点对点通信¶
- torch.distributed.send(tensor, dst=None, group=None, tag=0, group_dst=None)[source][source]¶
同步发送一个张量。
警告
NCCL 后端不支持
tag
。
- torch.distributed.recv(tensor, src=None, group=None, tag=0, group_src=None)[source][source]¶
同步接收一个张量。
警告
NCCL 后端不支持
tag
。
调用 isend()
和 irecv()
时,会返回分布式请求对象。通常,此对象的类型是未指定的,因为它们永远不应手动创建,但它们保证支持以下两种方法
is_completed()
- 如果操作已完成,则返回True
wait()
- 将阻塞进程直到操作完成。wait()
返回后,is_completed()
保证返回True
。
- torch.distributed.isend(tensor, dst=None, group=None, tag=0, group_dst=None)[source][source]¶
异步发送一个张量。
警告
在请求完成之前修改
tensor
会导致未定义的行为。警告
NCCL 后端不支持
tag
。与阻塞的 send 不同,isend 允许 src == dst 进程编号,即发送给自己。
- torch.distributed.irecv(tensor, src=None, group=None, tag=0, group_src=None)[source][source]¶
异步接收一个张量。
警告
NCCL 后端不支持
tag
。与阻塞的 recv 不同,irecv 允许 src == dst 进程编号,即从自己接收。
- torch.distributed.send_object_list(object_list, dst=None, group=None, device=None, group_dst=None)[source][source]¶
同步发送
object_list
中的可序列化对象。类似于
send()
,但可以传入 Python 对象。请注意,object_list
中的所有对象必须是可序列化的才能发送。- 参数
object_list (List[Any]) – 要发送的输入对象列表。每个对象必须是可序列化的。
dst (int) – 要将
object_list
发送到的目标进程编号。目标进程编号基于全局进程组(无论group
参数如何)group (Optional[ProcessGroup]) – (ProcessGroup, 可选的): 要进行操作的进程组。如果为
None
,则使用默认进程组。默认值为None
。device (
torch.device
, 可选的) – 如果不是None
,则对象会被序列化并转换为张量,然后在发送前移动到device
上。默认值为None
。group_dst (int, 可选的) –
group
上的目标进程编号。必须指定dst
和group_dst
中的一个,但不能同时指定两者
- 返回
None
.
注意
对于基于
NCCL
的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备上。在这种情况下,使用的设备由torch.cuda.current_device()
指定,并且用户有责任确保已进行设置,以便每个进程编号(rank)拥有独立的 GPU,这可以通过torch.cuda.set_device()
实现。警告
send_object_list()
隐式使用了pickle
模块,该模块已知存在安全问题。有可能构造恶意的 pickle 数据,在反序列化时执行任意代码。仅在您信任的数据上调用此函数。警告
对 GPU 张量调用
send_object_list()
支持不佳且效率低下,因为它会导致 GPU -> CPU 传输(由于张量会被 pickle 序列化)。请考虑改用send()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.recv_object_list(object_list, src=None, group=None, device=None, group_src=None)[source][source]¶
同步接收到
object_list
中的可序列化对象。类似于
recv()
,但可以接收 Python 对象。- 参数
object_list (List[Any]) – 用于接收的(空)对象列表。必须提供一个大小与待发送列表大小相同的列表。
src (int, 可选的) – 从中接收
object_list
的源进程编号。源进程编号基于全局进程组(无论group
参数如何)。如果设置为None
,则将从任何进程编号接收。默认值为None
。group (Optional[ProcessGroup]) – (ProcessGroup, 可选的): 要进行操作的进程组。如果为
None
,则使用默认进程组。默认值为None
。device (
torch.device
, 可选的) – 如果不是None
,则在此设备上接收。默认值为None
。group_src (int, 可选的) –
group
上的源进程编号。不可同时指定src
和group_src
。
- 返回
发送方进程编号。如果进程编号不在组中,则为 -1。如果进程编号在组中,
object_list
将包含来自src
进程编号发送的对象。
注意
对于基于
NCCL
的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备上。在这种情况下,使用的设备由torch.cuda.current_device()
指定,并且用户有责任确保已进行设置,以便每个进程编号(rank)拥有独立的 GPU,这可以通过torch.cuda.set_device()
实现。警告
recv_object_list()
隐式使用了pickle
模块,该模块已知存在安全问题。有可能构造恶意的 pickle 数据,在反序列化时执行任意代码。仅在您信任的数据上调用此函数。警告
对 GPU 张量调用
recv_object_list()
支持不佳且效率低下,因为它会导致 GPU -> CPU 传输(由于张量会被 pickle 序列化)。请考虑改用recv()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.batch_isend_irecv(p2p_op_list)[source][source]¶
异步发送或接收一批张量,并返回请求列表。
处理
p2p_op_list
中的每个操作,并返回相应的请求。目前支持NCCL
、Gloo
和UCC
后端。- 参数
p2p_op_list (list[torch.distributed.distributed_c10d.P2POp]) – 一个点对点操作列表(每个操作符的类型为
torch.distributed.P2POp
)。列表中isend
/irecv
的顺序很重要,需要与远程端的相应isend
/irecv
匹配。- 返回
一个分布式请求对象列表,通过调用 op_list 中的相应操作返回。
- 返回类型
示例
>>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank >>> recv_tensor = torch.randn(2, dtype=torch.float32) >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1) % world_size) >>> recv_op = dist.P2POp( ... dist.irecv, recv_tensor, (rank - 1 + world_size) % world_size ... ) >>> reqs = batch_isend_irecv([send_op, recv_op]) >>> for req in reqs: >>> req.wait() >>> recv_tensor tensor([2, 3]) # Rank 0 tensor([0, 1]) # Rank 1
注意
请注意,当此 API 与
NCCL
PG 后端一起使用时,用户必须使用 torch.cuda.set_device 设置当前的 GPU 设备,否则将导致意外的挂起问题。此外,如果此 API 是传递给
dist.P2POp
的group
中的第一个集合通信调用,则group
中的所有进程编号(rank)都必须参与此 API 调用;否则,行为未定义。如果此 API 调用不是group
中的第一个集合通信调用,则允许只涉及group
中部分进程编号的批量 P2P 操作。
同步和异步集合通信操作¶
每个集合通信函数都支持以下两种操作,取决于传递给集合通信操作的 async_op
标志的设置
同步操作 - 当 async_op
设置为 False
时,这是默认模式。当函数返回时,保证已执行集合通信操作。对于 CUDA
操作,不保证 CUDA
操作已完成,因为 CUDA
操作是异步的。对于 CPU
集合通信,任何利用集合通信调用输出的后续函数调用将按预期运行。对于 CUDA
集合通信,在同一 CUDA
流上利用输出的函数调用将按预期运行。在不同流下运行的场景中,用户必须注意同步。有关 CUDA
语义的详细信息(如流同步),请参阅 CUDA Semantics。请参阅下面的脚本,了解 CPU
和 CUDA
操作在这些语义上的差异示例。
异步操作 - 当 async_op
设置为 True
时。集合通信函数返回一个分布式请求对象。通常,您不需要手动创建它,并且它保证支持以下两种方法
is_completed()
- 对于CPU
集合通信,如果完成则返回True
。对于CUDA
操作,如果操作已成功排队到CUDA
流上,并且输出可以在默认流上使用而无需进一步同步,则返回True
。wait()
- 对于CPU
集合通信,将阻塞进程直到操作完成。对于CUDA
集合通信,将阻塞当前活动的CUDA
流直到操作完成(但不会阻塞CPU
)。get_future()
- 返回一个torch._C.Future
对象。支持NCCL
,也支持GLOO
和MPI
上的大多数操作,点对点操作除外。注意:随着我们继续采用 Future 并合并 API,get_future()
调用可能会变得多余。
示例
以下代码可作为在使用分布式集合通信时关于 CUDA
操作语义的参考。它展示了在不同 CUDA
流上使用集合通信输出时明确需要同步。
# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
s.wait_stream(torch.cuda.default_stream())
output.add_(100)
if rank == 0:
# if the explicit call to wait_stream was omitted, the output below will be
# non-deterministically 1 or 101, depending on whether the allreduce overwrote
# the value after the add completed.
print(output)
集合通信函数¶
- torch.distributed.broadcast(tensor, src=None, group=None, async_op=False, group_src=None)[source][source]¶
将张量广播到整个组。
参与集合通信的所有进程中的
tensor
必须具有相同数量的元素。- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。如果 async_op 未设置或不是组的一部分,则为 None。
- torch.distributed.broadcast_object_list(object_list, src=None, group=None, device=None, group_src=None)[source][source]¶
广播
object_list
中的可 pickle 对象到整个组。类似于
broadcast()
,但可以传入 Python 对象。注意object_list
中的所有对象必须是可 pickle 的才能被广播。- 参数
object_list (List[Any]) – 要广播的输入对象列表。每个对象都必须是可 pickle 的。只有
src
rank 上的对象才会被广播,但每个 rank 必须提供大小相等的列表。src (int) – 广播
object_list
的源 rank。源 rank 基于全局进程组(不考虑group
参数)。group (Optional[ProcessGroup]) – (ProcessGroup, 可选的): 要进行操作的进程组。如果为
None
,则使用默认进程组。默认值为None
。device (
torch.device
, optional) – 如果不是 None,对象将被序列化并转换为 tensors,然后在广播之前移动到device
。默认值为None
。group_src (int) –
group
中的源 rank。必须指定group_src
和src
中的一个,但不能同时指定两者。
- 返回
None
。如果 rank 是组的一部分,object_list
将包含来自src
rank 的广播对象。
注意
对于基于
NCCL
的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备上。在这种情况下,使用的设备由torch.cuda.current_device()
指定,并且用户有责任确保已进行设置,以便每个进程编号(rank)拥有独立的 GPU,这可以通过torch.cuda.set_device()
实现。注意
注意此 API 与
broadcast()
集合操作略有不同,因为它不提供async_op
句柄,因此将是一个阻塞调用。警告
broadcast_object_list()
隐式使用pickle
模块,众所周知它不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。仅对信任的数据调用此函数。警告
使用 GPU tensors 调用
broadcast_object_list()
支持不好且效率低下,因为它会导致 GPU -> CPU 传输,因为 tensors 将会被 pickle。请考虑改用broadcast()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> objects = [None, None, None] >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> dist.broadcast_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source][source]¶
规约所有机器上的 tensor 数据,以便所有机器都能获得最终结果。
调用后,
tensor
在所有进程中将是位wise相同的。支持复数 tensors。
- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。如果 async_op 未设置或不是组的一部分,则为 None。
示例
>>> # All tensors below are of torch.int64 type. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4, 6], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1
>>> # All tensors below are of torch.cfloat type. >>> # We have 2 process groups, 2 ranks. >>> tensor = torch.tensor( ... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device ... ) + 2 * rank * (1 + 1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0 tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
- torch.distributed.reduce(tensor, dst=None, op=<RedOpType.SUM: 0>, group=None, async_op=False, group_dst=None)[source][source]¶
规约所有机器上的 tensor 数据。
只有 rank 为
dst
的进程将接收最终结果。- 参数
tensor (Tensor) – 集合操作的输入和输出。函数会就地操作。
dst (int) – 全局进程组上的目标进程编号(无论
group
参数如何)op (optional) –
torch.distributed.ReduceOp
枚举中的值之一。指定用于元素wise规约的操作。group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。
async_op (bool, 可选的) – 此操作是否应为异步操作
group_dst (int) –
group
中的目标 rank。必须指定group_dst
和dst
中的一个,但不能同时指定两者。
- 返回
异步工作句柄,如果 async_op 设置为 True。如果 async_op 未设置或不是组的一部分,则为 None。
- torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source][source]¶
收集来自整个组的 tensors 到一个列表中。
支持复数 tensors 和大小不等的 tensors。
- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。如果 async_op 未设置或不是组的一部分,则为 None。
示例
>>> # All tensors below are of torch.int64 dtype. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor_list = [ ... torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2) ... ] >>> tensor_list [tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0 [tensor([0, 0], device='cuda:1'), tensor([0, 0], device='cuda:1')] # Rank 1 >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0 [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1
>>> # All tensors below are of torch.cfloat dtype. >>> # We have 2 process groups, 2 ranks. >>> tensor_list = [ ... torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2) ... ] >>> tensor_list [tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0 [tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1 >>> tensor = torch.tensor( ... [1 + 1j, 2 + 2j], dtype=torch.cfloat, device=device ... ) + 2 * rank * (1 + 1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0 [tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1
- torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[source][source]¶
从所有 ranks 收集 tensors 并将它们放入一个输出 tensor 中。
此函数要求所有进程上的 tensors 具有相同的大小。
- 参数
output_tensor (Tensor) – 容纳所有 ranks 的 tensor 元素的输出 tensor。它必须具有正确的大小,具有以下形式之一:(i) 所有输入 tensors 沿着主维度进行拼接;“拼接”的定义请参见
torch.cat()
;(ii) 所有输入 tensors 沿着主维度进行堆叠;“堆叠”的定义请参见torch.stack()
。下面的示例可能更好地解释支持的输出形式。input_tensor (Tensor) – 当前 rank 要收集的 Tensor。与
all_gather
API 不同,此 API 中的输入 tensors 必须在所有 ranks 上具有相同的大小。group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。
async_op (bool, 可选的) – 此操作是否应为异步操作
- 返回
异步工作句柄,如果 async_op 设置为 True。如果 async_op 未设置或不是组的一部分,则为 None。
示例
>>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor_in tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> # Output in concatenation form >>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out, tensor_in) >>> tensor_out tensor([1, 2, 3, 4], device='cuda:0') # Rank 0 tensor([1, 2, 3, 4], device='cuda:1') # Rank 1 >>> # Output in stack form >>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out2, tensor_in) >>> tensor_out2 tensor([[1, 2], [3, 4]], device='cuda:0') # Rank 0 tensor([[1, 2], [3, 4]], device='cuda:1') # Rank 1
警告
Gloo 后端不支持此 API。
- torch.distributed.all_gather_object(object_list, obj, group=None)[source][source]¶
从整个组收集可 pickle 的对象到一个列表中。
类似于
all_gather()
,但可以传入 Python 对象。注意对象必须是可 pickle 的才能被收集。- 参数
object_list (list[Any]) – 输出列表。对于此集合操作,它的大小应与组的大小一致,并将包含输出。
obj (Any) – 要从当前进程发送的可 pickle 的 Python 对象。
group (ProcessGroup, optional) – 要操作的进程组。如果为 None,将使用默认进程组。默认值为
None
。
- 返回
None。如果调用 rank 是此组的一部分,集合操作的输出将填充到输入
object_list
中。如果调用 rank 不是此组的一部分,传入的object_list
将不被修改。
注意
注意此 API 与
all_gather()
集合操作略有不同,因为它不提供async_op
句柄,因此将是一个阻塞调用。注意
对于基于 NCCL 的处理组,对象的内部 tensor 表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由
torch.cuda.current_device()
指定,用户有责任通过torch.cuda.set_device()
确保此设置正确,以便每个 rank 都有一个独立的 GPU。警告
all_gather_object()
隐式使用pickle
模块,众所周知它不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。仅对信任的数据调用此函数。警告
使用 GPU tensors 调用
all_gather_object()
支持不好且效率低下,因为它会导致 GPU -> CPU 传输,因为 tensors 将会被 pickle。请考虑改用all_gather()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.all_gather_object(output, gather_objects[dist.get_rank()]) >>> output ['foo', 12, {1: 2}]
- torch.distributed.gather(tensor, gather_list=None, dst=None, group=None, async_op=False, group_dst=None)[source][source]¶
在单个进程中收集 tensors 列表。
此函数要求所有进程上的 tensors 具有相同的大小。
- 参数
tensor (Tensor) – 输入 tensor。
gather_list (list[Tensor], optional) – 大小合适的同尺寸 tensors 列表,用来存放收集到的数据(默认为 None,必须在目标 rank 上指定)。
dst (int, optional) – 在全局进程组中的目标 rank(不考虑
group
参数)。(如果dst
和group_dst
都为 None,默认是全局 rank 0)group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。
async_op (bool, 可选的) – 此操作是否应为异步操作
group_dst (int, 可选的) –
group
上的目标进程编号。不可同时指定dst
和group_dst
- 返回
异步工作句柄,如果 async_op 设置为 True。如果 async_op 未设置或不是组的一部分,则为 None。
注意
注意
gather_list
中的所有 tensors 必须具有相同的大小。- 示例:
>>> # We have 2 process groups, 2 ranks. >>> tensor_size = 2 >>> device = torch.device(f'cuda:{rank}') >>> tensor = torch.ones(tensor_size, device=device) + rank >>> if dist.get_rank() == 0: >>> gather_list = [torch.zeros_like(tensor, device=device) for i in range(2)] >>> else: >>> gather_list = None >>> dist.gather(tensor, gather_list, dst=0) >>> # Rank 0 gets gathered data. >>> gather_list [tensor([1., 1.], device='cuda:0'), tensor([2., 2.], device='cuda:0')] # Rank 0 None # Rank 1
- torch.distributed.gather_object(obj, object_gather_list=None, dst=None, group=None, group_dst=None)[source][source]¶
从整个组收集可 pickle 的对象到一个单独的进程中。
类似于
gather()
,但可以传入 Python 对象。注意对象必须是可 pickle 的才能被收集。- 参数
obj (Any) – 输入对象。必须是可 pickle 的。
object_gather_list (list[Any]) – 输出列表。在
dst
rank 上,它的大小应该与此集合操作的组的大小一致,并将包含输出。在非dst
ranks 上必须是None
。(默认值为None
)dst (int, optional) – 在全局进程组中的目标 rank(不考虑
group
参数)。(如果dst
和group_dst
都为 None,默认是全局 rank 0)group (Optional[ProcessGroup]) – (ProcessGroup, 可选的): 要进行操作的进程组。如果为
None
,则使用默认进程组。默认值为None
。group_dst (int, 可选的) –
group
上的目标进程编号。不可同时指定dst
和group_dst
- 返回
None。在
dst
rank 上,object_gather_list
将包含集合操作的输出。
注意
注意此 API 与 gather 集合操作略有不同,因为它不提供 async_op 句柄,因此将是一个阻塞调用。
注意
对于基于 NCCL 的处理组,对象的内部 tensor 表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由
torch.cuda.current_device()
指定,用户有责任通过torch.cuda.set_device()
确保此设置正确,以便每个 rank 都有一个独立的 GPU。警告
gather_object()
隐式使用pickle
模块,众所周知它不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。仅对信任的数据调用此函数。警告
使用 GPU tensors 调用
gather_object()
支持不好且效率低下,因为它会导致 GPU -> CPU 传输,因为 tensors 将会被 pickle。请考虑改用gather()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.gather_object( ... gather_objects[dist.get_rank()], ... output if dist.get_rank() == 0 else None, ... dst=0 ... ) >>> # On rank 0 >>> output ['foo', 12, {1: 2}]
- torch.distributed.scatter(tensor, scatter_list=None, src=None, group=None, async_op=False, group_src=None)[source][source]¶
将 tensors 列表分散到组中的所有进程。
每个进程将接收且仅接收一个 tensor,并将其数据存储在
tensor
参数中。支持复数 tensors。
- 参数
tensor (Tensor) – 输出 tensor。
scatter_list (list[Tensor]) – 要分散的 tensors 列表(默认为 None,必须在源 rank 上指定)。
src (int) – 在全局进程组中的源 rank(不考虑
group
参数)。(如果src
和group_src
都为 None,默认是全局 rank 0)group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。
async_op (bool, 可选的) – 此操作是否应为异步操作
group_src (int, optional) –
group
中的源 rank。同时指定src
和group_src
是无效的。
- 返回
异步工作句柄,如果 async_op 设置为 True。如果 async_op 未设置或不是组的一部分,则为 None。
注意
注意
scatter_list
中的所有 tensors 必须具有相同的大小。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> tensor_size = 2 >>> device = torch.device(f'cuda:{rank}') >>> output_tensor = torch.zeros(tensor_size, device=device) >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> # Only tensors, all of which must be the same size. >>> t_ones = torch.ones(tensor_size, device=device) >>> t_fives = torch.ones(tensor_size, device=device) * 5 >>> scatter_list = [t_ones, t_fives] >>> else: >>> scatter_list = None >>> dist.scatter(output_tensor, scatter_list, src=0) >>> # Rank i gets scatter_list[i]. >>> output_tensor tensor([1., 1.], device='cuda:0') # Rank 0 tensor([5., 5.], device='cuda:1') # Rank 1
- torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list=None, src=None, group=None, group_src=None)[source][source]¶
将
scatter_object_input_list
中的可 pickle 对象分散到整个组。类似于
scatter()
,但可以传入 Python 对象。在每个 rank 上,分散的对象将作为scatter_object_output_list
的第一个元素被存储。注意scatter_object_input_list
中的所有对象必须是可 pickle 的才能被分散。- 参数
scatter_object_output_list (List[Any]) – 非空列表,其第一个元素将存储分散到此 rank 的对象。
scatter_object_input_list (List[Any], optional) – 要分散的输入对象列表。每个对象都必须是可 pickle 的。只有
src
rank 上的对象才会被分散,非 src ranks 的参数可以是None
。src (int) – 分散
scatter_object_input_list
的源 rank。源 rank 基于全局进程组(不考虑group花——
参数)。(如果src
和group_src
都为 None,默认是全局 rank 0)group (Optional[ProcessGroup]) – (ProcessGroup, 可选的): 要进行操作的进程组。如果为
None
,则使用默认进程组。默认值为None
。group_src (int, optional) –
group
中的源 rank。同时指定src
和group_src
是无效的。
- 返回
None
。如果 rank 是组的一部分,scatter_object_output_list
的第一个元素将被设置为此 rank 分散到的对象。
注意
注意此 API 与 scatter 集合操作略有不同,因为它不提供
async_op
句柄,因此将是一个阻塞调用。警告
scatter_object_list()
隐式使用pickle
模块,众所周知它不安全。可以构造恶意 pickle 数据,在反序列化时执行任意代码。仅对信任的数据调用此函数。警告
使用 GPU tensors 调用
scatter_object_list()
支持不好且效率低下,因为它会导致 GPU -> CPU 传输,因为 tensors 将会被 pickle。请考虑改用scatter()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> # Can be any list on non-src ranks, elements are not used. >>> objects = [None, None, None] >>> output_list = [None] >>> dist.scatter_object_list(output_list, objects, src=0) >>> # Rank i gets objects[i]. For example, on rank 2: >>> output_list [{1: 2}]
- torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source][source]¶
先规约,然后将 tensors 列表分散到组中的所有进程。
- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。如果 async_op 未设置或不是组的一部分,则为 None。
- torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source][source]¶
先规约,然后将一个 tensor 分散到组中的所有 ranks。
- 参数
output (Tensor) – 输出 tensor。它在所有 ranks 上应具有相同的大小。
input (Tensor) – 要规约并分散的输入 tensor。它的大小应为输出 tensor 大小的 world size 倍。输入 tensor 可以是以下形状之一:(i) 输出 tensors 沿着主维度的拼接,或 (ii) 输出 tensors 沿着主维度的堆叠。“拼接”的定义请参见
torch.cat()
。“堆叠”的定义请参见torch.stack()
。group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。
async_op (bool, optional) – 此操作是否应为异步操作。
- 返回
异步工作句柄,如果 async_op 设置为 True。如果 async_op 未设置或不是组的一部分,则为 None。
示例
>>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f"cuda:{rank}") >>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device) >>> # Input in concatenation form >>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device) >>> tensor_in tensor([0, 1, 2, 3], device='cuda:0') # Rank 0 tensor([0, 1, 2, 3], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 >>> # Input in stack form >>> tensor_in = torch.reshape(tensor_in, (world_size, 2)) >>> tensor_in tensor([[0, 1], [2, 3]], device='cuda:0') # Rank 0 tensor([[0, 1], [2, 3]], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1
警告
Gloo 后端不支持此 API。
- torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[source][source]¶
拆分输入 tensor,然后将拆分后的列表分散到组中的所有进程。
之后,从组中的所有进程接收到的 tensors 会被拼接,并作为单个输出 tensor 返回。
支持复数 tensors。
- 参数
output (Tensor) – 收集后拼接的输出 tensor。
input (Tensor) – 要分散的输入 tensor。
output_split_sizes – (list[Int], optional): dim 0 的输出拆分大小。如果指定为 None 或空列表,
output
tensor 的 dim 0 必须能被world_size
整除。input_split_sizes – (list[Int], optional): dim 0 的输入拆分大小。如果指定为 None 或空列表,
input
tensor 的 dim 0 必须能被world_size
整除。group (ProcessGroup, optional) – 要操作的进程组。如果为 None,则使用默认进程组。
async_op (bool, optional) – 此操作是否应为异步操作。
- 返回
异步工作句柄,如果 async_op 设置为 True。如果 async_op 未设置或不是组的一部分,则为 None。
警告
all_to_all_single 是实验性的,可能会有所更改。
示例
>>> input = torch.arange(4) + rank * 4 >>> input tensor([0, 1, 2, 3]) # Rank 0 tensor([4, 5, 6, 7]) # Rank 1 tensor([8, 9, 10, 11]) # Rank 2 tensor([12, 13, 14, 15]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([0, 4, 8, 12]) # Rank 0 tensor([1, 5, 9, 13]) # Rank 1 tensor([2, 6, 10, 14]) # Rank 2 tensor([3, 7, 11, 15]) # Rank 3
>>> # Essentially, it is similar to following operation: >>> scatter_list = list(input.chunk(world_size)) >>> gather_list = list(output.chunk(world_size)) >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> # Another example with uneven split >>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> output = ... >>> dist.all_to_all_single(output, input, output_splits, input_splits) >>> output tensor([ 0, 1, 10, 11, 12, 20, 21, 30, 31]) # Rank 0 tensor([ 2, 3, 13, 14, 22, 32, 33]) # Rank 1 tensor([ 4, 15, 16, 23, 34, 35]) # Rank 2 tensor([ 5, 17, 18, 24, 36]) # Rank 3
>>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor( ... [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat ... ) + 4 * rank * (1 + 1j) >>> input tensor([1+1j, 2+2j, 3+3j, 4+4j]) # Rank 0 tensor([5+5j, 6+6j, 7+7j, 8+8j]) # Rank 1 tensor([9+9j, 10+10j, 11+11j, 12+12j]) # Rank 2 tensor([13+13j, 14+14j, 15+15j, 16+16j]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([1+1j, 5+5j, 9+9j, 13+13j]) # Rank 0 tensor([2+2j, 6+6j, 10+10j, 14+14j]) # Rank 1 tensor([3+3j, 7+7j, 11+11j, 15+15j]) # Rank 2 tensor([4+4j, 8+8j, 12+12j, 16+16j]) # Rank 3
- torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[source][source]¶
将输入 tensors 列表分散到组中的所有进程,并在输出列表中返回收集到的 tensors 列表。
支持复数 tensors。
- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。如果 async_op 未设置或不是组的一部分,则为 None。
警告
all_to_all 是实验性的,可能会改变。
示例
>>> input = torch.arange(4) + rank * 4 >>> input = list(input.chunk(4)) >>> input [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0 [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1 [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2 [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # Rank 0 [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1 [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2 [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3
>>> # Essentially, it is similar to following operation: >>> scatter_list = input >>> gather_list = output >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i)
>>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> input = list(input.split(input_splits)) >>> input [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0 [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1 [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2 [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3 >>> output = ... >>> dist.all_to_all(output, input) >>> output [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0 [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1 [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2 [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3
>>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor( ... [1 + 1j, 2 + 2j, 3 + 3j, 4 + 4j], dtype=torch.cfloat ... ) + 4 * rank * (1 + 1j) >>> input = list(input.chunk(4)) >>> input [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0 [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1 [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2 [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0 [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1 [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2 [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3
- torch.distributed.barrier(group=None, async_op=False, device_ids=None)[source][source]¶
同步所有进程。
如果 async_op 为 False,或异步工作句柄在调用 wait() 时,此集合通信会阻塞进程,直到整个组进入此函数。
- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。如果 async_op 未设置或不是组的一部分,则为 None。
注意
ProcessGroupNCCL 现在会阻塞 CPU 线程,直到屏障集合通信完成。
- torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[source][source]¶
类似
torch.distributed.barrier
同步进程,但会考虑可配置的超时。它能够报告在指定超时时间内未能通过此屏障的秩。具体来说,对于非零秩,它将阻塞,直到处理了来自秩 0 的发送/接收。秩 0 将阻塞,直到处理了来自其他所有秩的发送/接收,并会报告未能及时响应的秩的失败情况。请注意,如果某个秩未能到达 monitored_barrier(例如由于挂起),所有其他秩也将在 monitored_barrier 中失败。
此集合通信将阻塞组中的所有进程/秩,直到整个组成功退出函数,这使其对于调试和同步非常有用。然而,它可能会影响性能,因此应仅用于调试或需要在主机端进行完全同步的场景。出于调试目的,可以在应用程序的集合通信调用之前插入此屏障,以检查是否有任何秩不同步。
注意
请注意,此集合通信仅支持 GLOO 后端。
- 参数
group (ProcessGroup, optional) – 要操作的进程组。如果为
None
,将使用默认进程组。timeout (datetime.timedelta, optional) – monitored_barrier 的超时时间。如果为
None
,将使用默认进程组的超时时间。wait_all_ranks (bool, optional) – 是否收集所有失败的秩。默认情况下,此参数为
False
,并且秩 0 上的monitored_barrier
将在遇到第一个失败的秩时抛出异常,以便快速失败。通过设置wait_all_ranks=True
,monitored_barrier
将收集所有失败的秩,并抛出包含所有失败秩信息的错误。
- 返回
None
.
- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() != 1: >>> dist.monitored_barrier() # Raises exception indicating that >>> # rank 1 did not call into monitored_barrier. >>> # Example with wait_all_ranks=True >>> if dist.get_rank() == 0: >>> dist.monitored_barrier(wait_all_ranks=True) # Raises exception >>> # indicating that ranks 1, 2, ... world_size - 1 did not call into >>> # monitored_barrier.
- class torch.distributed.Work¶
一个 Work 对象表示 PyTorch 分布式包中待处理的异步操作的句柄。它由非阻塞集合通信操作返回,例如 dist.all_reduce(tensor, async_op=True)。
- exception(self: torch._C._distributed_c10d.Work) std::__exception_ptr::exception_ptr ¶
- get_future(self: torch._C._distributed_c10d.Work) torch.Future ¶
- 返回
一个
torch.futures.Future
对象,与Work
的完成相关联。例如,可以通过fut = process_group.allreduce(tensors).get_future()
检索到 future 对象。
- 示例:
下面是一个简单的 allreduce DDP 通信钩子的示例,它使用
get_future` API 来检索与 ``allreduce
完成相关联的 Future。>>> def allreduce(process_group: dist.ProcessGroup, bucket: dist.GradBucket): -> torch.futures.Future >>> group_to_use = process_group if process_group is not None else torch.distributed.group.WORLD >>> tensor = bucket.buffer().div_(group_to_use.size()) >>> return torch.distributed.all_reduce(tensor, group=group_to_use, async_op=True).get_future() >>> ddp_model.register_comm_hook(state=None, hook=allreduce)
警告
get_future
API 支持 NCCL 以及部分 GLOO 和 MPI 后端(不支持点对点操作,如 send/recv),并将返回一个torch.futures.Future
。在上面的示例中,
allreduce
工作将在 GPU 上使用 NCCL 后端完成,fut.wait()
将在将相应的 NCCL 流与 PyTorch 当前的设备流同步后返回,以确保我们可以进行异步 CUDA 执行,并且它不会等待整个操作在 GPU 上完成。请注意,CUDAFuture
不支持TORCH_NCCL_BLOCKING_WAIT
标志或 NCCL 的barrier()
。此外,如果使用fut.then()
添加了回调函数,它将等待WorkNCCL
的 NCCL 流与ProcessGroupNCCL
的专用回调流同步后,并在回调流上运行回调后立即调用回调。fut.then()
将返回另一个CUDAFuture
,其中包含回调的返回值以及记录回调流的CUDAEvent
。对于 CPU 工作,当工作完成且 value() 张量准备就绪时,
fut.done()
返回 true。对于 GPU 工作,
fut.done()
仅在操作已入队时返回 true。对于混合 CPU-GPU 工作(例如使用 GLOO 发送 GPU 张量),
fut.done()
在张量已到达相应节点时返回 true,但不一定已在相应 GPU 上同步(类似于 GPU 工作)。
- get_future_result(self: torch._C._distributed_c10d.Work) torch.Future ¶
- 返回
一个
torch.futures.Future
对象,其类型为 int,映射到 WorkResult 的枚举类型。例如,可以通过fut = process_group.allreduce(tensor).get_future_result()
检索到 future 对象。
- 示例:
用户可以使用
fut.wait()
阻塞式等待工作完成,并通过fut.value()
获取 WorkResult。此外,用户可以使用fut.then(call_back_func)
注册一个回调函数,以便在工作完成后调用,而不会阻塞当前线程。
警告
get_future_result
API 支持 NCCL
- result(self: torch._C._distributed_c10d.Work) list[torch.Tensor] ¶
- wait(self: torch._C._distributed_c10d.Work, timeout: datetime.timedelta = datetime.timedelta(0)) bool ¶
- 返回
true/false。
- 示例:
- try
work.wait(timeout)
- except
# some handling
警告
正常情况下,用户无需设置超时。调用 wait() 等同于调用 synchronize():让当前流阻塞,等待 NCCL 工作完成。但是,如果设置了超时,它将阻塞 CPU 线程,直到 NCCL 工作完成或超时。如果超时,将抛出异常。
- class torch.distributed.ReduceOp¶
一个枚举类,用于表示可用的归约操作:
SUM
,PRODUCT
,MIN
,MAX
,BAND
,BOR
,BXOR
, 和PREMUL_SUM
。使用
NCCL
后端时,BAND
,BOR
, 和BXOR
归约操作不可用。AVG
在跨秩求和之前将值除以世界大小。AVG
仅在使用NCCL
后端时可用,且仅适用于 NCCL 2.10 或更高版本。PREMUL_SUM
在归约之前将输入乘以给定的标量。PREMUL_SUM
仅在使用NCCL
后端时可用,且仅适用于 NCCL 2.11 或更高版本。 用户应使用torch.distributed._make_nccl_premul_sum
。此外,
MAX
,MIN
和PRODUCT
不支持复数张量。此类的属性可以作为值访问,例如
ReduceOp.SUM
。它们用于指定归约集合通信的策略,例如reduce()
。此类不支持
__members__
属性。
分布式键值存储¶
分布式包附带了一个分布式键值存储,可用于在组中的进程之间共享信息,以及在 torch.distributed.init_process_group()
中初始化分布式包(通过显式创建存储作为指定 init_method
的替代方法)。键值存储有 3 种选择:TCPStore
, FileStore
, 和 HashStore
。
- class torch.distributed.Store¶
所有存储实现的基类,例如 PyTorch 分布式提供的 3 种实现:(
TCPStore
,FileStore
, 和HashStore
)。- add(self: torch._C._distributed_c10d.Store, arg0: str, arg1: int) int ¶
首次为给定
key
调用 add 会在存储中创建一个与key
相关联的计数器,并初始化为amount
。后续使用相同key
调用 add 会按指定的amount
递增计数器。如果调用add()
时使用的 key 已通过set()
在存储中设置,将导致异常。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> store.add("first_key", 6) >>> # Should return 7 >>> store.get("first_key")
- append(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None ¶
根据提供的
key
和value
将键值对追加到存储中。如果key
在存储中不存在,则会创建它。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.append("first_key", "po") >>> store.append("first_key", "tato") >>> # Should return "potato" >>> store.get("first_key")
- check(self: torch._C._distributed_c10d.Store, arg0: list[str]) bool ¶
用于检查给定
keys
列表是否在存储中有值存储的调用。此调用在正常情况下会立即返回,但在某些边缘死锁情况下仍可能受影响,例如 TCPStore 被销毁后调用 check。使用想要检查是否存储在存储中的 key 列表调用check()
。- 参数
keys (lisr[str]) – 要查询是否存储在存储中的键。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> # Should return 7 >>> store.check(["first_key"])
- compare_set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str, arg2: str) bytes ¶
根据提供的
key
将键值对插入存储中,并在插入前执行expected_value
和desired_value
之间的比较。只有当key
的expected_value
已存在于存储中,或者expected_value
是空字符串时,才会设置desired_value
。- 参数
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("key", "first_value") >>> store.compare_set("key", "first_value", "second_value") >>> # Should return "second_value" >>> store.get("key")
- delete_key(self: torch._C._distributed_c10d.Store, arg0: str) bool ¶
从存储中删除与
key
相关联的键值对。如果键成功删除,则返回 true,否则返回 false。- 参数
key (str) – 要从存储中删除的键
- 返回
True 如果
key
被删除,否则 False。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, HashStore can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key") >>> # This should return true >>> store.delete_key("first_key") >>> # This should return false >>> store.delete_key("bad_key")
- get(self: torch._C._distributed_c10d.Store, arg0: str) bytes ¶
检索存储中与给定
key
相关联的值。如果key
不存在于存储中,函数将等待初始化存储时定义的timeout
,然后抛出异常。- 参数
key (str) – 函数将返回与此键相关联的值。
- 返回
如果
key
在存储中,则返回与key
相关联的值。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
- multi_get(self: torch._C._distributed_c10d.Store, arg0: list[str]) list[bytes] ¶
检索
keys
中的所有值。如果keys
中的任何键不存在于存储中,函数将等待timeout
- 参数
keys (List[str]) – 要从存储中检索的键。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "po") >>> store.set("second_key", "tato") >>> # Should return [b"po", b"tato"] >>> store.multi_get(["first_key", "second_key"])
- multi_set(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: list[str]) None ¶
根据提供的
keys
和values
将一个键值对列表插入到存储中。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.multi_set(["first_key", "second_key"], ["po", "tato"]) >>> # Should return b"po" >>> store.get("first_key")
- num_keys(self: torch._C._distributed_c10d.Store) int ¶
返回存储中已设置的键的数量。请注意,这个数字通常会比通过
set()
和add()
添加的键的数量多一个,因为有一个键用于协调所有使用该存储的工作进程。警告
与
TCPStore
一起使用时,num_keys
返回写入底层文件的键的数量。如果存储被销毁并使用相同文件创建另一个存储,则原始键将被保留。- 返回
存储中存在的键的数量。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # This should return 2 >>> store.num_keys()
- set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None ¶
根据提供的
key
和value
将键值对插入到存储中。如果key
已存在于存储中,它将使用新提供的value
覆盖旧值。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
- set_timeout(self: torch._C._distributed_c10d.Store, arg0: datetime.timedelta) None ¶
设置存储的默认超时。此超时用于初始化以及
wait()
和get()
方法。- 参数
timeout (timedelta) – 要在存储中设置的超时。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set_timeout(timedelta(seconds=10)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"])
- property timeout¶
获取存储的超时设置。
- wait(*args, **kwargs)¶
重载函数。
wait(self: torch._C._distributed_c10d.Store, arg0: list[str]) -> None
等待
keys
中的每个键被添加到存储中。如果在timeout
(在存储初始化期间设置)之前并非所有键都已设置,则wait
将抛出异常。- 参数
keys (list) – 要等待其在存储中设置的键列表。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 30 seconds >>> store.wait(["bad_key"])
wait(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: datetime.timedelta) -> None
等待
keys
中的每个键被添加到存储中,如果在提供的timeout
内键尚未设置,则抛出异常。- 参数
keys (list) – 要等待其在存储中设置的键列表。
timeout (timedelta) – 在抛出异常之前等待键添加的时间。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"], timedelta(seconds=10))
- class torch.distributed.TCPStore¶
一个基于 TCP 的分布式键值存储实现。服务器存储保存数据,而客户端存储可以通过 TCP 连接到服务器存储并执行诸如
set()
插入键值对、get()
检索键值对等操作。应始终初始化一个服务器存储,因为客户端存储会等待服务器建立连接。- 参数
host_name (str) – 服务器存储应运行的主机名或 IP 地址。
port (int) – 服务器存储应监听传入请求的端口。
world_size (int, optional) – 存储用户总数(客户端数量 + 服务器数量 1)。默认为 None(None 表示存储用户数量不固定)。
is_master (bool, optional) – 初始化服务器存储时为 True,初始化客户端存储时为 False。默认为 False。
timeout (timedelta, optional) – 存储在初始化期间以及诸如
get()
和wait()
等方法使用的超时。默认为 timedelta(seconds=300)。wait_for_workers (bool, optional) – 是否等待所有工作进程连接到服务器存储。这仅适用于 world_size 为固定值的情况。默认为 True。
multi_tenant (bool, optional) – 如果为 True,则当前进程中所有具有相同主机/端口的
TCPStore
实例将使用同一个底层TCPServer
。默认为 False。master_listen_fd (int, optional) – 如果指定,底层
TCPServer
将在这个文件描述符上监听,该文件描述符必须是已经绑定到port
的套接字。在某些场景下有助于避免端口分配竞争。默认为 None(表示服务器创建一个新的套接字并尝试将其绑定到port
)。use_libuv (bool, optional) – 如果为 True,则
TCPServer
后端使用 libuv。默认为 True。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Run on process 1 (server) >>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30)) >>> # Run on process 2 (client) >>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False) >>> # Use any of the store methods from either the client or server after initialization >>> server_store.set("first_key", "first_value") >>> client_store.get("first_key")
- __init__(self: torch._C._distributed_c10d.TCPStore, host_name: str, port: int, world_size: Optional[int] = None, is_master: bool = False, timeout: datetime.timedelta = datetime.timedelta(seconds=300), wait_for_workers: bool = True, multi_tenant: bool = False, master_listen_fd: Optional[int] = None, use_libuv: bool = True) None ¶
创建一个新的 TCPStore。
- property host¶
获取存储监听请求的主机名。
- property libuvBackend¶
如果使用 libuv 后端,则返回 True。
- property port¶
获取存储监听请求的端口号。
- class torch.distributed.HashStore¶
一个基于底层哈希表的线程安全存储实现。这个存储可以在同一进程内使用(例如,被其他线程使用),但不能跨进程使用。
- 示例:
>>> import torch.distributed as dist >>> store = dist.HashStore() >>> # store can be used from other threads >>> # Use any of the store methods after initialization >>> store.set("first_key", "first_value")
- __init__(self: torch._C._distributed_c10d.HashStore) None ¶
创建一个新的 HashStore。
- class torch.distributed.FileStore¶
一个使用文件来存储底层键值对的存储实现。
- 示例:
>>> import torch.distributed as dist >>> store1 = dist.FileStore("/tmp/filestore", 2) >>> store2 = dist.FileStore("/tmp/filestore", 2) >>> # Use any of the store methods from either the client or server after initialization >>> store1.set("first_key", "first_value") >>> store2.get("first_key")
- __init__(self: torch._C._distributed_c10d.FileStore, file_name: str, world_size: int = -1) None ¶
创建一个新的 FileStore。
- property path¶
获取 FileStore 用于存储键值对的文件的路径。
- class torch.distributed.PrefixStore¶
它是对任何三种键值存储(
TCPStore
、FileStore
和HashStore
)的包装,用于在插入到存储中的每个键前添加一个前缀。- 参数
prefix (str) – 在插入到存储之前添加到每个键前面的前缀字符串。
store (torch.distributed.store) – 构成底层键值存储的存储对象。
- __init__(self: torch._C._distributed_c10d.PrefixStore, prefix: str, store: torch._C._distributed_c10d.Store) None ¶
创建一个新的 PrefixStore。
- property underlying_store¶
获取 PrefixStore 包装的底层存储对象。
集合通信分析¶
请注意,您可以使用 torch.profiler
(推荐,仅在 1.8.1 后可用)或 torch.autograd.profiler
来分析此处提到的集合通信和点对点通信 API。所有开箱即用的后端(gloo
、nccl
、mpi
)都受支持,集合通信的使用将在分析输出/轨迹中按预期呈现。分析您的代码与任何常规的 torch 运算符相同。
import torch
import torch.distributed as dist
with torch.profiler():
tensor = torch.randn(20, 10)
dist.all_reduce(tensor)
有关分析器功能的完整概述,请参阅 分析器文档。
多 GPU 集合函数¶
警告
多 GPU 函数(表示每个 CPU 线程多个 GPU)已弃用。目前,PyTorch Distributed 的首选编程模型是每个线程一个设备,本文档中的 API 就是例证。如果您是后端开发人员,并且想支持每个线程多个设备,请联系 PyTorch Distributed 的维护人员。
第三方后端¶
除了内置的 GLOO/MPI/NCCL 后端外,PyTorch 分布式还通过运行时注册机制支持第三方后端。有关如何通过 C++ Extension 开发第三方后端的参考,请参阅 教程 - 自定义 C++ 和 CUDA 扩展 和 test/cpp_extensions/cpp_c10d_extension.cpp
。第三方后端的 capability 由其自身实现决定。
新的后端派生自 c10d::ProcessGroup
,并在导入时通过 torch.distributed.Backend.register_backend()
注册后端名称和实例化接口。
手动导入此后端并使用相应的后端名称调用 torch.distributed.init_process_group()
时,torch.distributed
包将在新后端上运行。
警告
对第三方后端的支持是实验性的,可能会发生变化。
启动工具¶
torch.distributed 包还在 torch.distributed.launch 中提供了一个启动工具。此 helper 工具可用于在每个节点上启动多个进程进行分布式训练。
模块 torch.distributed.launch
。
torch.distributed.launch
是一个在每个训练节点上启动多个分布式训练进程的模块。
警告
此模块将弃用,取而代之的是 torchrun。
该工具可用于单节点分布式训练,其中将在每个节点上启动一个或多个进程。该工具可用于 CPU 训练或 GPU 训练。如果该工具用于 GPU 训练,每个分布式进程将在单个 GPU 上运行。这可以显着提高单节点训练性能。它也可用于多节点分布式训练,通过在每个节点上启动多个进程,同样可以显着提高多节点分布式训练性能。对于具有支持直接 GPU 的多个 Infiniband 接口的系统,这将尤其有益,因为所有这些接口都可以用于聚合通信带宽。
在单节点分布式训练或多节点分布式训练的两种情况下,此工具都将启动每个节点指定数量的进程(--nproc-per-node
)。如果用于 GPU 训练,此数量需要小于或等于当前系统上的 GPU 数量(nproc_per_node
),每个进程将在 GPU 0 到 GPU (nproc_per_node - 1) 的单个 GPU 上运行。
如何使用此模块
单节点多进程分布式训练
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
arguments of your training script)
多节点多进程分布式训练:(例如,两个节点)
节点 1:(IP:192.168.1.1,并且有一个空闲端口:1234)
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
节点 2
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
要查找此模块提供的可选参数
python -m torch.distributed.launch --help
重要注意事项
1. 目前,此工具和多进程分布式(单节点或多节点)GPU 训练仅在使用 NCCL 分布式后端时才能达到最佳性能。因此,NCCL 后端是 GPU 训练推荐使用的后端。
2. 在您的训练程序中,您必须解析命令行参数:--local-rank=LOCAL_PROCESS_RANK
,这将由本模块提供。如果您的训练程序使用 GPU,您应该确保您的代码仅在 LOCAL_PROCESS_RANK 的 GPU 设备上运行。这可以通过以下方式完成:
解析 local_rank 参数
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()
使用以下任一方法将您的设备设置为 local rank:
>>> torch.cuda.set_device(args.local_rank) # before your code runs
或
>>> with torch.cuda.device(args.local_rank):
>>> # your code to run
>>> ...
在 2.0.0 版本中更改: 启动器将 --local-rank=<rank>
参数传递给您的脚本。从 PyTorch 2.0.0 开始,带连字符的 --local-rank
比之前使用的带下划线的 --local_rank
更受青睐。
为了向后兼容,用户可能需要在其参数解析代码中处理这两种情况。这意味着在参数解析器中同时包含 "--local-rank"
和 "--local_rank"
。如果只提供了 "--local_rank"
,启动器将触发一个错误:“错误:无法识别的参数:--local-rank="--local-rank"
就足够了。
3. 在您的训练程序中,您应该在开始时调用以下函数来启动分布式后端。强烈建议使用 init_method=env://
。其他初始化方法(例如 tcp://
)可能有效,但 env://
是本模块官方支持的方法。
>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>> init_method='env://')
4. 在您的训练程序中,您可以使用常规的分布式函数,也可以使用 torch.nn.parallel.DistributedDataParallel()
模块。如果您的训练程序使用 GPU 进行训练,并且您想使用 torch.nn.parallel.DistributedDataParallel()
模块,以下是配置方法。
>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>> device_ids=[args.local_rank],
>>> output_device=args.local_rank)
请确保 device_ids
参数设置为您的代码将操作的唯一 GPU 设备 ID。这通常是进程的 local rank。换句话说,为了使用此工具,device_ids
需要是 [args.local_rank]
,而 output_device
需要是 args.local_rank
。
5. 另一种通过环境变量 LOCAL_RANK
将 local_rank
传递给子进程的方法。当您使用 --use-env=True
启动脚本时,此行为启用。您必须调整上面的子进程示例,将 args.local_rank
替换为 os.environ['LOCAL_RANK']
;当您指定此 flag 时,启动器不会传递 --local-rank
。
警告
local_rank
不是全局唯一的:它仅在单机进程中是唯一的。因此,不要用它来决定是否应该写入网络文件系统等。请参阅 https://github.com/pytorch/pytorch/issues/12042,了解如果操作不正确可能出现问题的示例。
Spawn 工具¶
多进程包 - torch.multiprocessing 包还在 torch.multiprocessing.spawn()
中提供了一个 spawn
函数。此 helper 函数可用于启动多个进程。它的工作原理是传入您想要运行的函数,并启动 N 个进程来运行它。这也可用于多进程分布式训练。
有关如何使用它的参考,请参阅 PyTorch 示例 - ImageNet 实现。
请注意,此函数需要 Python 3.4 或更高版本。
torch.distributed
应用程序调试¶
调试分布式应用程序可能具有挑战性,因为难以理解的挂起、崩溃或各进程(rank)之间不一致的行为。torch.distributed
提供了一套工具,以自助方式帮助调试训练应用程序。
Python Breakpoint¶
在分布式环境中使用 Python 调试器非常方便,但由于它并非开箱即用,许多人根本不使用它。PyTorch 提供了一个定制的 pdb 包装器,可以简化此过程。
torch.distributed.breakpoint 使此过程变得容易。在内部,它通过两种方式定制了 pdb 的断点行为,但其他方面与普通 pdb 一致。1. 仅在一个进程(由用户指定)上附加调试器。2. 通过使用 torch.distributed.barrier() 确保所有其他进程停止,该 barrier 会在被调试进程发出 continue 后释放。3. 将子进程的 stdin 重定向到你的终端。
要使用它,只需在所有进程上发出 torch.distributed.breakpoint(rank),在每种情况下使用相同的 rank 值。
监控的 Barrier¶
从 v1.10 开始,torch.distributed.monitored_barrier()
作为 torch.distributed.barrier()
的替代品而存在,当崩溃时,它会提供有关哪个进程可能出现故障的有用信息,即并非所有进程都在提供的超时时间内调用了 torch.distributed.monitored_barrier()
。torch.distributed.monitored_barrier()
使用 send
/recv
通信原语实现了一个主机端 barrier,其过程类似于确认,允许 rank 0 报告哪些进程未能及时确认 barrier。举例来说,考虑以下函数,其中 rank 1 未能调用 torch.distributed.monitored_barrier()
(在实践中,这可能是由于应用程序 bug 或前一个集合操作中的挂起)。
import os
from datetime import timedelta
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
# monitored barrier requires gloo process group to perform host-side sync.
group_gloo = dist.new_group(backend="gloo")
if rank not in [1]:
dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
mp.spawn(worker, nprocs=2, args=())
在 rank 0 上产生以下错误消息,允许用户确定哪个进程可能存在故障并进一步调查。
RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594
TORCH_DISTRIBUTED_DEBUG
¶
结合 TORCH_CPP_LOG_LEVEL=INFO
环境变量,可以使用 TORCH_DISTRIBUTED_DEBUG
触发额外的有用日志记录和集体同步检查,以确保所有进程适当同步。TORCH_DISTRIBUTED_DEBUG
可以设置为 OFF
(默认)、INFO
或 DETAIL
,具体取决于所需的调试级别。请注意,最详细的选项 DETAIL
可能会影响应用程序性能,因此应仅在调试问题时使用。
设置 TORCH_DISTRIBUTED_DEBUG=INFO
将在初始化使用 torch.nn.parallel.DistributedDataParallel()
训练的模型时产生额外的调试日志,而设置 TORCH_DISTRIBUTED_DEBUG=DETAIL
还将在选定的几次迭代中记录运行时性能统计信息。这些运行时统计信息包括前向时间、后向时间、梯度通信时间等数据。举例来说,给定以下应用程序:
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
class TwoLinLayerNet(torch.nn.Module):
def __init__(self):
super().__init__()
self.a = torch.nn.Linear(10, 10, bias=False)
self.b = torch.nn.Linear(10, 1, bias=False)
def forward(self, x):
a = self.a(x)
b = self.b(x)
return (a, b)
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
print("init model")
model = TwoLinLayerNet().cuda()
print("init ddp")
ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
inp = torch.randn(10, 10).cuda()
print("train")
for _ in range(20):
output = ddp_model(inp)
loss = output[0] + output[1]
loss.sum().backward()
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ[
"TORCH_DISTRIBUTED_DEBUG"
] = "DETAIL" # set to DETAIL for runtime logging.
mp.spawn(worker, nprocs=2, args=())
初始化时会渲染以下日志:
I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO
运行时(当设置 TORCH_DISTRIBUTED_DEBUG=DETAIL
时)会渲染以下日志:
I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 40838608
Avg backward compute time: 5983335
Avg backward comm. time: 4326421
Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 42850427
Avg backward compute time: 3885553
Avg backward comm. time: 2357981
Avg backward comm/comp overlap time: 2234674
此外,TORCH_DISTRIBUTED_DEBUG=INFO
增强了 torch.nn.parallel.DistributedDataParallel()
由于模型中未使用参数而导致的崩溃日志。目前,如果前向传递中有可能未使用的参数,则必须在 torch.nn.parallel.DistributedDataParallel()
初始化时传入 find_unused_parameters=True
;从 v1.10 开始,所有模型输出都必须用于损失计算,因为 torch.nn.parallel.DistributedDataParallel()
不支持后向传递中未使用的参数。这些约束对于大型模型尤其具有挑战性,因此,当出现错误崩溃时,torch.nn.parallel.DistributedDataParallel()
将记录所有未使用的参数的完全限定名称。例如,在上述应用程序中,如果我们修改 loss
为 loss = output[1]
,则 TwoLinLayerNet.a
在后向传递中未接收到梯度,从而导致 DDP
失败。在崩溃时,用户会收到关于未使用参数的信息,这对于大型模型来说手动查找可能很困难。
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0
设置 TORCH_DISTRIBUTED_DEBUG=DETAIL
将在用户直接或间接发出的每个集体调用(例如 DDP allreduce
)上触发额外的一致性和同步检查。这是通过创建一个包装进程组来实现的,该进程组包装了由 torch.distributed.init_process_group()
和 torch.distributed.new_group()
API 返回的所有进程组。因此,这些 API 将返回一个包装进程组,该进程组的使用方式与普通进程组完全相同,但在将集体操作分派到底层进程组之前会执行一致性检查。目前,这些检查包括 torch.distributed.monitored_barrier()
,它确保所有进程完成其未完成的集体调用并报告卡住的进程。接下来,通过确保所有集体函数匹配并以一致的张量形状调用来检查集体操作本身的一致性。如果情况并非如此,当应用程序崩溃时,会包含详细的错误报告,而不是挂起或无信息的错误消息。举例来说,考虑以下函数,其中 torch.distributed.all_reduce()
的输入形状不匹配。
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
tensor = torch.randn(10 if rank == 0 else 20).cuda()
dist.all_reduce(tensor)
torch.cuda.synchronize(device=rank)
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
mp.spawn(worker, nprocs=2, args=())
使用 NCCL
后端,这样的应用程序很可能导致挂起,这在非平凡场景中可能难以找出根本原因。如果用户启用 TORCH_DISTRIBUTED_DEBUG=DETAIL
并重新运行应用程序,以下错误消息将揭示根本原因。
work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes: 10
[ torch.LongTensor{1} ]
注意
为了在运行时对调试级别进行精细控制,还可以使用函数 torch.distributed.set_debug_level()
、torch.distributed.set_debug_level_from_env()
和 torch.distributed.get_debug_level()
。
此外,TORCH_DISTRIBUTED_DEBUG=DETAIL 可以与 TORCH_SHOW_CPP_STACKTRACES=1 结合使用,以便在检测到集体操作不同步时记录整个调用堆栈。这些集体操作不同步检查适用于所有使用由 torch.distributed.init_process_group()
和 torch.distributed.new_group()
API 创建的进程组支持的 c10d
集体调用应用程序。
日志记录¶
除了通过 torch.distributed.monitored_barrier()
和 TORCH_DISTRIBUTED_DEBUG
提供的显式调试支持之外,torch.distributed
的底层 C++ 库也会输出各种级别的日志消息。这些消息有助于理解分布式训练作业的执行状态,并排除网络连接失败等问题。以下矩阵显示了如何通过 TORCH_CPP_LOG_LEVEL
和 TORCH_DISTRIBUTED_DEBUG
环境变量的组合来调整日志级别。
|
|
实际日志级别 |
---|---|---|
|
忽略 |
Error(错误) |
|
忽略 |
警告 |
|
忽略 |
Info(信息) |
|
|
Debug(调试) |
|
|
Trace (a.k.a. All)(跟踪,即全部) |
分布式组件会引发派生自 RuntimeError 的自定义 Exception 类型
torch.distributed.DistError:这是所有分布式异常的基类型。
torch.distributed.DistBackendError:当发生特定于后端(backend)的错误时,会引发此异常。例如,如果使用 NCCL 后端,并且用户尝试使用 NCCL 库不可用的 GPU。
torch.distributed.DistNetworkError:当网络库遇到错误时(例如:对端连接重置)会引发此异常。
torch.distributed.DistStoreError:当 Store 遇到错误时(例如:TCPStore 超时)会引发此异常。
- class torch.distributed.DistError¶
分布式库中发生错误时引发的异常。
- class torch.distributed.DistBackendError¶
分布式中发生后端错误时引发的异常。
- class torch.distributed.DistNetworkError¶
分布式中发生网络错误时引发的异常。
- class torch.distributed.DistStoreError¶
分布式 Store 中发生错误时引发的异常。
如果你正在运行单节点训练,交互式地在脚本中设置断点可能很方便。我们提供了一种方便地在单个进程中设置断点的方式。