分布式通信包 - torch.distributed¶
注意
请参考 PyTorch 分布式概述 以简要介绍与分布式训练相关的所有功能。
后端¶
torch.distributed
支持三个内置后端,每个后端具有不同的功能。下表显示了哪些函数可用于 CPU / CUDA 张量。如果用于构建 PyTorch 的实现支持 MPI,则 MPI 也支持 CUDA。
后端 |
|
|
|
|||
---|---|---|---|---|---|---|
设备 |
CPU |
GPU |
CPU |
GPU |
CPU |
GPU |
send |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
recv |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
broadcast |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
all_reduce |
✓ |
✓ |
✓ |
? |
✘ |
✓ |
reduce |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
all_gather |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
gather |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
scatter |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
reduce_scatter |
✘ |
✘ |
✘ |
✘ |
✘ |
✓ |
all_to_all |
✘ |
✘ |
✓ |
? |
✘ |
✓ |
barrier |
✓ |
✘ |
✓ |
? |
✘ |
✓ |
随 PyTorch 提供的后端¶
PyTorch 分布式包支持 Linux(稳定版)、MacOS(稳定版)和 Windows(原型版)。默认情况下,对于 Linux,Gloo 和 NCCL 后端是在 PyTorch 分布式中构建并包含的(只有在使用 CUDA 构建时才包含 NCCL)。MPI 是一个可选的后端,只有在你从源代码构建 PyTorch 时才能包含。(例如,在安装了 MPI 的主机上构建 PyTorch。)
注意
从 PyTorch v1.8 开始,Windows 支持所有集体通信后端,但 NCCL 除外。如果 init_method init_process_group()
的参数指向一个文件,则该文件必须符合以下模式
本地文件系统,
init_method="file:///d:/tmp/some_file"
共享文件系统,
init_method="file://////{machine_name}/{share_folder_name}/some_file"
与 Linux 平台相同,你可以通过设置环境变量 MASTER_ADDR 和 MASTER_PORT 来启用 TcpStore。
使用哪个后端?¶
过去,我们经常被问到:“我应该使用哪个后端?”
经验法则
对于分布式 **GPU** 训练,使用 NCCL 后端
对于分布式 **CPU** 训练,使用 Gloo 后端。
具有 InfiniBand 互连的 GPU 主机
使用 NCCL,因为它是目前唯一支持 InfiniBand 和 GPUDirect 的后端。
具有以太网互连的 GPU 主机
使用 NCCL,因为它目前提供了最佳的分布式 GPU 训练性能,尤其是对于多进程单节点或多节点分布式训练。如果你遇到任何 NCCL 问题,请使用 Gloo 作为备用选项。(请注意,Gloo 目前运行速度比 NCCL 在 GPU 上慢。)
具有 InfiniBand 互连的 CPU 主机
如果你的 InfiniBand 启用了 IP over IB,请使用 Gloo,否则请使用 MPI。我们计划在即将发布的版本中为 Gloo 添加 InfiniBand 支持。
具有以太网互连的 CPU 主机
使用 Gloo,除非你有特定的理由使用 MPI。
常用环境变量¶
选择要使用的网络接口¶
默认情况下,NCCL 和 Gloo 后端都将尝试找到要使用的正确网络接口。如果自动检测到的接口不正确,你可以使用以下环境变量覆盖它(适用于相应的后端)
**NCCL_SOCKET_IFNAME**,例如
export NCCL_SOCKET_IFNAME=eth0
**GLOO_SOCKET_IFNAME**,例如
export GLOO_SOCKET_IFNAME=eth0
如果你使用的是 Gloo 后端,可以通过逗号分隔来指定多个接口,例如:export GLOO_SOCKET_IFNAME=eth0,eth1,eth2,eth3
。后端将以循环方式在这些接口之间调度操作。所有进程都必须在此变量中指定相同数量的接口,这一点至关重要。
其他 NCCL 环境变量¶
**调试** - 如果 NCCL 失败,你可以设置 NCCL_DEBUG=INFO
以打印显式警告消息以及基本的 NCCL 初始化信息。
你也可以使用 NCCL_DEBUG_SUBSYS
来获取有关 NCCL 特定方面的更多详细信息。例如,NCCL_DEBUG_SUBSYS=COLL
将打印集体调用的日志,这在调试挂起时可能会有所帮助,尤其是那些由集体类型或消息大小不匹配引起的挂起。如果拓扑检测失败,设置 NCCL_DEBUG_SUBSYS=GRAPH
以检查详细的检测结果并保存为参考,如果需要 NCCL 团队的进一步帮助,这将很有帮助。
性能调优 - NCCL 根据其拓扑检测执行自动调优,以节省用户的调优工作。在一些基于插槽的系统上,用户仍然可以尝试调优 NCCL_SOCKET_NTHREADS
和 NCCL_NSOCKS_PERTHREAD
以提高插槽网络带宽。这两个环境变量已针对一些云提供商(例如 AWS 或 GCP)由 NCCL 预先调优。
有关 NCCL 环境变量的完整列表,请参阅 NVIDIA NCCL 的官方文档
基础知识¶
torch.distributed 包为 PyTorch 提供了跨多个计算节点(运行在一台或多台机器上)进行多进程并行化的支持和通信原语。类 torch.nn.parallel.DistributedDataParallel()
基于此功能,提供同步分布式训练作为任何 PyTorch 模型的包装器。这与 多进程包 - torch.multiprocessing 和 torch.nn.DataParallel()
提供的并行化类型不同,因为它支持多个网络连接的机器,并且用户必须为每个进程显式启动主训练脚本的单独副本。
在单机同步情况下,torch.distributed 或 torch.nn.parallel.DistributedDataParallel()
包装器可能仍然比其他数据并行化方法(包括 torch.nn.DataParallel()
)具有优势。
每个进程都维护自己的优化器,并在每次迭代中执行完整的优化步骤。虽然这可能看起来很冗余,因为梯度已经收集在一起并在进程之间平均,因此对于每个进程都是相同的,但这意味着不需要参数广播步骤,从而减少了在节点之间传输张量的花费时间。
每个进程包含一个独立的 Python 解释器,消除了从单个 Python 进程驱动多个执行线程、模型副本或 GPU 带来的额外解释器开销和“GIL 争用”。这对那些大量使用 Python 运行时的模型(包括具有循环层或许多小组件的模型)尤其重要。
初始化¶
在调用任何其他方法之前,需要使用 torch.distributed.init_process_group()
或 torch.distributed.device_mesh.init_device_mesh()
函数初始化该包。两者都会阻塞,直到所有进程都加入。
- torch.distributed.is_available()[source]¶
如果分布式包可用,则返回
True
。否则,
torch.distributed
不会公开任何其他 API。目前,torch.distributed
在 Linux、MacOS 和 Windows 上可用。在从源代码构建 PyTorch 时,设置USE_DISTRIBUTED=1
以启用它。目前,Linux 和 Windows 的默认值为USE_DISTRIBUTED=1
,MacOS 的默认值为USE_DISTRIBUTED=0
。- 返回类型
- torch.distributed.init_process_group(backend=None, init_method=None, timeout=None, world_size=-1, rank=-1, store=None, group_name='', pg_options=None, device_id=None)[source]¶
初始化默认分布式进程组。
这也会初始化分布式包。
- 初始化进程组主要有两种方法
显式指定
store
、rank
和world_size
。指定
init_method
(一个 URL 字符串),它指示在哪里/如何发现对等方。可以选择指定rank
和world_size
,或者将所有必需的参数编码到 URL 中并省略它们。
如果两者都没有指定,则假设
init_method
为 “env://”。- 参数
backend (str 或 Backend, 可选) – 要使用的后端。根据构建时配置,有效值包括
mpi
、gloo
、nccl
和ucc
。如果未提供后端,则会创建gloo
和nccl
后端,有关如何管理多个后端的说明,请参见下面的说明。此字段可以作为小写字符串(例如"gloo"
)给出,也可以通过Backend
属性(例如Backend.GLOO
)访问。如果在使用nccl
后端时每台机器上使用多个进程,则每个进程必须对它使用的每个 GPU 拥有独占访问权限,因为在进程之间共享 GPU 会导致死锁。ucc
后端是实验性的。init_method (str, 可选) – 指定如何初始化进程组的 URL。如果未指定
init_method
或store
,则默认为 “env://”。与store
相互排斥。world_size (int, 可选) – 参与作业的进程数。如果指定了
store
,则需要此参数。rank (int, 可选) – 当前进程的等级(它应该是一个介于 0 和
world_size
-1 之间的数字)。如果指定了store
,则需要此参数。store (Store, 可选) – 所有工作人员都可以访问的键值存储,用于交换连接/地址信息。与
init_method
相互排斥。timeout (timedelta, 可选) – 对进程组执行的操作的超时时间。NCCL 的默认值为 10 分钟,其他后端的默认值为 30 分钟。这是集体操作在异步中止后进程崩溃的持续时间。这样做是因为 CUDA 执行是异步的,并且在继续执行用户代码时不再安全,因为失败的异步 NCCL 操作可能会导致后续 CUDA 操作在损坏的数据上运行。当设置 TORCH_NCCL_BLOCKING_WAIT 时,进程将阻塞并等待此超时时间。
group_name (str, 可选, 已弃用) – 组名。此参数被忽略
pg_options (ProcessGroupOptions, 可选) – 进程组选项,指定在构建特定进程组期间需要传递哪些附加选项。目前,我们支持的唯一选项是
ProcessGroupNCCL.Options
用于nccl
后端,可以指定is_high_priority_stream
,以便 nccl 后端在有计算内核在等待时可以拾取高优先级 cuda 流。device_id (torch.device, 可选) – 要“绑定”此进程的单个特定设备,允许进行后端特定的优化。目前,这只有在 NCCL 下才有两种效果:通信器立即形成(立即调用
ncclCommInit*
而不是正常的延迟调用),并且子组将使用ncclCommSplit
(如果可能)以避免不必要的组创建开销。如果你想尽早知道 NCCL 初始化错误,你也可以使用这个字段。
注意
要启用
backend == Backend.MPI
,PyTorch 需要在支持 MPI 的系统上从源代码构建。注意
对多个后端的支持尚处于实验阶段。当前,如果未指定后端,则会创建
gloo
和nccl
两种后端。对于包含 CPU 张量的集合运算,将使用gloo
后端,而对于包含 CUDA 张量的集合运算,将使用nccl
后端。可以通过传递格式为 “<device_type>:<backend_name>,<device_type>:<backend_name>” 的字符串来指定自定义后端,例如,“cpu:gloo,cuda:custom_backend”。
- torch.distributed.device_mesh.init_device_mesh(device_type, mesh_shape, *, mesh_dim_names=None)[source]¶
根据 device_type、mesh_shape 和 mesh_dim_names 参数初始化 DeviceMesh。
这将创建一个具有 n 维数组布局的 DeviceMesh,其中 n 是 mesh_shape 的长度。如果提供了 mesh_dim_names,则每个维度将被标记为 mesh_dim_names[i]。
注意
init_device_mesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/排名上运行。确保 mesh_shape(描述设备布局的 nD 数组的维度)在所有排名上都相同。不一致的 mesh_shape 可能会导致挂起。
注意
如果没有找到进程组,init_device_mesh 将在幕后初始化分布式进程组/组,这些组是分布式通信所需的。
- 参数
- 返回
一个
DeviceMesh
对象,表示设备布局。- 返回类型
- 示例:
>>> from torch.distributed.device_mesh import init_device_mesh >>> >>> mesh_1d = init_device_mesh("cuda", mesh_shape=(8,)) >>> mesh_2d = init_device_mesh("cuda", mesh_shape=(2, 8), mesh_dim_names=("dp", "tp"))
- torch.distributed.is_torchelastic_launched()[source]¶
检查当前进程是否使用
torch.distributed.elastic
(也称为 torchelastic)启动。TORCHELASTIC_RUN_ID
环境变量的存在被用作代理来确定当前进程是否使用 torchelastic 启动。这是一个合理的代理,因为TORCHELASTIC_RUN_ID
映射到 rendezvous id,该 id 始终是非空值,表示用于对等发现的作业 id。- 返回类型
当前支持三种初始化方法
TCP 初始化¶
使用 TCP 初始化有两种方法,这两种方法都需要一个所有进程都可以访问的网络地址,以及一个期望的 world_size
。第一种方法需要指定属于排名 0 进程的地址。这种初始化方法要求所有进程都手动指定排名。
请注意,最新版分布式包不再支持多播地址。 group_name
也已弃用。
import torch.distributed as dist
# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456',
rank=args.rank, world_size=4)
环境变量初始化¶
此方法将从环境变量中读取配置,使您能够完全自定义如何获取信息。要设置的变量是
MASTER_PORT
- 必需;必须是排名 0 机器上的空闲端口MASTER_ADDR
- 必需(排名 0 除外);排名 0 节点的地址WORLD_SIZE
- 必需;可以在这里设置,也可以在 init 函数调用中设置RANK
- 必需;可以在这里设置,也可以在 init 函数调用中设置
排名为 0 的机器将用于设置所有连接。
这是默认方法,这意味着不需要指定 init_method
(或者可以是 env://
)。
初始化后¶
一旦运行了 torch.distributed.init_process_group()
,就可以使用以下函数。要检查进程组是否已初始化,请使用 torch.distributed.is_initialized()
。
- class torch.distributed.Backend(name)[source]¶
后端的类似枚举的类。
可用的后端:GLOO、NCCL、UCC、MPI 和其他注册的后端。
此类的值是小写字符串,例如
"gloo"
。它们可以作为属性访问,例如Backend.NCCL
。可以直接调用此类来解析字符串,例如,
Backend(backend_str)
将检查backend_str
是否有效,如果有效,则返回解析后的字符串。它还接受大写字符串,例如Backend("GLOO")
返回"gloo"
。注意
条目
Backend.UNDEFINED
存在,但仅用作某些字段的初始值。用户不应该直接使用它,也不应该假设它的存在。- classmethod register_backend(name, func, extended_api=False, devices=None)[source]¶
使用给定的名称和实例化函数注册一个新的后端。
此类方法由第三方
ProcessGroup
扩展用于注册新的后端。- 参数
name (str) –
ProcessGroup
扩展的后端名称。它应该与init_process_group()
中的名称匹配。func (function) – 实例化后端的函数处理程序。该函数应在后端扩展中实现,并接受四个参数,包括
store
、rank
、world_size
和timeout
。extended_api (bool, optional) – 后端是否支持扩展的参数结构。默认值:
False
。如果设置为True
,后端将获得一个c10d::DistributedBackendOptions
实例,以及由后端实现定义的进程组选项对象。device (str or list of str, optional) – 此后端支持的设备类型,例如“cpu”、“cuda”等。如果为 None,则假设“cpu”和“cuda”都支持。
注意
对第三方后端的支持处于实验阶段,可能会发生变化。
- torch.distributed.get_backend(group=None)[source]¶
返回给定进程组的后端。
- 参数
group (ProcessGroup, optional) – 要操作的进程组。默认情况下为常规主进程组。如果指定了其他特定组,则调用进程必须是
group
的一部分。- 返回
给定进程组的后端,以小写字符串形式表示。
- 返回类型
关闭¶
在退出时调用 destroy_process_group()
来清理资源非常重要。
最简单的模式是通过调用 destroy_process_group()
来销毁所有进程组和后端,并将 group 参数的默认值设置为 None,在不再需要通信的训练脚本中的某个点处进行操作,通常是在 main() 的结尾附近。此调用应在每个训练器进程中执行一次,而不是在外部进程启动器级别执行。
如果在超时持续时间内,pg 中所有等级都没有调用 destroy_process_group()
,尤其是在应用程序中有多个进程组时(例如用于 N 维并行),则可能会在退出时挂起。这是因为 ProcessGroupNCCL 的析构函数调用 ncclCommAbort,该函数必须以集合的方式调用,但是 ProcessGroupNCCL 的析构函数由 python 的 GC 调用的顺序是不确定的。调用 destroy_process_group()
通过确保 ncclCommAbort 以一致的顺序跨等级调用来提供帮助,并避免在 ProcessGroupNCCL 的析构函数期间调用 ncclCommAbort。
重新初始化¶
destroy_process_group 也可以用来销毁单个进程组。一个用例可能是容错训练,其中一个进程组可能被销毁,然后在运行时初始化一个新的进程组。在这种情况下,在调用销毁之后,并在随后初始化之前,必须使用除 torch.distributed 原语之外的其他方法来同步训练器进程。由于难以实现这种同步,目前不支持/未测试此行为,并被视为已知问题。如果这是一个阻止您的用例,请提交 github 问题或 RFC。
分布式键值存储¶
分布式软件包带有一个分布式键值存储,它可以用来在组中的进程之间共享信息,以及在 torch.distributed.init_process_group()
中初始化分布式软件包(通过显式创建存储作为指定 init_method
的替代方法)。键值存储有 3 种选择:TCPStore
、FileStore
和 HashStore
。
- class torch.distributed.TCPStore¶
基于 TCP 的分布式键值存储实现。服务器存储保存数据,而客户端存储可以通过 TCP 连接到服务器存储,并执行诸如
set()
(插入键值对)、get()
(检索键值对)等操作。始终应该初始化一个服务器存储,因为客户端存储会等待服务器建立连接。- 参数
host_name (str) – 服务器存储应运行的主机名或 IP 地址。
port (int) – 服务器存储应监听传入请求的端口。
world_size (int, optional) – 存储用户总数(客户端数量 + 服务器的 1)。默认值为 None(None 表示存储用户数量不固定)。
is_master (bool, optional) – 初始化服务器存储时为 True,客户端存储为 False。默认值为 False。
timeout (timedelta, optional) – 存储在初始化期间以及在诸如
get()
和wait()
等方法中使用的超时时间。默认值为 timedelta(seconds=300)wait_for_workers (bool, optional) – 是否等待所有 worker 与服务器存储建立连接。这仅在 world_size 为固定值时适用。默认值为 True。
multi_tenant (bool, 可选) – 如果为 True,当前进程中所有具有相同主机/端口的
TCPStore
实例将使用相同的底层TCPServer
。默认值为 False。master_listen_fd (int, 可选) – 如果指定,底层
TCPServer
将监听此文件描述符,该描述符必须是已绑定到port
的套接字。在某些情况下,这对于避免端口分配竞争很有用。默认值为 None(表示服务器创建新的套接字并尝试将其绑定到port
)。use_libuv (bool, 可选) – 如果为 True,则使用 libuv 作为
TCPServer
后端。默认值为 True。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Run on process 1 (server) >>> server_store = dist.TCPStore("127.0.0.1", 1234, 2, True, timedelta(seconds=30)) >>> # Run on process 2 (client) >>> client_store = dist.TCPStore("127.0.0.1", 1234, 2, False) >>> # Use any of the store methods from either the client or server after initialization >>> server_store.set("first_key", "first_value") >>> client_store.get("first_key")
- class torch.distributed.HashStore¶
基于底层哈希映射的线程安全存储实现。此存储可以在同一个进程中使用(例如,由其他线程使用),但不能跨进程使用。
- 示例:
>>> import torch.distributed as dist >>> store = dist.HashStore() >>> # store can be used from other threads >>> # Use any of the store methods after initialization >>> store.set("first_key", "first_value")
- class torch.distributed.FileStore¶
使用文件存储底层键值对的存储实现。
- 示例:
>>> import torch.distributed as dist >>> store1 = dist.FileStore("/tmp/filestore", 2) >>> store2 = dist.FileStore("/tmp/filestore", 2) >>> # Use any of the store methods from either the client or server after initialization >>> store1.set("first_key", "first_value") >>> store2.get("first_key")
- class torch.distributed.PrefixStore¶
围绕 3 个键值存储中的任何一个 (
TCPStore
、FileStore
和HashStore
) 的包装器,它在插入存储的每个键之前添加一个前缀。- 参数
prefix (str) – 在插入存储之前附加到每个键的前缀字符串。
store (torch.distributed.store) – 构成底层键值存储的存储对象。
- torch.distributed.Store.set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str) None ¶
根据提供的
key
和value
将键值对插入存储。如果key
已经存在于存储中,它将用新提供的value
覆盖旧值。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
- torch.distributed.Store.get(self: torch._C._distributed_c10d.Store, arg0: str) bytes ¶
检索与存储中给定
key
相关联的值。如果key
不存在于存储中,该函数将等待timeout
(在初始化存储时定义),然后抛出异常。- 参数
key (str) – 该函数将返回与该键相关联的值。
- 返回
如果
key
存在于存储中,则与key
相关联的值。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # Should return "first_value" >>> store.get("first_key")
- torch.distributed.Store.add(self: torch._C._distributed_c10d.Store, arg0: str, arg1: int) int ¶
对给定
key
的第一次调用 add 会在存储中创建一个与key
相关联的计数器,初始化为amount
。后续对使用相同key
的 add 的调用会将计数器增加指定的amount
。使用已经通过set()
在存储中设置的键调用add()
将导致异常。- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.add("first_key", 1) >>> store.add("first_key", 6) >>> # Should return 7 >>> store.get("first_key")
- torch.distributed.Store.compare_set(self: torch._C._distributed_c10d.Store, arg0: str, arg1: str, arg2: str) bytes ¶
根据提供的
key
将键值对插入存储,并在插入之前执行expected_value
和desired_value
之间的比较。只有当key
的expected_value
已经存在于存储中,或者expected_value
为空字符串时,才会设置desired_value
。- 参数
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("key", "first_value") >>> store.compare_set("key", "first_value", "second_value") >>> # Should return "second_value" >>> store.get("key")
- torch.distributed.Store.wait(*args, **kwargs)¶
重载函数。
wait(self: torch._C._distributed_c10d.Store, arg0: list[str]) -> None
等待
keys
中的每个键添加到存储。如果在timeout
(在存储初始化期间设置)之前并非所有键都被设置,那么wait
将抛出异常。- 参数
keys (list) – 要等待在存储中设置的键列表。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 30 seconds >>> store.wait(["bad_key"])
wait(self: torch._C._distributed_c10d.Store, arg0: list[str], arg1: datetime.timedelta) -> None
等待
keys
中的每个键添加到存储中,如果在提供的timeout
时间内未设置键,则抛出异常。- 参数
keys (list) – 要等待在存储中设置的键列表。
timeout (timedelta) – 在抛出异常之前等待键添加的时间。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"], timedelta(seconds=10))
- torch.distributed.Store.num_keys(self: torch._C._distributed_c10d.Store) int ¶
返回存储中设置的键的数量。请注意,此数量通常比由
set()
和add()
添加的键的数量多一个,因为一个键用于协调使用存储的所有工作进程。警告
当与
TCPStore
一起使用时,num_keys
返回写入基础文件的键的数量。如果存储被销毁,另一个存储使用相同的文件创建,原始键将被保留。- 返回
存储中存在的键的数量。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key", "first_value") >>> # This should return 2 >>> store.num_keys()
- torch.distributed.Store.delete_key(self: torch._C._distributed_c10d.Store, arg0: str) bool ¶
从存储中删除与
key
关联的键值对。如果键已成功删除,则返回 true,否则返回 false。- 参数
key (str) – 要从存储中删除的键
- 返回
如果
key
已删除,则为 True,否则为 False。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, HashStore can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set("first_key") >>> # This should return true >>> store.delete_key("first_key") >>> # This should return false >>> store.delete_key("bad_key")
- torch.distributed.Store.set_timeout(self: torch._C._distributed_c10d.Store, arg0: datetime.timedelta) None ¶
设置存储的默认超时时间。此超时时间在初始化以及在
wait()
和get()
中使用。- 参数
timeout (timedelta) – 要在存储中设置的超时时间。
- 示例:
>>> import torch.distributed as dist >>> from datetime import timedelta >>> # Using TCPStore as an example, other store types can also be used >>> store = dist.TCPStore("127.0.0.1", 0, 1, True, timedelta(seconds=30)) >>> store.set_timeout(timedelta(seconds=10)) >>> # This will throw an exception after 10 seconds >>> store.wait(["bad_key"])
组¶
默认情况下,集体操作在默认组(也称为世界)上运行,并要求所有进程进入分布式函数调用。但是,一些工作负载可以从更细粒度的通信中获益。这就是分布式组发挥作用的地方。 new_group()
函数可用于创建新组,其中包含所有进程的任意子集。它返回一个不透明的组句柄,可以作为 group
参数传递给所有集体操作(集体操作是分布式函数,用于以某些众所周知的编程模式交换信息)。
- torch.distributed.new_group(ranks=None, timeout=None, backend=None, pg_options=None, use_local_synchronization=False, group_desc=None)[source]¶
创建一个新的分布式组。
此函数要求主组中的所有进程(即分布式作业中所有进程)进入此函数,即使它们不打算成为该组的成员。此外,应按相同的顺序在所有进程中创建组。
警告
在
NCCL
后端中同时使用多个进程组是不安全的,用户应在应用程序中执行显式同步以确保一次只使用一个进程组。这意味着来自一个进程组的集体操作应在设备上完成执行(不仅仅是排队,因为 CUDA 执行是异步的),然后再排队来自另一个进程组的集体操作。有关更多详细信息,请参阅 同时使用多个 NCCL 通信器。- 参数
timeout (timedelta, optional) – 有关详细信息和默认值,请参阅 init_process_group。
backend (str or Backend, optional) – 要使用的后端。根据构建时配置,有效值为
gloo
和nccl
。默认情况下使用与全局组相同的后端。此字段应以小写字符串形式给出(例如,"gloo"
),也可以通过Backend
属性访问(例如,Backend.GLOO
)。如果传递None
,将使用与默认进程组相对应的后端。默认值为None
。pg_options (ProcessGroupOptions, optional) – 进程组选项,指定在构建特定进程组期间需要传入哪些附加选项。例如,对于
nccl
后端,可以指定is_high_priority_stream
,以便进程组可以选择高优先级 cuda 流。use_local_synchronization (bool, optional) – 在进程组创建结束时执行组本地屏障。这不同之处在于非成员排名不需要调用 API 也不加入屏障。
group_desc (str, optional) – 用于描述进程组的字符串。
- 返回
一个分布式组的句柄,可以传递给集体调用或 GroupMember.NON_GROUP_MEMBER,如果排名不是
ranks
的一部分。
注意:use_local_synchronization 不适用于 MPI。
注意:虽然 use_local_synchronization=True 在更大的集群和较小的进程组中可能快得多,但必须注意,因为它会改变集群行为,因为非成员排名不会加入组屏障()。
注意:use_local_synchronization=True 在每个排名创建多个重叠的进程组时会导致死锁。为了避免这种情况,请确保所有排名都遵循相同的全局创建顺序。
- torch.distributed.get_group_rank(group, global_rank)[source]¶
将全局排名转换为组排名。
global_rank
必须是group
的一部分,否则会引发 RuntimeError。- 参数
group (ProcessGroup) – 要查找相对排名的 ProcessGroup。
global_rank (int) – 要查询的全局排名。
- 返回
相对于
group
的global_rank
的组排名- 返回类型
注意:在默认进程组上调用此函数会返回身份
DeviceMesh¶
DeviceMesh 是一个更高层次的抽象概念,用于管理进程组(或 NCCL 通信器)。它允许用户轻松创建节点间和节点内进程组,而无需担心如何为不同的子进程组正确设置排名,并且它有助于轻松管理这些分布式进程组。 init_device_mesh()
函数可用于创建新的 DeviceMesh,其中网格形状描述了设备拓扑结构。
- class torch.distributed.device_mesh.DeviceMesh(device_type, mesh, *, mesh_dim_names=None, _init_backend=True)[source]¶
DeviceMesh 表示设备网格,其中设备布局可以表示为 n 维数组,n 维数组的每个值都是默认进程组排名的全局 ID。
DeviceMesh 可用于描述集群中设备的布局,并充当集群内设备列表之间通信的代理。
DeviceMesh 可用作上下文管理器。
注意
DeviceMesh 遵循 SPMD 编程模型,这意味着相同的 PyTorch Python 程序在集群中的所有进程/排名上运行。因此,用户需要确保 mesh 数组(它描述了设备的布局)在所有排名上都应该相同。不一致的 mesh 会导致静默挂起。
- 参数
device_type (str) – 网格的设备类型。当前支持: “cpu”,“cuda/cuda-like”。
mesh (ndarray) – 描述设备布局的多维数组或整数张量,其中 ID 是默认进程组的全局 ID。
- 返回
一个
DeviceMesh
对象,表示设备布局。- 返回类型
以下程序以 SPMD 方式在每个进程/排名上运行。在此示例中,我们有 2 台主机,每台主机有 4 个 GPU。对 mesh 的第一维进行约简将跨列(0, 4),.. 和(3, 7)进行约简,对 mesh 的第二维进行约简将在行(0, 1, 2, 3)和(4, 5, 6, 7)上进行约简。
- 示例:
>>> from torch.distributed.device_mesh import DeviceMesh >>> >>> # Initialize device mesh as (2, 4) to represent the topology >>> # of cross-host(dim 0), and within-host (dim 1). >>> mesh = DeviceMesh(device_type="cuda", mesh=[[0, 1, 2, 3],[4, 5, 6, 7]])
点对点通信¶
isend()
和 irecv()
在使用时返回分布式请求对象。一般来说,此对象的类型是未指定的,因为它们永远不应该手动创建,但它们保证支持两种方法
is_completed()
- 如果操作已完成,则返回 Truewait()
- 将阻塞进程,直到操作完成。is_completed()
确保在它返回后返回 True。
- torch.distributed.isend(tensor, dst, group=None, tag=0)[source]¶
异步发送张量。
警告
在请求完成之前修改
tensor
会导致未定义的行为。警告
tag
不支持 NCCL 后端。
- torch.distributed.send_object_list(object_list, dst, group=None, device=None)[source]¶
同步发送
object_list
中的可拾取对象。类似于
send()
,但可以传入 Python 对象。请注意,object_list
中的所有对象都必须可拾取才能发送。- 参数
object_list (List[Any]) – 要发送的输入对象列表。每个对象都必须可拾取。接收器必须提供大小相同的列表。
dst (int) – 要发送
object_list
的目标排名。目标排名基于全局进程组(与group
参数无关)group – (ProcessGroup, 可选): 要处理的进程组。如果为 None,将使用默认进程组。默认为
None
。device (
torch.device
, 可选) – 如果不为 None,则对象将被序列化并转换为张量,并在发送之前移动到device
。默认为None
。
- 返回
None
.
注意
对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由
torch.cuda.current_device()
给出,用户有责任确保通过torch.cuda.set_device()
设置此设备,以便每个等级都有一个独立的 GPU。警告
send_object_list()
隐式地使用pickle
模块,该模块众所周知是不安全的。可以构造恶意 pickle 数据,这些数据会在解压缩期间执行任意代码。只能对您信任的数据调用此函数。警告
使用 GPU 张量调用
send_object_list()
得不到很好的支持,而且效率低下,因为它会造成 GPU -> CPU 传输,因为张量会被序列化。请考虑改用send()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.recv_object_list(object_list, src=None, group=None, device=None)[source]¶
同步接收
object_list
中的可序列化对象。类似于
recv()
,但可以接收 Python 对象。- 参数
object_list (List[Any]) – 要接收到的对象的列表。必须提供一个大小等于要发送的列表大小的列表。
src (int, 可选) – 要接收
object_list
的源等级。源等级基于全局进程组(与group
参数无关)。如果设置为 None,则将从任何等级接收。默认为None
。group – (ProcessGroup, 可选): 要处理的进程组。如果为 None,将使用默认进程组。默认为
None
。device (
torch.device
, 可选) – 如果不为 None,则在此设备上接收。默认为None
。
- 返回
发送方等级。如果等级不是组的一部分,则为 -1。如果等级是组的一部分,则
object_list
将包含从src
等级发送的对象。
注意
对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由
torch.cuda.current_device()
给出,用户有责任确保通过torch.cuda.set_device()
设置此设备,以便每个等级都有一个独立的 GPU。警告
recv_object_list()
隐式地使用pickle
模块,该模块众所周知是不安全的。可以构造恶意 pickle 数据,这些数据会在解压缩期间执行任意代码。只能对您信任的数据调用此函数。警告
使用 GPU 张量调用
recv_object_list()
得不到很好的支持,而且效率低下,因为它会造成 GPU -> CPU 传输,因为张量会被序列化。请考虑改用recv()
。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> dist.send_object_list(objects, dst=1, device=device) >>> else: >>> objects = [None, None, None] >>> dist.recv_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.batch_isend_irecv(p2p_op_list)[source]¶
异步发送或接收一批张量并返回请求列表。
处理
p2p_op_list
中的每个操作并返回相应的请求。目前支持 NCCL、Gloo 和 UCC 后端。- 参数
p2p_op_list – 点对点操作列表(每个运算符的类型是
torch.distributed.P2POp
)。列表中 isend/irecv 的顺序很重要,它需要与远程端对应的 isend/irecv 匹配。- 返回
通过在 op_list 中调用相应操作返回的一系列分布式请求对象。
示例
>>> send_tensor = torch.arange(2, dtype=torch.float32) + 2 * rank >>> recv_tensor = torch.randn(2, dtype=torch.float32) >>> send_op = dist.P2POp(dist.isend, send_tensor, (rank + 1)%world_size) >>> recv_op = dist.P2POp(dist.irecv, recv_tensor, (rank - 1 + world_size)%world_size) >>> reqs = batch_isend_irecv([send_op, recv_op]) >>> for req in reqs: >>> req.wait() >>> recv_tensor tensor([2, 3]) # Rank 0 tensor([0, 1]) # Rank 1
注意
请注意,当此 API 与 NCCL PG 后端一起使用时,用户必须使用 torch.cuda.set_device 设置当前 GPU 设备,否则会导致意外挂起问题。
此外,如果此 API 是传递给
dist.P2POp
的group
中的第一个集体调用,则group
的所有等级都必须参与此 API 调用;否则,行为未定义。如果此 API 调用不是group
中的第一个集体调用,则允许仅涉及group
的部分等级的批处理 P2P 操作。
同步和异步集体操作¶
每个集体操作函数都支持以下两种操作,具体取决于传递到集合的 async_op
标志的设置。
同步操作 - 默认模式,当 async_op
设置为 False
时。当函数返回时,保证执行了集体操作。在 CUDA 操作的情况下,不保证完成 CUDA 操作,因为 CUDA 操作是异步的。对于 CPU 集体操作,任何使用集体调用输出的后续函数调用都将按预期执行。对于 CUDA 集体操作,在同一个 CUDA 流中使用输出的函数调用将按预期执行。用户必须注意在不同流下运行的同步问题。有关 CUDA 语义(如流同步)的详细信息,请参见 CUDA 语义。请参阅下面的脚本,以了解 CPU 和 CUDA 操作在这些语义方面差异的示例。
异步操作 - 当 async_op
设置为 True 时。集体操作函数返回一个分布式请求对象。通常,您无需手动创建它,并且保证它支持两种方法。
is_completed()
- 在 CPU 集体操作的情况下,如果已完成,则返回True
。在 CUDA 操作的情况下,如果操作已成功排队到 CUDA 流中,并且输出可以在默认流上使用而无需进一步同步,则返回True
。wait()
- 在 CPU 集体操作的情况下,将阻塞进程,直到操作完成。在 CUDA 集体操作的情况下,将阻塞,直到操作已成功排队到 CUDA 流中,并且输出可以在默认流上使用而无需进一步同步。get_future()
- 返回torch._C.Future
对象。支持 NCCL,也支持 GLOO 和 MPI 上的大多数操作,但对等操作除外。注意:随着我们继续采用 Futures 并合并 API,get_future()
调用可能会变得多余。
示例
以下代码可以作为使用分布式集体操作时 CUDA 操作语义的参考。它显示了在不同 CUDA 流上使用集体输出时显式同步的必要性。
# Code runs on each rank.
dist.init_process_group("nccl", rank=rank, world_size=2)
output = torch.tensor([rank]).cuda(rank)
s = torch.cuda.Stream()
handle = dist.all_reduce(output, async_op=True)
# Wait ensures the operation is enqueued, but not necessarily complete.
handle.wait()
# Using result on non-default stream.
with torch.cuda.stream(s):
s.wait_stream(torch.cuda.default_stream())
output.add_(100)
if rank == 0:
# if the explicit call to wait_stream was omitted, the output below will be
# non-deterministically 1 or 101, depending on whether the allreduce overwrote
# the value after the add completed.
print(output)
集体函数¶
- torch.distributed.broadcast(tensor, src, group=None, async_op=False)[source]¶
将张量广播到整个组。
tensor
在所有参与集体操作的进程中必须具有相同数量的元素。
- torch.distributed.broadcast_object_list(object_list, src=0, group=None, device=None)[source]¶
将
object_list
中的可腌制对象广播到整个组。类似于
broadcast()
,但可以传入 Python 对象。 请注意,为了进行广播,object_list
中的所有对象都必须是可腌制的。- 参数
object_list (List[Any]) – 要广播的输入对象列表。 每个对象都必须是可腌制的。 只有
src
秩上的对象才会被广播,但每个秩都必须提供大小相等的列表。src (int) – 要广播
object_list
的源秩。 源秩基于全局进程组(无论group
参数如何)。group – (ProcessGroup, 可选): 要处理的进程组。如果为 None,将使用默认进程组。默认为
None
。device (
torch.device
, optional) – 如果不是 None,则对象将被序列化并转换为张量,这些张量将在广播之前移动到device
。 默认值为None
。
- 返回
None
。 如果秩是组的一部分,则object_list
将包含来自src
秩的广播对象。
注意
对于基于 NCCL 的进程组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由
torch.cuda.current_device()
给出,用户有责任确保通过torch.cuda.set_device()
设置此设备,以便每个等级都有一个独立的 GPU。注意
请注意,此 API 与
broadcast()
集体操作略有不同,因为它不提供async_op
句柄,因此将是阻塞调用。警告
broadcast_object_list()
隐式地使用pickle
模块,该模块是已知的不安全的。 可以构建恶意 pickle 数据,这些数据将在取消腌制期间执行任意代码。 仅对您信任的数据调用此函数。警告
使用 GPU 张量调用
broadcast_object_list()
并不被很好地支持并且效率低下,因为它会引起 GPU -> CPU 传输,因为张量将被腌制。 请考虑使用broadcast()
代替。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> objects = [None, None, None] >>> # Assumes backend is not NCCL >>> device = torch.device("cpu") >>> dist.broadcast_object_list(objects, src=0, device=device) >>> objects ['foo', 12, {1: 2}]
- torch.distributed.all_reduce(tensor, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]¶
以所有进程都获得最终结果的方式,跨所有机器减少张量数据。
在调用之后,
tensor
将在所有进程中按位相同。支持复数张量。
- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。 如果不是 async_op 或不是组的一部分,则为 None。
示例
>>> # All tensors below are of torch.int64 type. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4, 6], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1
>>> # All tensors below are of torch.cfloat type. >>> # We have 2 process groups, 2 ranks. >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_reduce(tensor, op=ReduceOp.SUM) >>> tensor tensor([4.+4.j, 6.+6.j], device='cuda:0') # Rank 0 tensor([4.+4.j, 6.+6.j], device='cuda:1') # Rank 1
- torch.distributed.reduce(tensor, dst, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]¶
跨所有机器减少张量数据。
只有秩为
dst
的进程会收到最终结果。- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。 如果不是 async_op 或不是组的一部分,则为 None。
- torch.distributed.all_gather(tensor_list, tensor, group=None, async_op=False)[source]¶
将来自整个组的张量收集到一个列表中。
支持复数张量。
- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。 如果不是 async_op 或不是组的一部分,则为 None。
示例
>>> # All tensors below are of torch.int64 dtype. >>> # We have 2 process groups, 2 ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor_list = [torch.zeros(2, dtype=torch.int64, device=device) for _ in range(2)] >>> tensor_list [tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:0')] # Rank 0 [tensor([0, 0], device='cuda:0'), tensor([0, 0], device='cuda:1')] # Rank 1 >>> tensor = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1, 2], device='cuda:0'), tensor([3, 4], device='cuda:0')] # Rank 0 [tensor([1, 2], device='cuda:1'), tensor([3, 4], device='cuda:1')] # Rank 1
>>> # All tensors below are of torch.cfloat dtype. >>> # We have 2 process groups, 2 ranks. >>> tensor_list = [torch.zeros(2, dtype=torch.cfloat, device=device) for _ in range(2)] >>> tensor_list [tensor([0.+0.j, 0.+0.j], device='cuda:0'), tensor([0.+0.j, 0.+0.j], device='cuda:0')] # Rank 0 [tensor([0.+0.j, 0.+0.j], device='cuda:1'), tensor([0.+0.j, 0.+0.j], device='cuda:1')] # Rank 1 >>> tensor = torch.tensor([1+1j, 2+2j], dtype=torch.cfloat, device=device) + 2 * rank * (1+1j) >>> tensor tensor([1.+1.j, 2.+2.j], device='cuda:0') # Rank 0 tensor([3.+3.j, 4.+4.j], device='cuda:1') # Rank 1 >>> dist.all_gather(tensor_list, tensor) >>> tensor_list [tensor([1.+1.j, 2.+2.j], device='cuda:0'), tensor([3.+3.j, 4.+4.j], device='cuda:0')] # Rank 0 [tensor([1.+1.j, 2.+2.j], device='cuda:1'), tensor([3.+3.j, 4.+4.j], device='cuda:1')] # Rank 1
- torch.distributed.all_gather_into_tensor(output_tensor, input_tensor, group=None, async_op=False)[source]¶
从所有秩收集张量,并将它们放到一个输出张量中。
- 参数
output_tensor (Tensor) – 用于容纳来自所有秩的张量元素的输出张量。 它必须被正确地调整大小以具有以下形式之一:(i)所有输入张量沿着主维度的串联; 有关“串联”的定义,请参见
torch.cat()
; (ii) 所有输入张量沿着主维度的堆叠; 有关“堆叠”的定义,请参见torch.stack()
。 下面的示例可能更好地解释了支持的输出形式。input_tensor (Tensor) – 要从当前秩收集的张量。 与
all_gather
API 不同,此 API 中的输入张量在所有秩上必须具有相同的大小。group (ProcessGroup, optional) – 要操作的进程组。如果为 None,将使用默认进程组。
async_op (bool, optional) – 此操作是否应为异步操作。
- 返回
异步工作句柄,如果 async_op 设置为 True。 如果不是 async_op 或不是组的一部分,则为 None。
示例
>>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor_in = torch.arange(2, dtype=torch.int64, device=device) + 1 + 2 * rank >>> tensor_in tensor([1, 2], device='cuda:0') # Rank 0 tensor([3, 4], device='cuda:1') # Rank 1 >>> # Output in concatenation form >>> tensor_out = torch.zeros(world_size * 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out, tensor_in) >>> tensor_out tensor([1, 2, 3, 4], device='cuda:0') # Rank 0 tensor([1, 2, 3, 4], device='cuda:1') # Rank 1 >>> # Output in stack form >>> tensor_out2 = torch.zeros(world_size, 2, dtype=torch.int64, device=device) >>> dist.all_gather_into_tensor(tensor_out2, tensor_in) >>> tensor_out2 tensor([[1, 2], [3, 4]], device='cuda:0') # Rank 0 tensor([[1, 2], [3, 4]], device='cuda:1') # Rank 1
警告
Gloo 后端不支持此 API。
- torch.distributed.all_gather_object(object_list, obj, group=None)[source]¶
将来自整个组的可腌制对象收集到一个列表中。
类似于
all_gather()
,但可以传入 Python 对象。 请注意,为了进行收集,对象必须是可腌制的。- 参数
object_list (list[Any]) – 输出列表。 它应该被正确地调整大小为该集体操作的组的大小,并将包含输出。
obj (Any) – 要从当前进程广播的可腌制 Python 对象。
group (ProcessGroup, optional) – 要操作的进程组。 如果为 None,则将使用默认进程组。 默认值为
None
。
- 返回
无。如果调用排名属于该组,则集合的输出将填充到输入
object_list
中。如果调用排名不属于该组,则传入的object_list
将保持不变。
注意
请注意,此 API 与
all_gather()
集合略有不同,因为它不提供async_op
处理程序,因此将是一个阻塞调用。注意
对于基于 NCCL 的处理组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由
torch.cuda.current_device()
给出,用户有责任确保通过torch.cuda.set_device()
设置此设备,以便每个排名都有一个独立的 GPU。警告
all_gather_object()
隐式地使用pickle
模块,该模块已知是不安全的。可以构造恶意 pickle 数据,这些数据将在取消序列化期间执行任意代码。仅对您信任的数据调用此函数。警告
使用 GPU 张量调用
all_gather_object()
支持不佳且效率低下,因为它会导致 GPU -> CPU 传输,因为张量将被序列化。请考虑使用all_gather()
代替。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.all_gather_object(output, gather_objects[dist.get_rank()]) >>> output ['foo', 12, {1: 2}]
- torch.distributed.gather(tensor, gather_list=None, dst=0, group=None, async_op=False)[source]¶
在一个进程中收集一系列张量。
- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。 如果不是 async_op 或不是组的一部分,则为 None。
- torch.distributed.gather_object(obj, object_gather_list=None, dst=0, group=None)[source]¶
在一个进程中从整个组收集可序列化的对象。
类似于
gather()
,但可以传入 Python 对象。请注意,该对象必须是可序列化的,才能被收集。- 参数
- 返回
无。在
dst
排名上,object_gather_list
将包含该集合的输出。
注意
请注意,此 API 与 gather 集合略有不同,因为它不提供 async_op 处理程序,因此将是一个阻塞调用。
注意
对于基于 NCCL 的处理组,对象的内部张量表示必须在通信发生之前移动到 GPU 设备。在这种情况下,使用的设备由
torch.cuda.current_device()
给出,用户有责任确保通过torch.cuda.set_device()
设置此设备,以便每个排名都有一个独立的 GPU。警告
gather_object()
隐式地使用pickle
模块,该模块已知是不安全的。可以构造恶意 pickle 数据,这些数据将在取消序列化期间执行任意代码。仅对您信任的数据调用此函数。警告
使用 GPU 张量调用
gather_object()
支持不佳且效率低下,因为它会导致 GPU -> CPU 传输,因为张量将被序列化。请考虑使用gather()
代替。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> # Assumes world_size of 3. >>> gather_objects = ["foo", 12, {1: 2}] # any picklable object >>> output = [None for _ in gather_objects] >>> dist.gather_object( ... gather_objects[dist.get_rank()], ... output if dist.get_rank() == 0 else None, ... dst=0 ... ) >>> # On rank 0 >>> output ['foo', 12, {1: 2}]
- torch.distributed.scatter(tensor, scatter_list=None, src=0, group=None, async_op=False)[source]¶
将一系列张量散布到组中的所有进程。
每个进程将恰好接收一个张量,并将它的数据存储在
tensor
参数中。支持复数张量。
- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。 如果不是 async_op 或不是组的一部分,则为 None。
注意
请注意,scatter_list 中的所有张量必须具有相同的大小。
- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> tensor_size = 2 >>> t_ones = torch.ones(tensor_size) >>> t_fives = torch.ones(tensor_size) * 5 >>> output_tensor = torch.zeros(tensor_size) >>> if dist.get_rank() == 0: >>> # Assumes world_size of 2. >>> # Only tensors, all of which must be the same size. >>> scatter_list = [t_ones, t_fives] >>> else: >>> scatter_list = None >>> dist.scatter(output_tensor, scatter_list, src=0) >>> # Rank i gets scatter_list[i]. For example, on rank 1: >>> output_tensor tensor([5., 5.])
- torch.distributed.scatter_object_list(scatter_object_output_list, scatter_object_input_list, src=0, group=None)[source]¶
将
scatter_object_input_list
中的可序列化对象散布到整个组。类似于
scatter()
,但可以传入 Python 对象。在每个排名上,散布的对象将被存储为scatter_object_output_list
的第一个元素。请注意,scatter_object_input_list
中的所有对象必须是可序列化的,才能被散布。- 参数
scatter_object_output_list (List[Any]) – 非空列表,其第一个元素将存储散布到该排名的对象。
scatter_object_input_list (List[Any]) – 要散布的输入对象列表。每个对象必须是可序列化的。只有在
src
排名上的对象才会被散布,并且对于非 src 排名,该参数可以是None
。src (int) – 从中散布
scatter_object_input_list
的源排名。源排名基于全局进程组(无论group
参数)。group – (ProcessGroup, 可选): 要处理的进程组。如果为 None,将使用默认进程组。默认为
None
。
- 返回
None
。如果排名属于该组,则scatter_object_output_list
将将其第一个元素设置为该排名散布的对象。
注意
请注意,此 API 与 scatter 集合略有不同,因为它不提供
async_op
处理程序,因此将是一个阻塞调用。警告
scatter_object_list()
隐式地使用pickle
模块,该模块已知是不安全的。可以构造恶意 pickle 数据,这些数据将在取消序列化期间执行任意代码。仅对您信任的数据调用此函数。警告
使用 GPU 张量调用
scatter_object_list()
支持不佳且效率低下,因为它会导致 GPU -> CPU 传输,因为张量将被序列化。请考虑使用scatter()
代替。- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() == 0: >>> # Assumes world_size of 3. >>> objects = ["foo", 12, {1: 2}] # any picklable object >>> else: >>> # Can be any list on non-src ranks, elements are not used. >>> objects = [None, None, None] >>> output_list = [None] >>> dist.scatter_object_list(output_list, objects, src=0) >>> # Rank i gets objects[i]. For example, on rank 2: >>> output_list [{1: 2}]
- torch.distributed.reduce_scatter(output, input_list, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]¶
在组中的所有进程之间减少,然后散布张量列表。
- torch.distributed.reduce_scatter_tensor(output, input, op=<RedOpType.SUM: 0>, group=None, async_op=False)[source]¶
在组中的所有进程之间减少,然后散布一个张量。
- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。如果未设置 async_op 或不属于组,则为 None。
示例
>>> # All tensors below are of torch.int64 dtype and on CUDA devices. >>> # We have two ranks. >>> device = torch.device(f'cuda:{rank}') >>> tensor_out = torch.zeros(2, dtype=torch.int64, device=device) >>> # Input in concatenation form >>> tensor_in = torch.arange(world_size * 2, dtype=torch.int64, device=device) >>> tensor_in tensor([0, 1, 2, 3], device='cuda:0') # Rank 0 tensor([0, 1, 2, 3], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1 >>> # Input in stack form >>> tensor_in = torch.reshape(tensor_in, (world_size, 2)) >>> tensor_in tensor([[0, 1], [2, 3]], device='cuda:0') # Rank 0 tensor([[0, 1], [2, 3]], device='cuda:1') # Rank 1 >>> dist.reduce_scatter_tensor(tensor_out, tensor_in) >>> tensor_out tensor([0, 2], device='cuda:0') # Rank 0 tensor([4, 6], device='cuda:1') # Rank 1
警告
Gloo 后端不支持此 API。
- torch.distributed.all_to_all_single(output, input, output_split_sizes=None, input_split_sizes=None, group=None, async_op=False)[source]¶
拆分输入张量,然后将拆分的列表散布到组中的所有进程。
之后,接收到的张量从组中的所有进程连接起来,并作为单个输出张量返回。
支持复数张量。
- 参数
output (Tensor) – 收集到的连接的输出张量。
input (Tensor) – 要散布的输入张量。
output_split_sizes – (list[Int], optional): 如果指定 None 或空,则为 dim 0 的输出拆分尺寸;
output
张量的 dim 0 必须被world_size
均等地整除。input_split_sizes – (list[Int], optional): 如果指定 None 或空,则为 dim 0 的输入拆分尺寸;
input
张量的 dim 0 必须被world_size
均等地整除。group (ProcessGroup, optional) – 要操作的进程组。如果为 None,将使用默认进程组。
async_op (bool, optional) – 此操作是否应该为异步操作。
- 返回
异步工作句柄,如果 async_op 设置为 True。如果未设置 async_op 或不属于组,则为 None。
警告
all_to_all_single 处于实验阶段,可能会发生变化。
示例
>>> input = torch.arange(4) + rank * 4 >>> input tensor([0, 1, 2, 3]) # Rank 0 tensor([4, 5, 6, 7]) # Rank 1 tensor([8, 9, 10, 11]) # Rank 2 tensor([12, 13, 14, 15]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([0, 4, 8, 12]) # Rank 0 tensor([1, 5, 9, 13]) # Rank 1 tensor([2, 6, 10, 14]) # Rank 2 tensor([3, 7, 11, 15]) # Rank 3
>>> # Essentially, it is similar to following operation: >>> scatter_list = list(input.chunk(world_size)) >>> gather_list = list(output.chunk(world_size)) >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src = i)
>>> # Another example with uneven split >>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> output = ... >>> dist.all_to_all_single(output, input, output_splits, input_splits) >>> output tensor([ 0, 1, 10, 11, 12, 20, 21, 30, 31]) # Rank 0 tensor([ 2, 3, 13, 14, 22, 32, 33]) # Rank 1 tensor([ 4, 15, 16, 23, 34, 35]) # Rank 2 tensor([ 5, 17, 18, 24, 36]) # Rank 3
>>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) >>> input tensor([1+1j, 2+2j, 3+3j, 4+4j]) # Rank 0 tensor([5+5j, 6+6j, 7+7j, 8+8j]) # Rank 1 tensor([9+9j, 10+10j, 11+11j, 12+12j]) # Rank 2 tensor([13+13j, 14+14j, 15+15j, 16+16j]) # Rank 3 >>> output = torch.empty([4], dtype=torch.int64) >>> dist.all_to_all_single(output, input) >>> output tensor([1+1j, 5+5j, 9+9j, 13+13j]) # Rank 0 tensor([2+2j, 6+6j, 10+10j, 14+14j]) # Rank 1 tensor([3+3j, 7+7j, 11+11j, 15+15j]) # Rank 2 tensor([4+4j, 8+8j, 12+12j, 16+16j]) # Rank 3
- torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=None, async_op=False)[source]¶
将输入张量的列表散布到组中的所有进程,并在输出列表中返回收集到的张量列表。
支持复数张量。
- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。如果未设置 async_op 或不属于组,则为 None。
警告
all_to_all 处于实验阶段,可能会发生变化。
示例
>>> input = torch.arange(4) + rank * 4 >>> input = list(input.chunk(4)) >>> input [tensor([0]), tensor([1]), tensor([2]), tensor([3])] # Rank 0 [tensor([4]), tensor([5]), tensor([6]), tensor([7])] # Rank 1 [tensor([8]), tensor([9]), tensor([10]), tensor([11])] # Rank 2 [tensor([12]), tensor([13]), tensor([14]), tensor([15])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([0]), tensor([4]), tensor([8]), tensor([12])] # Rank 0 [tensor([1]), tensor([5]), tensor([9]), tensor([13])] # Rank 1 [tensor([2]), tensor([6]), tensor([10]), tensor([14])] # Rank 2 [tensor([3]), tensor([7]), tensor([11]), tensor([15])] # Rank 3
>>> # Essentially, it is similar to following operation: >>> scatter_list = input >>> gather_list = output >>> for i in range(world_size): >>> dist.scatter(gather_list[i], scatter_list if i == rank else [], src=i)
>>> input tensor([0, 1, 2, 3, 4, 5]) # Rank 0 tensor([10, 11, 12, 13, 14, 15, 16, 17, 18]) # Rank 1 tensor([20, 21, 22, 23, 24]) # Rank 2 tensor([30, 31, 32, 33, 34, 35, 36]) # Rank 3 >>> input_splits [2, 2, 1, 1] # Rank 0 [3, 2, 2, 2] # Rank 1 [2, 1, 1, 1] # Rank 2 [2, 2, 2, 1] # Rank 3 >>> output_splits [2, 3, 2, 2] # Rank 0 [2, 2, 1, 2] # Rank 1 [1, 2, 1, 2] # Rank 2 [1, 2, 1, 1] # Rank 3 >>> input = list(input.split(input_splits)) >>> input [tensor([0, 1]), tensor([2, 3]), tensor([4]), tensor([5])] # Rank 0 [tensor([10, 11, 12]), tensor([13, 14]), tensor([15, 16]), tensor([17, 18])] # Rank 1 [tensor([20, 21]), tensor([22]), tensor([23]), tensor([24])] # Rank 2 [tensor([30, 31]), tensor([32, 33]), tensor([34, 35]), tensor([36])] # Rank 3 >>> output = ... >>> dist.all_to_all(output, input) >>> output [tensor([0, 1]), tensor([10, 11, 12]), tensor([20, 21]), tensor([30, 31])] # Rank 0 [tensor([2, 3]), tensor([13, 14]), tensor([22]), tensor([32, 33])] # Rank 1 [tensor([4]), tensor([15, 16]), tensor([23]), tensor([34, 35])] # Rank 2 [tensor([5]), tensor([17, 18]), tensor([24]), tensor([36])] # Rank 3
>>> # Another example with tensors of torch.cfloat type. >>> input = torch.tensor([1+1j, 2+2j, 3+3j, 4+4j], dtype=torch.cfloat) + 4 * rank * (1+1j) >>> input = list(input.chunk(4)) >>> input [tensor([1+1j]), tensor([2+2j]), tensor([3+3j]), tensor([4+4j])] # Rank 0 [tensor([5+5j]), tensor([6+6j]), tensor([7+7j]), tensor([8+8j])] # Rank 1 [tensor([9+9j]), tensor([10+10j]), tensor([11+11j]), tensor([12+12j])] # Rank 2 [tensor([13+13j]), tensor([14+14j]), tensor([15+15j]), tensor([16+16j])] # Rank 3 >>> output = list(torch.empty([4], dtype=torch.int64).chunk(4)) >>> dist.all_to_all(output, input) >>> output [tensor([1+1j]), tensor([5+5j]), tensor([9+9j]), tensor([13+13j])] # Rank 0 [tensor([2+2j]), tensor([6+6j]), tensor([10+10j]), tensor([14+14j])] # Rank 1 [tensor([3+3j]), tensor([7+7j]), tensor([11+11j]), tensor([15+15j])] # Rank 2 [tensor([4+4j]), tensor([8+8j]), tensor([12+12j]), tensor([16+16j])] # Rank 3
- torch.distributed.barrier(group=None, async_op=False, device_ids=None)[source]¶
同步所有进程。
此集体操作会阻塞进程,直到整个组进入此函数,如果 async_op 为 False,或如果在 wait() 上调用了异步工作句柄。
- 参数
- 返回
异步工作句柄,如果 async_op 设置为 True。 如果不是 async_op 或不是组的一部分,则为 None。
注意
ProcessGroupNCCL 现在依赖于流同步而不是设备同步来阻塞 CPU。因此,请不要假设 barrier() 会执行设备同步。
- torch.distributed.monitored_barrier(group=None, timeout=None, wait_all_ranks=False)[source]¶
类似于
torch.distributed.barrier
同步进程,但考虑可配置的超时。它能够报告在提供的超时时间内没有通过此屏障的进程。具体来说,对于非零进程,将阻塞直到从进程 0 处理一个 send/recv。进程 0 将阻塞直到处理完来自其他进程的所有 send /recv,并将报告未能及时响应的进程的故障。请注意,如果一个进程没有到达 monitored_barrier(例如由于挂起),所有其他进程都会在 monitored_barrier 中失败。
此集体操作会阻塞组中的所有进程/进程,直到整个组成功退出该函数,使其在调试和同步时很有用。但是,它可能会对性能产生影响,并且只应在调试或需要主机端完全同步点的场景中使用。出于调试目的,此屏障可以插入应用程序的集体调用之前,以检查是否有任何进程不同步。
注意
请注意,此集体操作只支持 GLOO 后端。
- 参数
group (ProcessGroup, optional) – 要处理的进程组。如果为
None
,将使用默认进程组。timeout (datetime.timedelta, optional) – monitored_barrier 的超时。如果为
None
,将使用默认进程组超时。wait_all_ranks (bool, optional) – 是否收集所有失败的进程。默认情况下,它为
False
,并且进程 0 上的monitored_barrier
会在其遇到的第一个失败进程时抛出异常,以快速失败。通过设置wait_all_ranks=True
,monitored_barrier
将收集所有失败的进程,并抛出一个包含所有失败进程信息的错误。
- 返回
None
.
- 示例:
>>> # Note: Process group initialization omitted on each rank. >>> import torch.distributed as dist >>> if dist.get_rank() != 1: >>> dist.monitored_barrier() # Raises exception indicating that >>> # rank 1 did not call into monitored_barrier. >>> # Example with wait_all_ranks=True >>> if dist.get_rank() == 0: >>> dist.monitored_barrier(wait_all_ranks=True) # Raises exception >>> # indicating that ranks 1, 2, ... world_size - 1 did not call into >>> # monitored_barrier.
- class torch.distributed.Work¶
Work 对象表示 PyTorch 分布式包中挂起的异步操作的句柄。它由非阻塞集体操作返回,例如 dist.all_reduce(tensor, async_op=True)。
- class torch.distributed.ReduceOp¶
一个枚举类,用于表示可用的归约操作:
SUM
、PRODUCT
、MIN
、MAX
、BAND
、BOR
、BXOR
和PREMUL_SUM
。当使用
NCCL
后端时,BAND
、BOR
和BXOR
归约不可用。AVG
在对所有进程进行求和之前,会将值除以世界大小。AVG
仅适用于NCCL
后端,且仅适用于 NCCL 2.10 或更高版本。PREMUL_SUM
在进行归约之前,会先在本地将输入乘以给定的标量。PREMUL_SUM
仅适用于NCCL
后端,且仅适用于 NCCL 2.11 或更高版本。 用户应该使用torch.distributed._make_nccl_premul_sum
。此外,对于复数张量,不支持
MAX
、MIN
和PRODUCT
。可以通过属性访问此类的值,例如
ReduceOp.SUM
。 它们用于指定归约集合的策略,例如reduce()
。此类不支持
__members__
属性。
分析集合通信¶
请注意,您可以使用 torch.profiler
(推荐,仅在 1.8.1 之后可用)或 torch.autograd.profiler
分析此处提到的集合通信和点对点通信 API。 所有开箱即用的后端(gloo
、nccl
、mpi
)都受支持,集合通信使用情况将在分析输出/跟踪中按预期呈现。 分析代码与任何常规的火炬运算符相同。
import torch
import torch.distributed as dist
with torch.profiler():
tensor = torch.randn(20, 10)
dist.all_reduce(tensor)
有关分析器功能的完整概述,请参考 分析器文档。
多 GPU 集合函数¶
警告
多 GPU 函数(代表每个 CPU 线程的多个 GPU)已弃用。 截至目前,PyTorch Distributed 首选的编程模型是每个线程一个设备,如本文档中的 API 所示。 如果您是后端开发人员并希望支持每个线程的多个设备,请联系 PyTorch Distributed 的维护人员。
第三方后端¶
除了内置的 GLOO/MPI/NCCL 后端,PyTorch distributed 还通过运行时注册机制支持第三方后端。 有关如何通过 C++ 扩展开发第三方后端的参考资料,请参考 教程 - 自定义 C++ 和 CUDA 扩展 和 test/cpp_extensions/cpp_c10d_extension.cpp
。 第三方后端的性能由各自的实现决定。
新的后端派生自 c10d::ProcessGroup
,并通过 torch.distributed.Backend.register_backend()
在导入时注册后端名称和实例化接口。
当手动导入此后端并使用相应的后端名称调用 torch.distributed.init_process_group()
时,torch.distributed
包将在新的后端上运行。
警告
对第三方后端的支持尚处于实验阶段,可能会发生变化。
启动实用程序¶
torch.distributed 包还提供了一个启动实用程序,位于 torch.distributed.launch 中。 此辅助实用程序可用于为分布式训练启动每个节点的多个进程。
模块 torch.distributed.launch
。
torch.distributed.launch
是一个模块,它在每个训练节点上生成多个分布式训练进程。
警告
此模块将被弃用,取而代之的是 torchrun。
此实用程序可用于单节点分布式训练,其中将在每个节点上生成一个或多个进程。 该实用程序可用于 CPU 训练或 GPU 训练。 如果该实用程序用于 GPU 训练,则每个分布式进程将在单个 GPU 上运行。 这可以实现显著改进的单节点训练性能。 它还可用于多节点分布式训练,方法是在每个节点上生成多个进程,以实现同样显著改进的多节点分布式训练性能。 对于具有直接 GPU 支持的多个 Infiniband 接口的系统来说,这将特别有利,因为所有接口都可以用于聚合通信带宽。
在单节点分布式训练或多节点分布式训练的两种情况下,此实用程序将在每个节点上启动给定的进程数 (--nproc-per-node
)。 如果用于 GPU 训练,则此数字需要小于或等于当前系统上的 GPU 数量 (nproc_per_node
),并且每个进程将在单个 GPU 上运行,从GPU 0 到 GPU (nproc_per_node - 1)。
如何使用此模块
单节点多进程分布式训练
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3 and all other
arguments of your training script)
多节点多进程分布式训练:(例如两个节点)
节点 1:(IP: 192.168.1.1,并且有一个空闲端口:1234)
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=0 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
节点 2
python -m torch.distributed.launch --nproc-per-node=NUM_GPUS_YOU_HAVE
--nnodes=2 --node-rank=1 --master-addr="192.168.1.1"
--master-port=1234 YOUR_TRAINING_SCRIPT.py (--arg1 --arg2 --arg3
and all other arguments of your training script)
要查找此模块提供的可选参数
python -m torch.distributed.launch --help
重要注意事项
1. 此实用程序和多进程分布式 (单节点或多节点) GPU 训练目前仅使用 NCCL 分布式后端才能实现最佳性能。 因此,NCCL 后端是 GPU 训练推荐使用的后端。
2. 在您的训练程序中,您必须解析命令行参数:--local-rank=LOCAL_PROCESS_RANK
,此参数将由此模块提供。 如果您的训练程序使用 GPU,您应该确保您的代码仅在 LOCAL_PROCESS_RANK 的 GPU 设备上运行。 这可以通过以下方式完成
解析 local_rank 参数
>>> import argparse
>>> parser = argparse.ArgumentParser()
>>> parser.add_argument("--local-rank", "--local_rank", type=int)
>>> args = parser.parse_args()
使用以下任一方法将您的设备设置为本地排名
>>> torch.cuda.set_device(args.local_rank) # before your code runs
或
>>> with torch.cuda.device(args.local_rank):
>>> # your code to run
>>> ...
在版本 2.0.0 中更改: 启动器将传递 --local-rank=<rank>
参数到您的脚本。 从 PyTorch 2.0.0 开始,带破折号的 --local-rank
比以前使用的带下划线的 --local_rank
优先级更高。
为了向后兼容,用户可能需要在他们的参数解析代码中处理这两种情况。 这意味着在参数解析器中包含 "--local-rank"
和 "--local_rank"
。 如果只提供 "--local_rank"
,启动器将触发错误:“error: unrecognized arguments: –local-rank=<rank>”。 对于仅支持 PyTorch 2.0.0+ 的训练代码,包含 "--local-rank"
应该就足够了。
3. 在您的训练程序中,您应该在开始时调用以下函数来启动分布式后端。 强烈建议使用 init_method=env://
。 其他初始化方法(例如 tcp://
)可能有效,但 env://
是此模块正式支持的初始化方法。
>>> torch.distributed.init_process_group(backend='YOUR BACKEND',
>>> init_method='env://')
4. 在您的训练程序中,您可以使用常规的分布式函数,也可以使用 torch.nn.parallel.DistributedDataParallel()
模块。 如果您的训练程序使用 GPU 进行训练,并且您希望使用 torch.nn.parallel.DistributedDataParallel()
模块,以下是配置方法。
>>> model = torch.nn.parallel.DistributedDataParallel(model,
>>> device_ids=[args.local_rank],
>>> output_device=args.local_rank)
请确保 device_ids
参数设置为您的代码将运行的唯一 GPU 设备 ID。 这通常是进程的本地排名。 换句话说,device_ids
需要是 [args.local_rank]
,而 output_device
需要是 args.local_rank
,以便使用此实用程序。
5. 另一种通过环境变量 LOCAL_RANK
将 local_rank
传递给子进程的方式。 当您使用 --use-env=True
启动脚本时,将启用此行为。 您必须调整上面的子进程示例,将 args.local_rank
替换为 os.environ['LOCAL_RANK']
;当您指定此标志时,启动器不会传递 --local-rank
。
警告
local_rank
不是全局唯一的:它只在机器上的每个进程中是唯一的。 因此,不要使用它来判断您是否应该,例如,写入网络文件系统。 请查看 https://github.com/pytorch/pytorch/issues/12042,了解如果您没有正确执行此操作,事情可能会出错的示例。
生成实用程序¶
多进程包 - torch.multiprocessing 包还在 torch.multiprocessing.spawn()
中提供了一个 spawn
函数。 此辅助函数可用于生成多个进程。 它通过传入要运行的函数来工作,并生成 N 个进程来运行它。 这也可用于多进程分布式训练。
有关如何使用它的参考资料,请参考 PyTorch 示例 - ImageNet 实现
请注意,此函数需要 Python 3.4 或更高版本。
调试 torch.distributed
应用¶
调试分布式应用程序可能很困难,因为跨秩会出现难以理解的挂起、崩溃或不一致的行为。 torch.distributed
提供了一套工具来帮助以自助方式调试训练应用程序。
Python 断点¶
在分布式环境中使用 Python 的调试器非常方便,但由于它不能开箱即用,所以很多人根本不使用它。PyTorch 为 pdb 提供了一个定制的包装器,简化了该过程。
torch.distributed.breakpoint 使此过程变得简单。在内部,它以两种方式定制了 pdb 的断点行为,但除此之外,它表现得像普通的 pdb 一样。 1. 仅在一个秩(由用户指定)上附加调试器。 2. 通过使用 torch.distributed.barrier() 来确保所有其他秩都停止,该屏障将在调试秩发出 continue 命令后释放。 3. 将子进程的 stdin 重定向到您的终端。
要使用它,只需在所有秩上发出 torch.distributed.breakpoint(rank),在每种情况下都使用相同的 rank 值。
监控屏障¶
从 v1.10 开始,torch.distributed.monitored_barrier()
作为 torch.distributed.barrier()
的替代品存在,当崩溃时,它会提供有关哪个秩可能出现故障的有用信息,即并非所有秩都可以在提供的超时时间内调用 torch.distributed.monitored_barrier()
。 torch.distributed.monitored_barrier()
使用 send
/recv
通信原语在类似于确认的进程中实现主机端屏障,允许秩 0 报告哪个秩在时间内未能确认屏障。例如,考虑以下函数,其中秩 1 未能调用 torch.distributed.monitored_barrier()
(在实践中,这可能是由于应用程序错误或先前集合中的挂起)。
import os
from datetime import timedelta
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
# monitored barrier requires gloo process group to perform host-side sync.
group_gloo = dist.new_group(backend="gloo")
if rank not in [1]:
dist.monitored_barrier(group=group_gloo, timeout=timedelta(seconds=2))
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
mp.spawn(worker, nprocs=2, args=())
在秩 0 上会生成以下错误消息,允许用户确定哪个秩可能出现故障并进一步调查。
RuntimeError: Rank 1 failed to pass monitoredBarrier in 2000 ms
Original exception:
[gloo/transport/tcp/pair.cc:598] Connection closed by peer [2401:db00:eef0:1100:3560:0:1c05:25d]:8594
TORCH_DISTRIBUTED_DEBUG
¶
使用 TORCH_CPP_LOG_LEVEL=INFO
,环境变量 TORCH_DISTRIBUTED_DEBUG
可用于触发其他有用的日志记录和集合同步检查,以确保所有秩适当地同步。 TORCH_DISTRIBUTED_DEBUG
可以设置为 OFF
(默认)、INFO
或 DETAIL
,具体取决于所需的调试级别。请注意,最详细的选项 DETAIL
可能会影响应用程序性能,因此应仅在调试问题时使用。
设置 TORCH_DISTRIBUTED_DEBUG=INFO
将在使用 torch.nn.parallel.DistributedDataParallel()
训练的模型初始化时产生额外的调试日志,而 TORCH_DISTRIBUTED_DEBUG=DETAIL
还会记录选定迭代次数的运行时性能统计信息。这些运行时统计信息包括数据,如正向时间、反向时间、梯度通信时间等。例如,给定以下应用程序。
import os
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
class TwoLinLayerNet(torch.nn.Module):
def __init__(self):
super().__init__()
self.a = torch.nn.Linear(10, 10, bias=False)
self.b = torch.nn.Linear(10, 1, bias=False)
def forward(self, x):
a = self.a(x)
b = self.b(x)
return (a, b)
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
print("init model")
model = TwoLinLayerNet().cuda()
print("init ddp")
ddp_model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])
inp = torch.randn(10, 10).cuda()
print("train")
for _ in range(20):
output = ddp_model(inp)
loss = output[0] + output[1]
loss.sum().backward()
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ[
"TORCH_DISTRIBUTED_DEBUG"
] = "DETAIL" # set to DETAIL for runtime logging.
mp.spawn(worker, nprocs=2, args=())
在初始化时会呈现以下日志。
I0607 16:10:35.739390 515217 logger.cpp:173] [Rank 0]: DDP Initialized with:
broadcast_buffers: 1
bucket_cap_bytes: 26214400
find_unused_parameters: 0
gradient_as_bucket_view: 0
is_multi_device_module: 0
iteration: 0
num_parameter_tensors: 2
output_device: 0
rank: 0
total_parameter_size_bytes: 440
world_size: 2
backend_name: nccl
bucket_sizes: 440
cuda_visible_devices: N/A
device_ids: 0
dtypes: float
master_addr: localhost
master_port: 29501
module_name: TwoLinLayerNet
nccl_async_error_handling: N/A
nccl_blocking_wait: N/A
nccl_debug: WARN
nccl_ib_timeout: N/A
nccl_nthreads: N/A
nccl_socket_ifname: N/A
torch_distributed_debug: INFO
在运行时(当设置 TORCH_DISTRIBUTED_DEBUG=DETAIL
时)会呈现以下日志。
I0607 16:18:58.085681 544067 logger.cpp:344] [Rank 1 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 40838608
Avg backward compute time: 5983335
Avg backward comm. time: 4326421
Avg backward comm/comp overlap time: 4207652
I0607 16:18:58.085693 544066 logger.cpp:344] [Rank 0 / 2] Training TwoLinLayerNet unused_parameter_size=0
Avg forward compute time: 42850427
Avg backward compute time: 3885553
Avg backward comm. time: 2357981
Avg backward comm/comp overlap time: 2234674
此外,TORCH_DISTRIBUTED_DEBUG=INFO
增强了 torch.nn.parallel.DistributedDataParallel()
中由于模型中未使用参数而导致的崩溃日志。目前,如果在正向传递中可能存在未使用参数,则必须在 torch.nn.parallel.DistributedDataParallel()
初始化中传递 find_unused_parameters=True
,并且从 v1.10 开始,所有模型输出都必须在损失计算中使用,因为 torch.nn.parallel.DistributedDataParallel()
不支持反向传递中的未使用参数。这些约束对于较大的模型来说尤其具有挑战性,因此在发生崩溃并出现错误时,torch.nn.parallel.DistributedDataParallel()
将记录所有未使用参数的全限定名。例如,在上面的应用程序中,如果我们将 loss
修改为改为计算为 loss = output[1]
,那么 TwoLinLayerNet.a
在反向传递中不会收到梯度,因此导致 DDP
失败。在发生崩溃时,用户会收到有关未使用参数的信息,这对于大型模型来说可能很难手动查找。
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by passing
the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`, and by
making sure all `forward` function outputs participate in calculating loss.
If you already have done the above, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return va
lue of `forward` of your module when reporting this issue (e.g. list, dict, iterable).
Parameters which did not receive grad for rank 0: a.weight
Parameter indices which did not receive grad for rank 0: 0
设置 TORCH_DISTRIBUTED_DEBUG=DETAIL
将在用户直接或间接发出的每个集合调用(例如 DDP allreduce
)上触发额外的一致性和同步检查。这是通过创建一个包装器进程组来完成的,该包装器进程组包装了由 torch.distributed.init_process_group()
和 torch.distributed.new_group()
API 返回的所有进程组。因此,这些 API 将返回一个包装器进程组,该进程组的使用方式与常规进程组完全相同,但在将集合分派到底层进程组之前执行一致性检查。目前,这些检查包括 torch.distributed.monitored_barrier()
,它确保所有秩都完成其未完成的集合调用并报告卡住的秩。接下来,通过确保所有集合函数都匹配并使用一致的张量形状进行调用,检查集合本身的一致性。如果情况并非如此,则在应用程序崩溃时会提供详细的错误报告,而不是挂起或提供信息量不足的错误消息。例如,考虑以下函数,该函数对 torch.distributed.all_reduce()
的输入形状不匹配。
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def worker(rank):
dist.init_process_group("nccl", rank=rank, world_size=2)
torch.cuda.set_device(rank)
tensor = torch.randn(10 if rank == 0 else 20).cuda()
dist.all_reduce(tensor)
torch.cuda.synchronize(device=rank)
if __name__ == "__main__":
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29501"
os.environ["TORCH_CPP_LOG_LEVEL"]="INFO"
os.environ["TORCH_DISTRIBUTED_DEBUG"] = "DETAIL"
mp.spawn(worker, nprocs=2, args=())
使用 NCCL
后端,这样的应用程序很可能会导致挂起,这在非平凡场景中可能很难进行根本原因分析。如果用户启用 TORCH_DISTRIBUTED_DEBUG=DETAIL
并重新运行应用程序,以下错误消息将显示根本原因。
work = default_pg.allreduce([tensor], opts)
RuntimeError: Error when verifying shape tensors for collective ALLREDUCE on rank 0. This likely indicates that input shapes into the collective are mismatched across ranks. Got shapes: 10
[ torch.LongTensor{1} ]
注意
为了在运行时更精细地控制调试级别,还可以使用函数 torch.distributed.set_debug_level()
、torch.distributed.set_debug_level_from_env()
和 torch.distributed.get_debug_level()
。
此外,TORCH_DISTRIBUTED_DEBUG=DETAIL 可与 TORCH_SHOW_CPP_STACKTRACES=1 结合使用,在检测到集合不同步时记录整个调用栈。这些集合不同步检查适用于使用 c10d
集合调用(由使用 torch.distributed.init_process_group()
和 torch.distributed.new_group()
API 创建的进程组支持)的所有应用程序。
日志记录¶
除了通过 torch.distributed.monitored_barrier()
和 TORCH_DISTRIBUTED_DEBUG
提供的显式调试支持外,torch.distributed
的底层 C++ 库还会输出不同级别的日志消息。这些消息有助于理解分布式训练作业的执行状态并解决网络连接故障等问题。以下矩阵显示了如何通过 TORCH_CPP_LOG_LEVEL
和 TORCH_DISTRIBUTED_DEBUG
环境变量的组合来调整日志级别。
|
|
有效日志级别 |
---|---|---|
|
ignored |
Error |
|
ignored |
警告 |
|
ignored |
Info |
|
|
Debug |
|
|
Trace (a.k.a. All) |
分布式组件引发从 RuntimeError 派生的自定义异常类型
torch.distributed.DistError: 这是所有分布式异常的基类型。
torch.distributed.DistBackendError: 当发生特定于后端的错误时,会引发此异常。例如,如果使用 NCCL 后端,并且用户尝试使用 NCCL 库无法使用的 GPU。
torch.distributed.DistNetworkError: 当网络库遇到错误(例如:对等方重置连接)时,会引发此异常。
torch.distributed.DistStoreError: 当 Store 遇到错误(例如:TCPStore 超时)时,会引发此异常。
- class torch.distributed.DistError¶
当分布式库中发生错误时引发的异常
- class torch.distributed.DistBackendError¶
当分布式中发生后端错误时引发的异常
- class torch.distributed.DistNetworkError¶
在分布式中发生网络错误时引发的异常
- class torch.distributed.DistStoreError¶
在分布式存储中发生错误时引发的异常
如果您正在运行单节点训练,交互式地断点您的脚本可能很方便。我们提供了一种方便地在单个排名上断点的方法