分布式数据并行入门¶
作者: 沈立
编辑: Joe Zhu
注意
在 github 上查看和编辑本教程。
先决条件
DistributedDataParallel (DDP) 在模块级别实现数据并行,可以在多台机器上运行。使用 DDP 的应用程序应该生成多个进程,每个进程创建一个 DDP 实例。DDP 使用 torch.distributed 包中的集体通信来同步梯度和缓冲区。更具体地说,DDP 为 model.parameters()
给出的每个参数注册一个自动微分钩子,并且当在反向传播过程中计算相应的梯度时,该钩子将被触发。然后,DDP 使用该信号在进程之间触发梯度同步。有关更多详细信息,请参阅 DDP 设计说明。
使用 DDP 的推荐方法是为每个模型副本生成一个进程,其中一个模型副本可以跨多个设备。DDP 进程可以放置在同一台机器上或跨多台机器,但 GPU 设备不能跨进程共享。本教程从 DDP 的基本用例开始,然后演示更高级的用例,包括检查点模型和将 DDP 与模型并行结合。
注意
本教程中的代码在一个 8 个 GPU 的服务器上运行,但可以很容易地推广到其他环境。
DataParallel
和 DistributedDataParallel
的比较¶
在我们深入探讨之前,让我们澄清一下,为什么尽管增加了复杂性,您仍然会考虑使用 DistributedDataParallel
而不是 DataParallel
。
首先,
DataParallel
是单进程、多线程的,并且只在单机上运行,而DistributedDataParallel
是多进程的,适用于单机和多机训练。由于 GIL 在线程之间的竞争、每次迭代复制模型以及散布输入和收集输出带来的额外开销,DataParallel
通常比DistributedDataParallel
即使在单机上也更慢。回想一下,从 之前的教程 中,如果您的模型太大而无法在一个 GPU 上运行,则必须使用 **模型并行** 将其拆分到多个 GPU 上。
DistributedDataParallel
与 **模型并行** 协同工作;DataParallel
目前不支持。当 DDP 与模型并行结合时,每个 DDP 进程都会使用模型并行,而所有进程共同使用数据并行。如果您的模型需要跨多个机器运行,或者您的用例不适合数据并行范式,请查看 RPC API 以获得更通用的分布式训练支持。
基本用例¶
要创建 DDP 模块,您必须首先正确设置进程组。更多详细信息可以在 使用 PyTorch 编写分布式应用程序 中找到。
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
# On Windows platform, the torch.distributed package only
# supports Gloo backend, FileStore and TcpStore.
# For FileStore, set init_method parameter in init_process_group
# to a local file. Example as follow:
# init_method="file:///f:/libtmp/some_file"
# dist.init_process_group(
# "gloo",
# rank=rank,
# init_method=init_method,
# world_size=world_size)
# For TcpStore, same way as on Linux.
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
现在,让我们创建一个玩具模块,用 DDP 包装它,并向它提供一些虚拟输入数据。请注意,由于 DDP 在 DDP 构造函数中将模型状态从 rank 0 进程广播到所有其他进程,您无需担心不同的 DDP 进程从不同的初始模型参数值开始。
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic(rank, world_size):
print(f"Running basic DDP example on rank {rank}.")
setup(rank, world_size)
# create model and move it to GPU with id rank
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
def run_demo(demo_fn, world_size):
mp.spawn(demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
如您所见,DDP 包装了低级分布式通信细节,并提供了一个干净的 API,就像它是一个本地模型一样。梯度同步通信发生在反向传播期间,并与反向计算重叠。当 backward()
返回时,param.grad
已经包含同步的梯度张量。对于基本用例,DDP 只需要几行额外的代码来设置进程组。当将 DDP 应用于更高级的用例时,一些注意事项需要谨慎。
倾斜的处理速度¶
在 DDP 中,构造函数、前向传播和反向传播都是分布式同步点。不同的进程应该启动相同数量的同步,并以相同的顺序到达这些同步点,并在大致相同的时间进入每个同步点。否则,快速的进程可能会提前到达并超时,而等待落后的进程。因此,用户负责平衡进程之间的负载分配。有时,由于网络延迟、资源争用或不可预测的工作负载峰值等原因,倾斜的处理速度是不可避免的。为了避免这些情况下出现超时,请确保在调用 init_process_group 时传递一个足够大的 timeout
值。
保存和加载检查点¶
通常使用 torch.save
和 torch.load
在训练期间对模块进行检查点,并从检查点中恢复。有关更多详细信息,请参阅 保存和加载模型。在使用 DDP 时,一个优化是只在一个进程中保存模型,然后加载到所有进程,从而减少写入开销。这是正确的,因为所有进程都从相同的参数开始,梯度在反向传播中是同步的,因此优化器应该继续将参数设置为相同的值。如果您使用此优化,请确保在保存完成之前没有进程开始加载。此外,在加载模块时,您需要提供一个适当的 map_location
参数,以防止进程进入其他进程的设备。如果缺少 map_location
,torch.load
将首先将模块加载到 CPU,然后将每个参数复制到其保存的位置,这将导致同一台机器上的所有进程使用同一组设备。有关更高级的故障恢复和弹性支持,请参考 TorchElastic。
def demo_checkpoint(rank, world_size):
print(f"Running DDP checkpoint example on rank {rank}.")
setup(rank, world_size)
model = ToyModel().to(rank)
ddp_model = DDP(model, device_ids=[rank])
CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
if rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
# Use a barrier() to make sure that process 1 loads the model after process
# 0 saves it.
dist.barrier()
# configure map_location properly
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
ddp_model.load_state_dict(
torch.load(CHECKPOINT_PATH, map_location=map_location, weights_only=True))
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(rank)
loss_fn(outputs, labels).backward()
optimizer.step()
# Not necessary to use a dist.barrier() to guard the file deletion below
# as the AllReduce ops in the backward pass of DDP already served as
# a synchronization.
if rank == 0:
os.remove(CHECKPOINT_PATH)
cleanup()
将 DDP 与模型并行结合¶
DDP 也适用于多 GPU 模型。用 DDP 包装多 GPU 模型在训练具有大量数据的庞大模型时特别有用。
class ToyMpModel(nn.Module):
def __init__(self, dev0, dev1):
super(ToyMpModel, self).__init__()
self.dev0 = dev0
self.dev1 = dev1
self.net1 = torch.nn.Linear(10, 10).to(dev0)
self.relu = torch.nn.ReLU()
self.net2 = torch.nn.Linear(10, 5).to(dev1)
def forward(self, x):
x = x.to(self.dev0)
x = self.relu(self.net1(x))
x = x.to(self.dev1)
return self.net2(x)
当将多 GPU 模型传递给 DDP 时,device_ids
和 output_device
必须不设置。输入和输出数据将由应用程序或模型的 forward()
方法放置在适当的设备中。
def demo_model_parallel(rank, world_size):
print(f"Running DDP with model parallel example on rank {rank}.")
setup(rank, world_size)
# setup mp_model and devices for this process
dev0 = rank * 2
dev1 = rank * 2 + 1
mp_model = ToyMpModel(dev0, dev1)
ddp_mp_model = DDP(mp_model)
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_mp_model.parameters(), lr=0.001)
optimizer.zero_grad()
# outputs will be on dev1
outputs = ddp_mp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(dev1)
loss_fn(outputs, labels).backward()
optimizer.step()
cleanup()
if __name__ == "__main__":
n_gpus = torch.cuda.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_basic, world_size)
run_demo(demo_checkpoint, world_size)
world_size = n_gpus//2
run_demo(demo_model_parallel, world_size)
使用 torch.distributed.run/torchrun 初始化 DDP¶
我们可以利用 PyTorch Elastic 来简化 DDP 代码并更轻松地初始化作业。让我们仍然使用 Toymodel 示例,并创建一个名为 elastic_ddp.py
的文件。
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
class ToyModel(nn.Module):
def __init__(self):
super(ToyModel, self).__init__()
self.net1 = nn.Linear(10, 10)
self.relu = nn.ReLU()
self.net2 = nn.Linear(10, 5)
def forward(self, x):
return self.net2(self.relu(self.net1(x)))
def demo_basic():
dist.init_process_group("nccl")
rank = dist.get_rank()
print(f"Start running basic DDP example on rank {rank}.")
# create model and move it to GPU with id rank
device_id = rank % torch.cuda.device_count()
model = ToyModel().to(device_id)
ddp_model = DDP(model, device_ids=[device_id])
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
optimizer.zero_grad()
outputs = ddp_model(torch.randn(20, 10))
labels = torch.randn(20, 5).to(device_id)
loss_fn(outputs, labels).backward()
optimizer.step()
dist.destroy_process_group()
if __name__ == "__main__":
demo_basic()
然后,可以在所有节点上运行 torch elastic/torchrun 命令来初始化上面创建的 DDP 作业。
torchrun --nnodes=2 --nproc_per_node=8 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=$MASTER_ADDR:29400 elastic_ddp.py
我们在两个主机上运行 DDP 脚本,每个主机我们都运行 8 个进程,也就是说,我们在 16 个 GPU 上运行它。请注意,$MASTER_ADDR
在所有节点上必须相同。
这里 torchrun 将在它启动的节点上的每个进程上启动 8 个进程并调用 elastic_ddp.py
,但用户还需要应用像 slurm 这样的集群管理工具来真正地在这 2 个节点上运行此命令。
例如,在启用 SLURM 的集群上,我们可以编写一个脚本来运行上面的命令,并将 MASTER_ADDR
设置为
export MASTER_ADDR=$(scontrol show hostname ${SLURM_NODELIST} | head -n 1)
然后,我们只需使用 SLURM 命令运行此脚本:srun --nodes=2 ./torchrun_script.sh
。当然,这只是一个例子;您可以选择您自己的集群调度工具来启动 torchrun 作业。
有关 Elastic 运行的更多信息,您可以查看此 快速入门文档 了解更多。