分布式优化器¶
警告
当前使用 CUDA 张量时不支持分布式优化器
torch.distributed.optim
公开了 DistributedOptimizer,它接受远程参数列表 (RRef
) 并在参数所在的 worker 上本地运行优化器。分布式优化器可以使用任何本地优化器 基类 在每个 worker 上应用梯度。
- class torch.distributed.optim.DistributedOptimizer(optimizer_class, params_rref, *args, **kwargs)[source]¶
DistributedOptimizer 接受分布在各个 worker 上的参数的远程引用,并在每个参数上本地应用给定的优化器。
此类使用
get_gradients()
来检索特定参数的梯度。来自同一客户端或不同客户端的并发调用
step()
将在每个工作器上序列化 - 因为每个工作器的优化器一次只能处理一组梯度。但是,不能保证每个客户端的完整前向-反向-优化器序列会按顺序执行。这意味着应用的梯度可能与在给定工作器上执行的最新前向传递不一致。此外,工作器之间没有保证的排序。DistributedOptimizer 默认情况下使用 TorchScript 创建本地优化器,以便在多线程训练(例如分布式模型并行)的情况下,优化器更新不会被 Python 全局解释器锁 (GIL) 阻塞。此功能目前已在大多数优化器中启用。您也可以按照 PyTorch 教程中的 说明 为您自己的自定义优化器启用 TorchScript 支持。
- 参数
optimizer_class (optim.Optimizer) – 在每个工作器上实例化的优化器类。
params_rref (list[RRef]) – 要优化的本地或远程参数的 RRef 列表。
args – 传递给每个工作器上优化器构造函数的参数。
kwargs – 传递给每个工作器上优化器构造函数的参数。
- 示例:
>>> import torch.distributed.autograd as dist_autograd >>> import torch.distributed.rpc as rpc >>> from torch import optim >>> from torch.distributed.optim import DistributedOptimizer >>> >>> with dist_autograd.context() as context_id: >>> # Forward pass. >>> rref1 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 3)) >>> rref2 = rpc.remote("worker1", torch.add, args=(torch.ones(2), 1)) >>> loss = rref1.to_here() + rref2.to_here() >>> >>> # Backward pass. >>> dist_autograd.backward(context_id, [loss.sum()]) >>> >>> # Optimizer. >>> dist_optim = DistributedOptimizer( >>> optim.SGD, >>> [rref1, rref2], >>> lr=0.05, >>> ) >>> dist_optim.step(context_id)
- step(context_id)[source]¶
执行单个优化步骤。
这将调用每个包含要优化的参数的 worker 上的
torch.optim.Optimizer.step()
,并将阻塞直到所有 worker 返回。提供的context_id
将用于检索包含应应用于参数的梯度的相应context
。- 参数
context_id – 我们应该运行优化器步骤的自动微分上下文 ID。
- class torch.distributed.optim.PostLocalSGDOptimizer(optim, averager)[source]¶
包装任意
torch.optim.Optimizer
并运行 后本地 SGD,此优化器在每一步都运行本地优化器。在预热阶段之后,它会在应用本地优化器后定期对参数进行平均。- 参数
optim (Optimizer) – 本地优化器。
averager (ModelAverager) – 用于运行后本地 SGD 算法的模型平均器实例。
示例
>>> import torch >>> import torch.distributed as dist >>> import torch.distributed.algorithms.model_averaging.averagers as averagers >>> import torch.nn as nn >>> from torch.distributed.optim import PostLocalSGDOptimizer >>> from torch.distributed.algorithms.ddp_comm_hooks.post_localSGD_hook import ( >>> PostLocalSGDState, >>> post_localSGD_hook, >>> ) >>> >>> model = nn.parallel.DistributedDataParallel( >>> module, device_ids=[rank], output_device=rank >>> ) >>> >>> # Register a post-localSGD communication hook. >>> state = PostLocalSGDState(process_group=None, subgroup=None, start_localSGD_iter=100) >>> model.register_comm_hook(state, post_localSGD_hook) >>> >>> # Create a post-localSGD optimizer that wraps a local optimizer. >>> # Note that ``warmup_steps`` used in ``PostLocalSGDOptimizer`` must be the same as >>> # ``start_localSGD_iter`` used in ``PostLocalSGDState``. >>> local_optim = torch.optim.SGD(params=model.parameters(), lr=0.01) >>> opt = PostLocalSGDOptimizer( >>> optim=local_optim, >>> averager=averagers.PeriodicModelAverager(period=4, warmup_steps=100) >>> ) >>> >>> # In the first 100 steps, DDP runs global gradient averaging at every step. >>> # After 100 steps, DDP runs gradient averaging within each subgroup (intra-node by default), >>> # and post-localSGD optimizer runs global model averaging every 4 steps after applying the local optimizer. >>> for step in range(0, 200): >>> opt.zero_grad() >>> loss = loss_fn(output, labels) >>> loss.backward() >>> opt.step()
- load_state_dict(state_dict)[source]¶
这与
torch.optim.Optimizer
load_state_dict()
相同,但也会将模型平均器的步长值恢复到提供的state_dict
中保存的值。如果
state_dict
中没有"step"
条目,它将发出警告并将模型平均器的步长初始化为 0。
- state_dict()[source]¶
这与
torch.optim.Optimizer
的state_dict()
相同,但它在检查点中添加了一个额外的条目来记录模型平均器的步骤,以确保重新加载不会再次导致不必要的预热。
- class torch.distributed.optim.ZeroRedundancyOptimizer(params, optimizer_class, process_group=None, parameters_as_bucket_view=False, overlap_with_ddp=False, **defaults)[source]¶
包装任意
optim.Optimizer
并将其状态在组中的等级之间进行分片。共享方式如 ZeRO 所述。
每个等级中的本地优化器实例仅负责更新大约
1 / world_size
个参数,因此只需要保留1 / world_size
个优化器状态。在本地更新参数后,每个等级会将其参数广播到所有其他对等方,以使所有模型副本保持相同状态。ZeroRedundancyOptimizer
可以与torch.nn.parallel.DistributedDataParallel
结合使用,以减少每个等级的峰值内存消耗。ZeroRedundancyOptimizer
使用排序贪婪算法在每个等级处打包一定数量的参数。每个参数属于单个等级,不会在等级之间划分。分区是任意的,可能与参数注册或使用顺序不匹配。- 参数
params (
Iterable
) – 一个Iterable
,包含torch.Tensor
或dict
,表示所有参数,这些参数将被分片到各个进程。- 关键字参数
optimizer_class (
torch.nn.Optimizer
) – 本地优化器的类。process_group (
ProcessGroup
, 可选) –torch.distributed
的ProcessGroup
(默认值:dist.group.WORLD
,由torch.distributed.init_process_group()
初始化)。parameters_as_bucket_view (bool, 可选) – 如果为
True
,参数将被打包到桶中以加快通信速度,并且param.data
字段将指向不同偏移量的桶视图;如果为False
,则每个参数将单独进行通信,并且每个params.data
将保持不变(默认值:False
)。overlap_with_ddp (bool, optional) – 如果
True
,step()
与DistributedDataParallel
的梯度同步重叠;这需要 (1)optimizer_class
参数为函数式优化器,或者具有函数式等效的优化器,以及 (2) 注册一个从ddp_zero_hook.py
中的函数构建的 DDP 通信钩子;参数被打包到与DistributedDataParallel
中的桶匹配的桶中,这意味着parameters_as_bucket_view
参数被忽略。如果False
,step()
在反向传播之后(按正常方式)独立运行。(默认:False
)**defaults** – 任何尾随参数,这些参数将转发到本地优化器。
示例
>>> import torch.nn as nn >>> from torch.distributed.optim import ZeroRedundancyOptimizer >>> from torch.nn.parallel import DistributedDataParallel as DDP >>> model = nn.Sequential(*[nn.Linear(2000, 2000).to(rank) for _ in range(20)]) >>> ddp = DDP(model, device_ids=[rank]) >>> opt = ZeroRedundancyOptimizer( >>> ddp.parameters(), >>> optimizer_class=torch.optim.Adam, >>> lr=0.01 >>> ) >>> ddp(inputs).sum().backward() >>> opt.step()
警告
目前,
ZeroRedundancyOptimizer
要求所有传入的参数都是相同的密集类型。警告
如果您传递
overlap_with_ddp=True
,请注意以下事项:鉴于目前实现DistributedDataParallel
与ZeroRedundancyOptimizer
重叠的方式,前两个或三个训练迭代不会在优化器步骤中执行参数更新,具体取决于static_graph=False
或static_graph=True
,分别。这是因为它需要有关DistributedDataParallel
使用的梯度分桶策略的信息,该信息在第二次前向传递(如果static_graph=False
)或第三次前向传递(如果static_graph=True
)之前不会最终确定。为了调整这一点,一种选择是在前面添加虚拟输入。警告
ZeroRedundancyOptimizer 处于实验阶段,可能会发生变化。
- add_param_group(param_group)[source]¶
将参数组添加到
Optimizer
的param_groups
中。这在微调预训练网络时很有用,因为可以将冻结的层设置为可训练,并在训练过程中将其添加到
Optimizer
中。- 参数
param_group (dict) – 指定要优化的参数和特定于组的优化选项。
警告
此方法处理更新所有分区上的分片,但需要在所有排名上调用。在排名子集上调用它会导致训练挂起,因为通信原语是根据管理的参数调用的,并期望所有排名都参与同一组参数。
- consolidate_state_dict(to=0)[source]¶
在目标排名上合并
state_dict
列表(每个排名一个)。- 参数
to (int) – 接收优化器状态的排名(默认值:0)。
- 引发
RuntimeError – 如果
overlap_with_ddp=True
并且此方法在此ZeroRedundancyOptimizer
实例完全初始化之前被调用,这将在DistributedDataParallel
梯度桶重建后发生。
警告
这需要在所有排名上调用。
- join_hook(**kwargs)[source]¶
返回 ZeRO 连接钩子。
它通过在优化器步骤中跟踪集体通信来实现对不均匀输入的训练。
在调用此钩子之前,必须正确设置梯度。
此钩子不支持任何关键字参数;即
kwargs
未使用。
- load_state_dict(state_dict)[source]¶
从输入
state_dict
中加载与给定等级相关的状态,根据需要更新本地优化器。- 参数
state_dict (dict) – 优化器状态;应该是从调用
state_dict()
返回的对象。- 引发
RuntimeError – 如果
overlap_with_ddp=True
并且此方法在此ZeroRedundancyOptimizer
实例完全初始化之前被调用,这将在DistributedDataParallel
梯度桶重建后发生。
- state_dict()[source]¶
返回此等级已知的最后一个全局优化器状态。
- 引发
RuntimeError – 如果
overlap_with_ddp=True
并且此方法在该ZeroRedundancyOptimizer
实例完全初始化之前被调用,这将在DistributedDataParallel
梯度桶重建后发生;或者如果此方法在没有先调用consolidate_state_dict()
的情况下被调用。- 返回类型