• 文档 >
  • 分布式数据并行
快捷方式

分布式数据并行

警告

torch.nn.parallel.DistributedDataParallel 的实现随时间演变。此设计说明基于 v1.4 的状态编写。

torch.nn.parallel.DistributedDataParallel (DDP) 透明地执行分布式数据并行训练。本页描述了它的工作原理并揭示了实现细节。

示例

让我们从一个简单的 torch.nn.parallel.DistributedDataParallel 示例开始。此示例使用 torch.nn.Linear 作为本地模型,用 DDP 包装它,然后在 DDP 模型上运行一次前向传播、一次反向传播和一个优化器步进。之后,本地模型上的参数将被更新,并且不同进程上的所有模型应该完全相同。

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import os
from torch.nn.parallel import DistributedDataParallel as DDP


def example(rank, world_size):
    # create default process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    # create local model
    model = nn.Linear(10, 10).to(rank)
    # construct DDP model
    ddp_model = DDP(model, device_ids=[rank])
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # forward pass
    outputs = ddp_model(torch.randn(20, 10).to(rank))
    labels = torch.randn(20, 10).to(rank)
    # backward pass
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()

def main():
    world_size = 2
    mp.spawn(example,
        args=(world_size,),
        nprocs=world_size,
        join=True)

if __name__=="__main__":
    # Environment variables which need to be
    # set when using c10d's default "env"
    # initialization mode.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    main()

DDP 支持 TorchDynamo。与 TorchDynamo 一起使用时,在编译模型之前应用 DDP 模型包装器,以便 torchdynamo 可以根据 DDP 桶大小应用 DDPOptimizer(图分割优化)。(详见TorchDynamo DDPOptimizer

ddp_model = DDP(model, device_ids=[rank])
ddp_model = torch.compile(ddp_model)

内部设计

本节深入介绍一个迭代中的每个步骤的细节,揭示 torch.nn.parallel.DistributedDataParallel 的内部工作原理。

  • 先决条件:DDP 依赖 c10d ProcessGroup 进行通信。因此,应用程序必须在构造 DDP 之前创建 ProcessGroup 实例。

  • 构造:DDP 构造函数接受对本地模块的引用,并将 rank 0 进程的 state_dict() 广播到组中所有其他进程,以确保所有模型副本从完全相同的状态开始。然后,每个 DDP 进程创建一个本地 Reducer`,它稍后将负责反向传播期间的梯度同步。为了提高通信效率,Reducer` 将参数梯度组织到桶(bucket)中,并一次减少一个桶。可以通过在 DDP 构造函数中设置 bucket_cap_mb 参数来配置桶大小。参数梯度到桶的映射在构造时确定,基于桶大小限制和参数大小。模型参数按照给定模型的 Model.parameters()` 的大致相反顺序分配到桶中。使用相反顺序的原因是 DDP 期望梯度在反向传播期间大致按照该顺序准备就绪。下图显示了一个示例。请注意,grad0` 和 grad1` 在 bucket1` 中,而另外两个梯度在 bucket0` 中。当然,这个假设可能并非总是成立,当发生这种情况时,可能会影响 DDP 反向传播速度,因为 Reducer` 无法在最早可能的时间启动通信。除了分桶之外,Reducer` 在构造时还会注册自动求导钩子(autograd hook),每个参数一个。这些钩子将在反向传播期间,当梯度准备就绪时触发。"

  • 前向传播:DDP 接受输入并将其传递给本地模型,然后如果将 find_unused_parameters 设置为 True`,则会分析本地模型的输出。此模式允许在模型的子图上运行反向传播,DDP 通过从模型输出遍历自动求导图并标记所有未使用的参数为准备好进行归约(reduction),来找出哪些参数参与了反向传播。在反向传播期间,Reducer` 只会等待未准备好的参数,但它仍然会归约所有桶。目前将参数梯度标记为准备就绪并不能帮助 DDP 跳过桶,但它可以防止 DDP 在反向传播期间无限期地等待缺失的梯度。请注意,遍历自动求导图会引入额外的开销,因此应用程序只应在必要时将 find_unused_parameters 设置为 True`。"

  • 反向传播backward() 函数直接在损失 Tensor` 上调用,这超出了 DDP 的控制范围,DDP 使用在构造时注册的自动求导钩子来触发梯度同步。当一个梯度准备就绪时,其在对应梯度累加器上的 DDP 钩子将触发,DDP 随后会将该参数梯度标记为准备好进行归约。当一个桶中的所有梯度都准备就绪时,Reducer` 会在该桶上启动异步 allreduce` 操作,以计算所有进程上的梯度平均值。当所有桶都准备就绪时,Reducer` 将阻塞等待所有 allreduce` 操作完成。完成后,平均梯度将写入所有参数的 param.grad` 字段。因此,在反向传播之后,不同 DDP 进程上同一对应参数的 grad 字段应该相同。"

  • 优化器步进:从优化器的角度来看,它正在优化一个本地模型。所有 DDP 进程上的模型副本可以保持同步,因为它们都从相同的状态开始,并且在每次迭代中都具有相同的平均梯度。

ddp_grad_sync.png

注意

DDP 要求所有进程上的 Reducer` 实例以完全相同的顺序调用 allreduce`,这是通过始终按照桶索引顺序而不是实际桶就绪顺序运行 allreduce` 来实现的。跨进程的 allreduce` 顺序不匹配可能导致结果错误或 DDP 反向传播挂起。"

实现

以下是指向 DDP 实现组件的指针。堆叠图显示了代码的结构。

ProcessGroup

  • ProcessGroup.hpp:包含所有进程组实现的抽象 API。c10d` 库提供了 3 种开箱即用的实现,即 ProcessGroupGlooProcessGroupNCCLProcessGroupMPIDistributedDataParallel` 在初始化期间使用 ProcessGroup::broadcast()` 将模型状态从 rank 0 进程发送到其他进程,并使用 ProcessGroup::allreduce()` 求和梯度。"

  • Store.hpp:协助进程组实例的 rendezvous 服务相互发现。

DistributedDataParallel

  • distributed.py:是 DDP 的 Python 入口点。它实现了初始化步骤和 nn.parallel.DistributedDataParallel` 模块的 forward` 函数,这些函数会调用 C++ 库。其 _sync_param` 函数在一个 DDP 进程在多个设备上工作时执行进程内参数同步,它还将模型缓冲区从 rank 0 进程广播到所有其他进程。进程间参数同步发生在 Reducer.cpp` 中。"

  • comm.h:实现了合并广播辅助函数,该函数在初始化期间用于广播模型状态,并在前向传播之前同步模型缓冲区。

  • reducer.h:提供了反向传播中梯度同步的核心实现。它有三个入口函数

    • Reducer`:构造函数在 distributed.py` 中调用,它将 Reducer::autograd_hook()` 注册到梯度累加器。"

    • autograd_hook()` 函数将在梯度准备就绪时由自动求导引擎调用。"

    • prepare_for_backward()` 在 distributed.py` 中 DDP 前向传播结束时调用。当在 DDP 构造函数中将 find_unused_parameters 设置为 True` 时,它会遍历自动求导图以查找未使用的参数。"

ddp_code.png

TorchDynamo DDPOptimizer

DDP 的性能优势来自于将 allreduce 集体操作与反向传播期间的计算重叠。AotAutograd 在与 TorchDynamo 一起用于编译整个前向和整个反向图时,会阻止这种重叠,因为 allreduce 操作是在整个优化后的反向计算完成_之后_才由自动求导钩子启动的。

TorchDynamo 的 DDPOptimizer 通过在反向传播期间 DDP allreduce 桶的逻辑边界处中断前向图来提供帮助。注意:目标是在反向传播期间中断图,而最简单的实现是中断前向图,然后对每个部分调用 AotAutograd 和编译。这使得 DDP 的 allreduce 钩子可以在反向传播的各个部分之间触发,并调度通信与计算重叠。

有关更深入的解释和实验结果,请参阅此博客文章,或阅读 torch/_dynamo/optimizations/distributed.py 中的文档和代码

要调试 DDPOptimizer,设置 TORCH_LOGS=’ddp_graphs’ 以获取完整的图转储。对于不包含图的日志,将 ‘dynamo’、‘distributed’ 或 ‘dist_ddp’ 中的任意一个添加到 TORCH_LOGS 中(获取有关桶边界的基本信息)。要禁用 DDPOptimizer,设置 torch._dynamo.config.optimize_ddp=False。DDP 和 TorchDynamo 在没有 DDPOptimizer 的情况下仍然可以正常工作,但性能会下降。

文档

查阅 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并获取问题解答

查看资源