分布式数据并行¶
- class torch.nn.parallel.DistributedDataParallel(module, device_ids=None, output_device=None, dim=0, broadcast_buffers=True, process_group=None, bucket_cap_mb=None, find_unused_parameters=False, check_reduction=False, gradient_as_bucket_view=False, static_graph=False, delay_all_reduce_named_params=None, param_to_hook_all_reduce=None, mixed_precision=None, device_mesh=None)[source]¶
基于
torch.distributed
在模块级别实现分布式数据并行。此容器通过在每个模型副本之间同步梯度来提供数据并行。要同步的设备由输入
process_group
指定,默认为整个世界。请注意,DistributedDataParallel
不会将输入切分或以其他方式跨参与的 GPU 分片;用户负责定义如何执行此操作,例如通过使用DistributedSampler
。另请参阅:基础知识 和 使用 nn.parallel.DistributedDataParallel 而不是多进程或 nn.DataParallel。与
torch.nn.DataParallel
相同的输入约束适用。创建此类需要
torch.distributed
已初始化,通过调用torch.distributed.init_process_group()
。DistributedDataParallel
被证明在单节点多 GPU 数据并行训练方面显著快于torch.nn.DataParallel
。要在具有 N 个 GPU 的主机上使用
DistributedDataParallel
,您应该启动N
个进程,确保每个进程独占地使用从 0 到 N-1 的单个 GPU。这可以通过为每个进程设置CUDA_VISIBLE_DEVICES
或通过调用以下方法来完成:>>> torch.cuda.set_device(i)
其中 i 从 0 到 N-1。在每个进程中,您应该参考以下内容来构建此模块:
>>> torch.distributed.init_process_group( >>> backend='nccl', world_size=N, init_method='...' >>> ) >>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)
为了在每个节点上启动多个进程,您可以使用
torch.distributed.launch
或torch.multiprocessing.spawn
。注意
请参阅 PyTorch 分布式概述,以简要了解与分布式训练相关的所有功能。
注意
DistributedDataParallel
可以与torch.distributed.optim.ZeroRedundancyOptimizer
结合使用,以减少每个秩优化器状态的内存占用。请参阅 ZeroRedundancyOptimizer 食谱 以获取更多详细信息。注意
nccl
后端是目前速度最快且强烈推荐的后端,用于 GPU。这适用于单节点和多节点分布式训练。注意
此模块还支持混合精度分布式训练。这意味着您的模型可以具有不同类型的参数,例如
fp16
和fp32
的混合类型,这些混合类型参数上的梯度归约将正常工作。注意
如果在一个进程上使用
torch.save
来保存模块的检查点,并在其他一些进程上使用torch.load
来恢复它,请确保为每个进程正确配置了map_location
。如果没有map_location
,torch.load
会将模块恢复到保存模块的设备上。注意
当一个模型在
M
个节点上训练,且batch=N
时,如果损失在批次中的实例上求和(而不是像通常那样求平均),则与在单个节点上使用batch=M*N
训练的相同模型相比,梯度将小M
倍(因为不同节点之间的梯度是平均的)。当您希望获得与本地训练对应的数学上等效的训练过程时,应该考虑这一点。但在大多数情况下,您可以将DistributedDataParallel包装的模型、DataParallel包装的模型和单个GPU上的普通模型视为相同(例如,对于等效的批次大小使用相同的学习率)。注意
参数永远不会在进程之间广播。该模块对梯度执行全简化步骤,并假设它们将在所有进程中以相同的方式被优化器修改。缓冲区(例如BatchNorm统计信息)在每次迭代中从秩为0的进程中的模块广播到系统中的所有其他副本。
注意
如果您将DistributedDataParallel与分布式RPC框架结合使用,则应始终使用
torch.distributed.autograd.backward()
计算梯度,并使用torch.distributed.optim.DistributedOptimizer
优化参数。示例
>>> import torch.distributed.autograd as dist_autograd >>> from torch.nn.parallel import DistributedDataParallel as DDP >>> import torch >>> from torch import optim >>> from torch.distributed.optim import DistributedOptimizer >>> import torch.distributed.rpc as rpc >>> from torch.distributed.rpc import RRef >>> >>> t1 = torch.rand((3, 3), requires_grad=True) >>> t2 = torch.rand((3, 3), requires_grad=True) >>> rref = rpc.remote("worker1", torch.add, args=(t1, t2)) >>> ddp_model = DDP(my_model) >>> >>> # Setup optimizer >>> optimizer_params = [rref] >>> for param in ddp_model.parameters(): >>> optimizer_params.append(RRef(param)) >>> >>> dist_optim = DistributedOptimizer( >>> optim.SGD, >>> optimizer_params, >>> lr=0.05, >>> ) >>> >>> with dist_autograd.context() as context_id: >>> pred = ddp_model(rref.to_here()) >>> loss = loss_func(pred, target) >>> dist_autograd.backward(context_id, [loss]) >>> dist_optim.step(context_id)
注意
DistributedDataParallel目前对使用
torch.utils.checkpoint()
的梯度检查点提供有限的支持。如果检查点使用use_reentrant=False(推荐)完成,则DDP将按预期工作,没有任何限制。但是,如果检查点使用use_reentrant=True(默认值)完成,则当模型中没有未使用的参数并且每个层最多检查点一次时,DDP将按预期工作(确保您没有将find_unused_parameters=True传递给DDP)。我们目前不支持多次检查点一个层的情况,或者检查点模型中存在未使用参数的情况。注意
为了让非DDP模型加载来自DDP模型的状态字典,需要将
consume_prefix_in_state_dict_if_present()
应用于在加载前剥离DDP状态字典中的前缀“module.”。警告
构造函数、前向方法以及输出的微分(或该模块输出的函数)是分布式同步点。如果不同的进程可能正在执行不同的代码,请将其考虑在内。
警告
此模块假设所有参数在创建时都已在模型中注册。以后不应添加或删除任何参数。缓冲区也适用。
警告
此模块假设每个分布式进程的模型中所有参数的注册顺序相同。该模块本身将按照模型注册参数的反序进行梯度
allreduce
。换句话说,确保每个分布式进程具有完全相同的模型,从而具有完全相同的参数注册顺序,是用户的责任。警告
此模块允许具有非行主序连续步长的参数。例如,您的模型可能包含一些参数,其
torch.memory_format
为torch.contiguous_format
,而其他参数的格式为torch.channels_last
。但是,不同进程中对应的参数必须具有相同的步长。警告
此模块不适用于
torch.autograd.grad()
(即,只有当梯度要累积在参数的.grad
属性中时,它才会起作用)。警告
如果您计划将此模块与
nccl
后端或gloo
后端(使用Infiniband)一起使用,以及使用多个工作进程的DataLoader,请将多处理启动方法更改为forkserver
(仅限Python 3)或spawn
。不幸的是,Gloo(使用Infiniband)和NCCL2不是线程安全的,如果您不更改此设置,则可能会遇到死锁。警告
在使用
DistributedDataParallel
包装模型后,您永远不要尝试更改模型的参数。因为,当使用DistributedDataParallel
包装模型时,DistributedDataParallel
的构造函数将在构造时在模型本身的所有参数上注册额外的梯度缩减函数。如果您随后更改模型的参数,梯度缩减函数将不再与正确的参数集匹配。警告
将
DistributedDataParallel
与分布式RPC框架结合使用是实验性的,并且可能会发生变化。- 参数
module (Module) – 要并行的模块
device_ids (list of int or torch.device) –
CUDA 设备。1) 对于单设备模块,
device_ids
可以包含正好一个设备ID,它表示此进程对应的输入模块所在的唯一CUDA设备。或者,device_ids
也可以是None
。2) 对于多设备模块和CPU模块,device_ids
必须是None
。当
device_ids
对于这两种情况都为None
时,前向传递的输入数据和实际模块都必须放置在正确的设备上。(默认值:None
)output_device (int or torch.device) – 单设备CUDA模块输出的设备位置。对于多设备模块和CPU模块,它必须是
None
,模块本身决定输出位置。(默认值:对于单设备模块,为device_ids[0]
)broadcast_buffers (bool) – 在
forward
函数开始时启用同步(广播)模块缓冲区的标志。(默认值:True
)process_group – 用于分布式数据全简化的进程组。如果为
None
,则将使用默认进程组,该进程组由torch.distributed.init_process_group()
创建。(默认值:None
)bucket_cap_mb –
DistributedDataParallel
会将参数分成多个桶,以便每个桶的梯度缩减可能与反向计算重叠。bucket_cap_mb
控制桶的大小(以兆字节(MiB)为单位)。如果为None
,则将使用默认大小25 MiB。(默认值:None
)find_unused_parameters (bool) – 从包装模块的
forward
函数返回值中包含的所有张量遍历自动梯度图。作为此图的一部分而未接收梯度的参数会被抢先标记为已准备好进行缩减。此外,可能已在包装模块的forward
函数中使用但不是损失计算的一部分,因此也不会接收梯度的参数也会被抢先标记为已准备好进行缩减。(默认值:False
)check_reduction – 此参数已弃用。
gradient_as_bucket_view (bool) – 当设置为
True
时,梯度将是指向allreduce
通信桶的不同偏移量的视图。这可以减少峰值内存使用量,节省的内存大小将等于总梯度大小。此外,它避免了梯度和allreduce
通信桶之间复制的开销。当梯度是视图时,无法在梯度上调用detach_()
。如果遇到此类错误,请参考torch/optim/optimizer.py
中的zero_grad()
函数作为解决方案。请注意,梯度将在第一次迭代后成为视图,因此应在第一次迭代后检查峰值内存节省情况。static_graph (bool) –
当设置为
True
时,DDP 知道训练图是静态的。静态图意味着 1) 在整个训练循环期间,使用和未使用参数的集合不会改变;在这种情况下,用户是否设置find_unused_parameters = True
并不重要。2) 训练图的方式在整个训练循环中不会改变(意味着没有依赖于迭代的控制流)。当 static_graph 设置为True
时,DDP 将支持过去无法支持的情况:1) 重入反向传播。2) 多次激活检查点。3) 模型具有未使用参数时的激活检查点。4) 模型参数位于前向函数之外。5) 当存在未使用参数时,可能会提高性能,因为当 static_graph 设置为True
时,DDP 不会在每次迭代中搜索图以检测未使用参数。要检查是否可以将 static_graph 设置为True
,一种方法是在您之前模型训练结束时检查 ddp 日志数据,如果ddp_logging_data.get("can_set_static_graph") == True
,则大多数情况下您也可以设置static_graph = True
。- 示例:
>>> model_DDP = torch.nn.parallel.DistributedDataParallel(model) >>> # Training loop >>> ... >>> ddp_logging_data = model_DDP._get_ddp_logging_data() >>> static_graph = ddp_logging_data.get("can_set_static_graph")
delay_all_reduce_named_params (list of tuple of str 和 torch.nn.Parameter) – 一个命名参数列表,当
param_to_hook_all_reduce
中指定的参数梯度准备好时,其所有归约将被延迟。DDP 的其他参数不适用于此参数中指定的命名参数,因为这些命名参数将被 DDP 归约器忽略。param_to_hook_all_reduce (torch.nn.Parameter) – 一个参数,用于挂钩
delay_all_reduce_named_params
中指定参数的延迟所有归约。
- 变量
module (Module) – 要并行的模块。
示例
>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...') >>> net = torch.nn.parallel.DistributedDataParallel(model)
- join(divide_by_initial_world_size=True, enable=True, throw_on_early_termination=False)[source]¶
用于在 DDP 中使用跨进程不均匀输入进行训练的上下文管理器。
此上下文管理器将跟踪已加入的 DDP 进程,并通过插入集体通信操作来“影子”前向和后向传递,以匹配非加入的 DDP 进程创建的操作。这将确保每个集体调用都有一个由已加入的 DDP 进程进行的相应调用,从而防止在跨进程使用不均匀输入进行训练时可能发生的挂起或错误。或者,如果标志
throw_on_early_termination
被指定为True
,则一旦一个进程用完输入,所有训练器都会抛出一个错误,允许这些错误被捕获并根据应用程序逻辑进行处理。一旦所有 DDP 进程都加入,上下文管理器将把对应于最后一个加入进程的模型广播到所有进程,以确保模型在所有进程中都是相同的(这由 DDP 保证)。
要使用它来启用跨进程使用不均匀输入进行训练,只需将此上下文管理器包装在您的训练循环周围即可。无需对模型或数据加载进行进一步修改。
警告
如果此上下文管理器包装的模型或训练循环具有其他分布式集体操作,例如模型前向传递中的
SyncBatchNorm
,则必须启用标志throw_on_early_termination
。这是因为此上下文管理器不知道非 DDP 集体通信。此标志将导致所有进程在任何一个进程耗尽输入时抛出异常,允许在所有进程中捕获并恢复这些错误。- 参数
divide_by_initial_world_size (bool) – 如果
True
,则将梯度除以启动 DDP 训练时的初始world_size
。如果False
,则计算有效世界大小(尚未用完输入的进程数),并在所有归约期间将梯度除以该值。设置divide_by_initial_world_size=True
以确保每个输入样本(包括不均匀输入)在它们对全局梯度的贡献方面具有相同的权重。这是通过始终将梯度除以初始world_size
来实现的,即使我们在遇到不均匀输入时也是如此。如果您将其设置为False
,我们将梯度除以剩余的节点数。这确保了与在较小world_size
上进行训练的同等性,尽管这也意味着不均匀输入将对全局梯度贡献更多。通常,对于训练作业的最后几个输入不均匀的情况,您希望将其设置为True
。在输入数量差异很大的极端情况下,将其设置为False
可能会提供更好的结果。enable (bool) – 是否启用不均匀输入检测。在您知道输入在参与进程中是均匀的情况下,传入
enable=False
以禁用。默认为True
。throw_on_early_termination (bool) – 当至少一个进程耗尽输入时,是否抛出错误或继续训练。如果
True
,则在第一个进程到达数据末尾时抛出。如果False
,则将继续训练,直到所有进程都加入为止,有效世界大小会变小。请注意,如果指定了此标志,则会忽略标志divide_by_initial_world_size
。默认为False
。
示例
>>> import torch >>> import torch.distributed as dist >>> import os >>> import torch.multiprocessing as mp >>> import torch.nn as nn >>> # On each spawned worker >>> def worker(rank): >>> dist.init_process_group("nccl", rank=rank, world_size=2) >>> torch.cuda.set_device(rank) >>> model = nn.Linear(1, 1, bias=False).to(rank) >>> model = torch.nn.parallel.DistributedDataParallel( >>> model, device_ids=[rank], output_device=rank >>> ) >>> # Rank 1 gets one more input than rank 0. >>> inputs = [torch.tensor([1]).float() for _ in range(10 + rank)] >>> with model.join(): >>> for _ in range(5): >>> for inp in inputs: >>> loss = model(inp).sum() >>> loss.backward() >>> # Without the join() API, the below synchronization will hang >>> # blocking for rank 1's allreduce to complete. >>> torch.cuda.synchronize(device=rank)
- join_hook(**kwargs)[source]¶
DDP join hook 通过镜像前向和后向传递中的通信来启用对不均匀输入的训练。
- 参数
kwargs (dict) – 一个
dict
,包含任何关键字参数,用于在运行时修改 join hook 的行为;所有共享相同 join 上下文管理器的Joinable
实例都会转发kwargs
的相同值。
- 此 hook 支持以下关键字参数
- divide_by_initial_world_size (bool, 可选)
如果
True
,则梯度将除以启动 DDP 时的初始世界大小。如果False
,则梯度将除以有效世界大小(即未加入的进程数),这意味着不均匀输入对全局梯度的贡献更大。通常,如果均匀性程度较小,则应将其设置为True
,但在极端情况下可以将其设置为False
以获得更好的结果。默认为True
。
- no_sync()[source]¶
用于禁用跨 DDP 进程的梯度同步的上下文管理器。
在此上下文中,梯度将在模块变量上累积,稍后将在退出上下文的第一个前向-后向传递中进行同步。
示例
>>> ddp = torch.nn.parallel.DistributedDataParallel(model, pg) >>> with ddp.no_sync(): >>> for input in inputs: >>> ddp(input).backward() # no synchronization, accumulate grads >>> ddp(another_input).backward() # synchronize grads
警告
前向传递应包含在上下文管理器内,否则梯度仍将同步。
- register_comm_hook(state, hook)[source]¶
注册用于用户定义的 DDP 跨多个工作器聚合梯度的通信 hook。
此 hook 对研究人员尝试新想法非常有用。例如,此 hook 可用于实现 GossipGrad 和梯度压缩等几种算法,这些算法涉及在运行分布式数据并行训练时不同的参数同步通信策略。
- 参数
state (object) –
传递给 hook 以在训练过程中维护任何状态信息。例如,梯度压缩中的错误反馈、GossipGrad 中要与其通信的同伴等。
它由每个工作器本地存储,并在工作器上的所有梯度张量之间共享。
hook (Callable) –
具有以下签名的可调用对象:
hook(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]
此函数在桶准备好后调用一次。hook 可以执行任何必要的处理并返回一个 Future,指示任何异步工作的完成(例如,所有归约)。如果 hook 不执行任何通信,它仍然必须返回一个已完成的 Future。Future 应该保存 grad 桶张量的新值。一旦桶准备好,c10d 归约器将调用此 hook 并使用 Future 返回的张量并将梯度复制到各个参数。请注意,future 的返回类型必须是单个张量。
我们还提供了一个名为
get_future
的 API,用于检索与c10d.ProcessGroup.Work
完成相关的 Future。目前,get_future
支持 NCCL,也支持 GLOO 和 MPI 上的大多数操作,但点对点操作(发送/接收)除外。
警告
Grad bucket 中的张量不会预先除以 world_size。用户需要在 allreduce 等操作时自行除以 world_size。
警告
DDP 通信钩子只能注册一次,并且应该在调用反向传播之前注册。
警告
钩子返回的 Future 对象应包含一个与 grad bucket 内部的张量形状相同的单个张量。
警告
get_future
API 支持 NCCL,以及部分 GLOO 和 MPI 后端(不支持点对点操作,如发送/接收),并将返回一个torch.futures.Future
。- 示例:
下面是一个返回相同张量的空操作钩子的示例。
>>> def noop(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]: >>> fut = torch.futures.Future() >>> fut.set_result(bucket.buffer()) >>> return fut >>> ddp.register_comm_hook(state=None, hook=noop)
- 示例:
下面是一个并行 SGD 算法的示例,其中梯度在 allreduce 之前进行编码,并在 allreduce 之后进行解码。
>>> def encode_and_decode(state: object, bucket: dist.GradBucket) -> torch.futures.Future[torch.Tensor]: >>> encoded_tensor = encode(bucket.buffer()) # encode gradients >>> fut = torch.distributed.all_reduce(encoded_tensor).get_future() >>> # Define the then callback to decode. >>> def decode(fut): >>> decoded_tensor = decode(fut.value()[0]) # decode gradients >>> return decoded_tensor >>> return fut.then(decode) >>> ddp.register_comm_hook(state=None, hook=encode_and_decode)