使用 PyTorch 编写分布式应用¶
创建于:2017 年 10 月 06 日 | 最后更新:2025 年 1 月 13 日 | 最后验证:2024 年 11 月 05 日
作者:Séb Arnold
注意
在 github 中查看和编辑本教程。
先决条件
在本简短教程中,我们将介绍 PyTorch 的分布式包。我们将了解如何设置分布式环境,使用不同的通信策略,并介绍包的一些内部结构。
设置¶
PyTorch 中包含的分布式包(即 torch.distributed
)使研究人员和从业人员能够轻松地跨进程和机器集群并行化他们的计算。为此,它利用消息传递语义,允许每个进程将数据通信到任何其他进程。与多处理 (torch.multiprocessing
) 包相反,进程可以使用不同的通信后端,并且不限于在同一台机器上执行。
为了开始使用,我们需要能够同时运行多个进程。如果您有权访问计算集群,您应该咨询您的本地系统管理员或使用您喜欢的协调工具(例如,pdsh、clustershell 或 slurm)。在本教程中,我们将使用单台机器并使用以下模板生成多个进程。
"""run.py:"""
#!/usr/bin/env python
import os
import sys
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def run(rank, size):
""" Distributed function to be implemented later. """
pass
def init_process(rank, size, fn, backend='gloo'):
""" Initialize the distributed environment. """
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(backend, rank=rank, world_size=size)
fn(rank, size)
if __name__ == "__main__":
world_size = 2
processes = []
if "google.colab" in sys.modules:
print("Running in Google Colab")
mp.get_context("spawn")
else:
mp.set_start_method("spawn")
for rank in range(size):
p = mp.Process(target=init_process, args=(rank, world_size, run))
p.start()
processes.append(p)
for p in processes:
p.join()
上面的脚本生成两个进程,每个进程都将设置分布式环境,初始化进程组 (dist.init_process_group
),最后执行给定的 run
函数。
让我们看一下 init_process
函数。它确保每个进程都能够通过主进程进行协调,使用相同的 IP 地址和端口。请注意,我们使用了 gloo
后端,但还有其他后端可用。(参见 第 5.1 节)我们将在本教程的末尾介绍 dist.init_process_group
中发生的魔术,但它本质上允许进程通过共享它们的位置来相互通信。
点对点通信¶

发送和接收¶
从一个进程到另一个进程的数据传输称为点对点通信。这些是通过 send
和 recv
函数或它们的立即对应物 isend
和 irecv
实现的。
"""Blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
if rank == 0:
tensor += 1
# Send the tensor to process 1
dist.send(tensor=tensor, dst=1)
else:
# Receive tensor from process 0
dist.recv(tensor=tensor, src=0)
print('Rank ', rank, ' has data ', tensor[0])
在上面的示例中,两个进程都从零张量开始,然后进程 0 递增张量并将其发送到进程 1,以便它们都以 1.0 结束。请注意,进程 1 需要分配内存以存储它将接收的数据。
另请注意,send/recv
是阻塞的:两个进程都阻塞,直到通信完成。另一方面,立即数是非阻塞的;脚本继续执行,并且这些方法返回一个 Work
对象,我们可以选择在其上 wait()
。
"""Non-blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
req = None
if rank == 0:
tensor += 1
# Send the tensor to process 1
req = dist.isend(tensor=tensor, dst=1)
print('Rank 0 started sending')
else:
# Receive tensor from process 0
req = dist.irecv(tensor=tensor, src=0)
print('Rank 1 started receiving')
req.wait()
print('Rank ', rank, ' has data ', tensor[0])
当使用立即数时,我们必须小心如何使用发送和接收的张量。由于我们不知道何时将数据通信到另一个进程,因此在 req.wait()
完成之前,我们不应修改发送的张量,也不应访问接收的张量。换句话说,
在
dist.isend()
之后写入tensor
将导致未定义的行为。在
dist.irecv()
之后从tensor
读取将导致未定义的行为,直到执行req.wait()
。
但是,在执行 req.wait()
之后,我们保证通信已经发生,并且存储在 tensor[0]
中的值是 1.0。
当我们想要更精细地控制进程的通信时,点对点通信非常有用。它们可以用来实现花哨的算法,例如 百度的 DeepSpeech 或 Facebook 的大规模实验 中使用的算法。(参见 第 4.1 节)
集体通信¶
![]() Scatter¶ |
![]() Gather¶ |
![]() Reduce¶ |
![]() All-Reduce¶ |
![]() Broadcast¶ |
![]() All-Gather¶ |
与点对点通信相反,集体通信允许跨组中所有进程的通信模式。组是我们所有进程的子集。要创建组,我们可以将秩列表传递给 dist.new_group(group)
。默认情况下,集体通信在所有进程上执行,也称为世界。例如,为了获得所有进程上所有张量的总和,我们可以使用 dist.all_reduce(tensor, op, group)
集体通信。
""" All-Reduce example."""
def run(rank, size):
""" Simple collective communication. """
group = dist.new_group([0, 1])
tensor = torch.ones(1)
dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
print('Rank ', rank, ' has data ', tensor[0])
由于我们想要组中所有张量的总和,因此我们使用 dist.ReduceOp.SUM
作为归约运算符。一般来说,任何可交换的数学运算都可以用作运算符。开箱即用,PyTorch 提供了许多这样的运算符,所有运算符都在元素级工作
dist.ReduceOp.SUM
,dist.ReduceOp.PRODUCT
,dist.ReduceOp.MAX
,dist.ReduceOp.MIN
,dist.ReduceOp.BAND
,dist.ReduceOp.BOR
,dist.ReduceOp.BXOR
,dist.ReduceOp.PREMUL_SUM
.
受支持的运算符的完整列表 在此。
除了 dist.all_reduce(tensor, op, group)
之外,PyTorch 中当前还实现了许多额外的集体通信。以下是一些受支持的集体通信。
dist.broadcast(tensor, src, group)
:将tensor
从src
复制到所有其他进程。dist.reduce(tensor, dst, op, group)
:将op
应用于每个tensor
,并将结果存储在dst
中。dist.all_reduce(tensor, op, group)
:与 reduce 相同,但结果存储在所有进程中。dist.scatter(tensor, scatter_list, src, group)
:将第 \(i^{\text{th}}\) 个张量scatter_list[i]
复制到第 \(i^{\text{th}}\) 个进程。dist.gather(tensor, gather_list, dst, group)
:从dst
中的所有进程复制tensor
。dist.all_gather(tensor_list, tensor, group)
:将所有进程中的tensor
复制到所有进程上的tensor_list
。dist.barrier(group)
:阻止 group 中的所有进程,直到每个进程都进入此函数。dist.all_to_all(output_tensor_list, input_tensor_list, group)
:将输入张量列表分散到组中的所有进程,并在输出列表中返回收集的张量列表。
受支持的集体通信的完整列表可以通过查看 PyTorch Distributed 的最新文档 (链接) 找到。
分布式训练¶
注意: 您可以在 此 GitHub 存储库 中找到本节的示例脚本。
现在我们了解了分布式模块的工作原理,让我们用它编写一些有用的东西。我们的目标是复制 DistributedDataParallel 的功能。当然,这将是一个教学示例,在实际情况下,您应该使用上面链接的官方、经过良好测试和优化的版本。
很简单,我们想要实现随机梯度下降的分布式版本。我们的脚本将让所有进程计算其模型在其数据批次上的梯度,然后平均它们的梯度。为了确保在更改进程数时获得相似的收敛结果,我们将首先必须对我们的数据集进行分区。(您也可以使用 torch.utils.data.random_split,而不是下面的代码片段。)
""" Dataset partitioning helper """
class Partition(object):
def __init__(self, data, index):
self.data = data
self.index = index
def __len__(self):
return len(self.index)
def __getitem__(self, index):
data_idx = self.index[index]
return self.data[data_idx]
class DataPartitioner(object):
def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
self.data = data
self.partitions = []
rng = Random() # from random import Random
rng.seed(seed)
data_len = len(data)
indexes = [x for x in range(0, data_len)]
rng.shuffle(indexes)
for frac in sizes:
part_len = int(frac * data_len)
self.partitions.append(indexes[0:part_len])
indexes = indexes[part_len:]
def use(self, partition):
return Partition(self.data, self.partitions[partition])
使用上面的代码片段,我们现在可以使用以下几行简单地对任何数据集进行分区
""" Partitioning MNIST """
def partition_dataset():
dataset = datasets.MNIST('./data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
size = dist.get_world_size()
bsz = 128 // size
partition_sizes = [1.0 / size for _ in range(size)]
partition = DataPartitioner(dataset, partition_sizes)
partition = partition.use(dist.get_rank())
train_set = torch.utils.data.DataLoader(partition,
batch_size=bsz,
shuffle=True)
return train_set, bsz
假设我们有 2 个副本,那么每个进程将有一个 60000 / 2 = 30000 个样本的 train_set
。我们还将批量大小除以副本数,以便保持 128 的总批量大小。
我们现在可以编写我们常用的前向-后向-优化训练代码,并添加一个函数调用来平均我们模型的梯度。(以下内容很大程度上受到官方 PyTorch MNIST 示例 的启发。)
""" Distributed Synchronous SGD Example """
def run(rank, size):
torch.manual_seed(1234)
train_set, bsz = partition_dataset()
model = Net()
optimizer = optim.SGD(model.parameters(),
lr=0.01, momentum=0.5)
num_batches = ceil(len(train_set.dataset) / float(bsz))
for epoch in range(10):
epoch_loss = 0.0
for data, target in train_set:
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
epoch_loss += loss.item()
loss.backward()
average_gradients(model)
optimizer.step()
print('Rank ', dist.get_rank(), ', epoch ',
epoch, ': ', epoch_loss / num_batches)
剩下的就是实现 average_gradients(model)
函数,它只是接收一个模型并在整个世界中平均其梯度。
""" Gradient averaging. """
def average_gradients(model):
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
param.grad.data /= size
瞧! 我们成功地实现了分布式同步 SGD,并且可以在大型计算机集群上训练任何模型。
注意: 虽然最后一句话在技术上是正确的,但实现生产级同步 SGD 实现还需要 更多技巧。再次强调,使用 经过测试和优化的 内容。
我们自己的环形 Allreduce¶
作为额外的挑战,假设我们想要实现 DeepSpeech 的高效环形 allreduce。使用点对点集体通信可以很容易地实现这一点。
""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
rank = dist.get_rank()
size = dist.get_world_size()
send_buff = send.clone()
recv_buff = send.clone()
accum = send.clone()
left = ((rank - 1) + size) % size
right = (rank + 1) % size
for i in range(size - 1):
if i % 2 == 0:
# Send send_buff
send_req = dist.isend(send_buff, right)
dist.recv(recv_buff, left)
accum[:] += recv_buff[:]
else:
# Send recv_buff
send_req = dist.isend(recv_buff, right)
dist.recv(send_buff, left)
accum[:] += send_buff[:]
send_req.wait()
recv[:] = accum[:]
在上面的脚本中,allreduce(send, recv)
函数的签名与 PyTorch 中的签名略有不同。它接收一个 recv
张量,并将所有 send
张量的总和存储在其中。作为留给读者的练习,我们的版本与 DeepSpeech 中的版本仍然存在一个差异:它们的实现将梯度张量划分为块,以便最佳地利用通信带宽。(提示:torch.chunk)
高级主题¶
我们现在准备好探索 torch.distributed
的一些更高级的功能。由于内容很多,本节分为两个小节
通信后端:我们将在其中学习如何使用 MPI 和 Gloo 进行 GPU-GPU 通信。
初始化方法:我们将在其中了解如何在
dist.init_process_group()
中最好地设置初始协调阶段。
通信后端¶
torch.distributed
最优雅的方面之一是它能够抽象并构建在不同的后端之上。如前所述,PyTorch 中实现了多个后端。一些最流行的后端是 Gloo、NCCL 和 MPI。它们各自具有不同的规范和权衡,具体取决于所需的用例。受支持的功能的比较表可以在 此处 找到。
Gloo 后端
到目前为止,我们已经广泛使用了 Gloo 后端。它作为开发平台非常方便,因为它包含在预编译的 PyTorch 二进制文件中,并且在 Linux(自 0.2 起)和 macOS(自 1.3 起)上都有效。它支持 CPU 上的所有点对点和集体操作,以及 GPU 上的所有集体操作。CUDA 张量的集体操作的实现不如 NCCL 后端提供的实现那样优化。
您肯定已经注意到,如果您将 model
放在 GPU 上,我们的分布式 SGD 示例将不起作用。为了使用多个 GPU,让我们也进行以下修改
使用
device = torch.device("cuda:{}".format(rank))
model = Net()
\(\rightarrow\)model = Net().to(device)
使用
data, target = data.to(device), target.to(device)
通过以上修改,我们的模型现在在两个 GPU 上进行训练,您可以使用 watch nvidia-smi
监控它们的利用率。
MPI 后端
消息传递接口 (MPI) 是高性能计算领域的标准化工具。它允许进行点对点和集体通信,并且是 torch.distributed
API 的主要灵感来源。存在 MPI 的几种实现(例如 Open-MPI、MVAPICH2、Intel MPI),每种实现都针对不同的目的进行了优化。使用 MPI 后端的优势在于 MPI 在大型计算机集群上的广泛可用性和高水平的优化。一些 最新 实现 也能够利用 CUDA IPC 和 GPU Direct 技术,以避免通过 CPU 进行内存复制。
不幸的是,PyTorch 的二进制文件无法包含 MPI 实现,我们将不得不手动重新编译它。幸运的是,这个过程相当简单,因为在编译时,PyTorch 将自行查找可用的 MPI 实现。以下步骤通过从 源代码 安装 PyTorch 来安装 MPI 后端。
创建并激活您的 Anaconda 环境,按照 指南 安装所有先决条件,但不要运行
python setup.py install
。选择并安装您喜欢的 MPI 实现。请注意,启用 CUDA 感知 MPI 可能需要一些额外的步骤。在我们的例子中,我们将坚持使用没有 GPU 支持的 Open-MPI:
conda install -c conda-forge openmpi
现在,转到您克隆的 PyTorch 仓库并执行
python setup.py install
。
为了测试我们新安装的后端,需要进行一些修改。
将
if __name__ == '__main__':
下的内容替换为init_process(0, 0, run, backend='mpi')
。运行
mpirun -n 4 python myscript.py
。
进行这些更改的原因是 MPI 需要在生成进程之前创建自己的环境。MPI 也会生成自己的进程并执行 初始化方法 中描述的握手,从而使 init_process_group
的 rank
和 size
参数变得多余。这实际上非常强大,因为您可以将额外的参数传递给 mpirun
,以便为每个进程定制计算资源。(诸如每个进程的内核数、手动将机器分配给特定秩以及 更多)这样做,您应该获得与其他通信后端相同的熟悉输出。
NCCL 后端
NCCL 后端 针对 CUDA 张量提供了集体操作的优化实现。如果您仅将 CUDA 张量用于集体操作,请考虑使用此后端以获得一流的性能。NCCL 后端包含在带有 CUDA 支持的预构建二进制文件中。
初始化方法¶
在本教程的结尾,让我们来研究一下我们调用的初始函数:dist.init_process_group(backend, init_method)
。具体来说,我们将讨论各种初始化方法,这些方法负责每个进程之间的初步协调步骤。 这些方法使您能够定义如何完成此协调。
初始化方法的选择取决于您的硬件设置,并且一种方法可能比其他方法更合适。 除了以下章节外,请参阅官方文档以获取更多信息。
环境变量
在本教程中,我们一直在使用环境变量初始化方法。 通过在所有机器上设置以下四个环境变量,所有进程都能够正确连接到 master,获取有关其他进程的信息,并最终与它们握手。
MASTER_PORT
:主机上可用的端口,该主机将托管 rank 0 的进程。MASTER_ADDR
:将托管 rank 0 进程的主机的 IP 地址。WORLD_SIZE
:进程总数,以便 master 知道需要等待多少 worker。RANK
:每个进程的 rank,以便它们知道它是 master 还是 worker。
共享文件系统
共享文件系统要求所有进程都有权访问共享文件系统,并将通过共享文件协调它们。 这意味着每个进程将打开文件,写入其信息,并等待直到所有人都这样做。 之后,所有必需的信息将随时可供所有进程使用。 为了避免竞争条件,文件系统必须支持通过 fcntl 进行锁定。
dist.init_process_group(
init_method='file:///mnt/nfs/sharedfile',
rank=args.rank,
world_size=4)
TCP
通过 TCP 初始化可以通过提供 rank 0 进程的 IP 地址和可访问的端口号来实现。 在这里,所有 worker 都将能够连接到 rank 0 进程并交换有关如何相互访问的信息。
dist.init_process_group(
init_method='tcp://10.1.1.20:23456',
rank=args.rank,
world_size=4)
致谢
我要感谢 PyTorch 开发者在他们的实现、文档和测试方面做得如此出色。 当代码不清楚时,我总是可以依靠 文档 或 测试 来找到答案。 特别是,我要感谢 Soumith Chintala、Adam Paszke 和 Natalia Gimelshein 提供了深刻的见解并回答了关于早期草稿的问题。