快捷方式

支持 TorchScript 的分布式优化器¶

创建于:2021 年 4 月 26 日 | 最后更新于:2024 年 12 月 02 日 | 最后验证于:2024 年 11 月 05 日

警告

TorchScript 不再积极开发。

在本 Recipe 中,你将学习

  • 支持 TorchScript 的分布式优化器的高级概念以及此功能带来的益处

  • 如何编写支持 TorchScript 的自定义分布式优化器

要求¶

什么是分布式优化器?¶

DistributedOptimizer 接受一个远程参数列表 (RRef),并在参数所在的 worker 上本地运行优化器,这通常与分布式 RPC/Autograd 一起用于模型并行训练。它可以使用任何本地优化器算法(无论是 torch.optim 中提供的预定义算法还是自定义算法)来对每个 worker 上的梯度应用更新。

什么是支持 TorchScript 的分布式优化器?¶

分布式优化器广泛应用于分布式模型并行训练,在一些常见用例中,出于性能考虑和资源利用率,训练需要以多线程方式而不是多进程方式进行(或者至少是部分多线程,例如参数服务器托管模型和参数的一部分,每个请求由新线程更新参数)。PyTorch 本身不原生支持多线程训练,因为它受到 Python 全局解释器锁 (GIL) 的影响,但它可以利用 TorchScript 来摆脱 GIL 并以多线程方式运行模型。

对于关键模型训练工作负载,提高训练性能是一个重要课题。研究人员通常希望通过图表示(即通过算子融合)实现不同的优化策略,或实现自定义算子核以加速训练。

支持 TorchScript 的分布式优化器可以帮助摆脱 GIL,从而提高 PyTorch 在多线程环境中的训练性能,它还释放了利用 TorchScript 提供的先进编译器技术(即 CPU/GPU 融合)进一步提升性能的潜力。

如何编写支持 TorchScript 的自定义分布式优化器?¶

下面的代码展示了如何在现有本地优化器实现的基础上编写自定义分布式优化器,从而解锁 TorchScript 的优势,包括消除 GIL 和提高性能的机会。

假设你已经有一个在训练期间当前使用的本地优化器,在此示例中,我们将使用拟双曲动量 (QHM) 作为例子来展示如何启用 TorchScript 支持,请注意,这也适用于任何继承自 torch.optim.Optimizer 的自定义优化器。

首先,我们需要将计算和状态管理与优化器实现分开,这样我们就可以提取计算部分并使其成为一个自由函数,这对于 TorchScript 是友好的。这有两个好处:1. 计算逻辑更容易检查,它允许我们快速将参数更新/计算部分转换为 TorchScript,并利用 TorchScript IR 进行进一步优化(算子融合等)。2. 分布式优化器底层使用不同的机制来获取梯度和更新参数(我们单独存储梯度,而不是在反向传播期间直接填充 param.grad 字段)。将计算分离出来使得分布式优化器能够在多线程模式下进行优化器更新,因为它消除了对 param.grad 可能存在的竞争条件。

import torch
from torch import Tensor
from typing import List


def qhm_update(params: List[Tensor],
            dp_list: List[Tensor],
            momentum_buffer_list: List[Tensor],
            lr: float,
            nu: float,
            weight_decay: float,
            weight_decay_type: str,
            momentum: float):

    for p, d_p, momentum_buffer in zip(params, dp_list, momentum_buffer_list):
        if weight_decay != 0:
            if weight_decay_type == "grad":
                d_p.add_(weight_decay, p)
            elif weight_decay_type == "direct":
                p.mul_(1.0 - lr * weight_decay)
            else:
                raise ValueError("Invalid weight decay type provided")

        momentum_buffer.mul_(momentum).add_(1.0 - momentum, d_p)

        p.data.add_(-lr * nu, momentum_buffer)
        p.data.add_(-lr * (1.0 - nu), d_p)

接下来,我们将定义一个具有 TorchScript 兼容性的分布式函数式优化器,用于管理优化器状态并调用我们上面定义的 TorchScript 兼容更新函数。请注意,一些约定与普通自定义优化器不同:1. 我们不继承 torch.optim.Optimizer,因为 TorchScript 不支持多态。2. step 接受梯度列表而不是损失闭包。

import torch
from torch import Tensor
from typing import List, Optional, Dict

# define this as a TorchScript class
@torch.jit.script
class FunctionalQHM(object):
    def __init__(self,
                params: List[Tensor],
                lr: float,
                momentum: float,
                nu: float,
                weight_decay: float = 0.0,
                weight_decay_type: str = "grad"):
        if lr < 0.0:
            raise ValueError("Invalid learning rate: {}".format(lr))
        if momentum < 0.0:
            raise ValueError("Invalid momentum value: {}".format(momentum))
        if weight_decay < 0.0:
            raise ValueError("Invalid weight_decay value: {}".format(weight_decay))
        if weight_decay_type not in ("grad", "direct"):
            raise ValueError("Invalid weight_decay_type value: {}".format(weight_decay_type))

        self.defaults = {
            "lr": lr,
            "momentum": momentum,
            "nu": nu,
            "weight_decay": weight_decay,
        }
        self.weight_decay_type = weight_decay_type

        # NOTE: we only have one param_group here and don't allow user to add additional
        # param group as it's not a common use case.
        self.param_group = {"params": params}

        self.state = torch.jit.annotate(Dict[torch.Tensor, Dict[str, torch.Tensor]], {})

    def step(self, gradients: List[Optional[Tensor]]):
        params = self.param_group['params']
        params_with_grad = []
        grads = []
        momentum_buffer_list: List[Tensor] = []

        if len(params) != len(gradients):
            raise ValueError(
                "the gradients passed in does not equal to the size of the parameters!"
                + f"Params length: {len(params)}. "
                + f"Gradients length: {len(gradients)}"
            )

        for param, gradient in zip(self.param_group['params'], gradients):
            if gradient is not None:
                params_with_grad.append(param)
                grads.append(gradient)
                state = self.state[param]
                state['momentum_buffer'] = torch.zeros_like(param, memory_format=torch.preserve_format)
                momentum_buffer_list.append(state['momentum_buffer'])

        # calls into the update function we just defined
        with torch.no_grad():
            qhm_update(params_with_grad,
                    grads,
                    momentum_buffer_list,
                    self.defaults['lr'],
                    self.defaults['nu'],
                    self.defaults['weight_decay'],
                    self.weight_decay_type,
                    self.defaults['momentum'])

最后,我们将新定义的分布式函数式优化器注册到 functional_optim_map 中。这样,DistributedOptimizer 将尝试使用我们的自定义实现,而不是预定义的默认实现。

from torch.distributed.optim import DistributedOptimizer

DistributedOptimizer.functional_optim_map[QHM] = FunctionalQHM

现在,你可以在分布式训练中像往常一样使用 QHM 优化器,将其传递给 DistributedOptimizer

...
remote_params_list = [...]
dist_optim = DistributedOptimizer(
    QHM, remote_params_list, *args, **kwargs
)

DistributedOptimizer 会在底层自动将 QHM 优化器转换为 FunctionalQHM,并启用 TorchScript 支持。这将释放由多线程训练带来的性能提升,也为进一步改进(例如 TorchScript 融合等)提供了更多潜力。

请注意,大多数 PyTorch 内置优化器已经使用这种方法来加速分布式训练。如果你看到关于某些优化器尚未转换的警告,可以按照本 recipe 编写自己的转换。

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发者的深度教程

查看教程

资源

查找开发资源并获得问题解答

查看资源