• 教程 >
  • 使用 RPC 的分布式管道并行
快捷方式

使用 RPC 的分布式管道并行

作者: 沈立

注意

editgithub 上查看和编辑本教程。

先决条件

本教程使用 Resnet50 模型来演示使用 torch.distributed.rpc API 实现分布式管道并行。这可以被视为在 单机模型并行最佳实践 中讨论的多 GPU 管道并行的分布式对应物。

注意

本教程需要 PyTorch v1.6.0 或更高版本。

注意

本教程的完整源代码可以在 pytorch/examples 中找到。

基础知识

上一个教程 分布式 RPC 框架入门 展示了如何使用 torch.distributed.rpc 为 RNN 模型实现分布式模型并行。该教程使用一个 GPU 来托管 EmbeddingTable,提供的代码运行良好。但是,如果模型存在于多个 GPU 上,则需要采取一些额外的步骤来提高所有 GPU 的摊销利用率。管道并行是一种可以帮助解决此问题的范式。

在本教程中,我们以 ResNet50 作为示例模型,该模型也由 单机模型并行最佳实践 教程使用。类似地,ResNet50 模型被划分为两个分片,输入批次被划分为多个子批次,并以流水线方式输入到两个模型分片中。不同之处在于,该教程不是使用 CUDA 流来并行化执行,而是调用异步 RPC。因此,本教程中提出的解决方案也适用于跨机器边界的情况。本教程的其余部分将分四个步骤介绍实现过程。

步骤 1:分割 ResNet50 模型

这是准备步骤,它将 ResNet50 实现为两个模型分片。以下代码借鉴自 torchvision 中的 ResNet 实现ResNetBase 模块包含两个 ResNet 分片的公共构建块和属性。

import threading

import torch
import torch.nn as nn

from torchvision.models.resnet import Bottleneck

num_classes = 1000


def conv1x1(in_planes, out_planes, stride=1):
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)


class ResNetBase(nn.Module):
    def __init__(self, block, inplanes, num_classes=1000,
                groups=1, width_per_group=64, norm_layer=None):
        super(ResNetBase, self).__init__()

        self._lock = threading.Lock()
        self._block = block
        self._norm_layer = nn.BatchNorm2d
        self.inplanes = inplanes
        self.dilation = 1
        self.groups = groups
        self.base_width = width_per_group

    def _make_layer(self, planes, blocks, stride=1):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if stride != 1 or self.inplanes != planes * self._block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * self._block.expansion, stride),
                norm_layer(planes * self._block.expansion),
            )

        layers = []
        layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,
                                self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * self._block.expansion
        for _ in range(1, blocks):
            layers.append(self._block(self.inplanes, planes, groups=self.groups,
                                    base_width=self.base_width, dilation=self.dilation,
                                    norm_layer=norm_layer))

        return nn.Sequential(*layers)

    def parameter_rrefs(self):
        return [RRef(p) for p in self.parameters()]

现在,我们准备定义两个模型分片。对于构造函数,我们只需将所有 ResNet50 层拆分为两部分,并将每个部分移动到提供的设备中。两个分片的 forward 函数都接受输入数据的 RRef,在本地获取数据,然后将其移动到预期设备。在将所有层应用于输入之后,它将输出移动到 CPU 并返回。这是因为 RPC API 要求张量驻留在 CPU 上,以避免在调用者和被调用者的设备数量不匹配时出现无效设备错误。

class ResNetShard1(ResNetBase):
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard1, self).__init__(
            Bottleneck, 64, num_classes=num_classes, *args, **kwargs)

        self.device = device
        self.seq = nn.Sequential(
            nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False),
            self._norm_layer(self.inplanes),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            self._make_layer(64, 3),
            self._make_layer(128, 4, stride=2)
        ).to(self.device)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out =  self.seq(x)
        return out.cpu()


class ResNetShard2(ResNetBase):
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard2, self).__init__(
            Bottleneck, 512, num_classes=num_classes, *args, **kwargs)

        self.device = device
        self.seq = nn.Sequential(
            self._make_layer(256, 6, stride=2),
            self._make_layer(512, 3, stride=2),
            nn.AdaptiveAvgPool2d((1, 1)),
        ).to(self.device)

        self.fc =  nn.Linear(512 * self._block.expansion, num_classes).to(self.device)

    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out = self.fc(torch.flatten(self.seq(x), 1))
        return out.cpu()

步骤 2:将 ResNet50 模型分片缝合到一个模块中

然后,我们创建一个 DistResNet50 模块来组装两个分片并实现流水线并行逻辑。在构造函数中,我们使用两个 rpc.remote 调用分别将两个分片放到两个不同的 RPC 工作器上,并持有对两个模型部分的 RRef 的引用,以便可以在前向传递中引用它们。 forward 函数将输入批次拆分为多个微批次,并以流水线方式将这些微批次馈送到两个模型部分。它首先使用 rpc.remote 调用将第一个分片应用于一个微批次,然后将返回的中间输出 RRef 传递到第二个模型分片。之后,它收集所有微输出的 Future,并在循环结束后等待它们全部完成。请注意,remote()rpc_async() 都立即返回并异步运行。因此,整个循环是非阻塞的,并将同时启动多个 RPC。一个微批次在两个模型部分上的执行顺序由中间输出 y_rref 保留。跨微批次的执行顺序无关紧要。最后,forward 函数将所有微批次的输出连接到一个单个输出张量中并返回。 parameter_rrefs 函数是一个辅助函数,用于简化分布式优化器构造,稍后将使用它。

class DistResNet50(nn.Module):
    def __init__(self, num_split, workers, *args, **kwargs):
        super(DistResNet50, self).__init__()

        self.num_split = num_split

        # Put the first part of the ResNet50 on workers[0]
        self.p1_rref = rpc.remote(
            workers[0],
            ResNetShard1,
            args = ("cuda:0",) + args,
            kwargs = kwargs
        )

        # Put the second part of the ResNet50 on workers[1]
        self.p2_rref = rpc.remote(
            workers[1],
            ResNetShard2,
            args = ("cuda:1",) + args,
            kwargs = kwargs
        )

    def forward(self, xs):
        out_futures = []
        for x in iter(xs.split(self.num_split, dim=0)):
            x_rref = RRef(x)
            y_rref = self.p1_rref.remote().forward(x_rref)
            z_fut = self.p2_rref.rpc_async().forward(y_rref)
            out_futures.append(z_fut)

        return torch.cat(torch.futures.wait_all(out_futures))

    def parameter_rrefs(self):
        remote_params = []
        remote_params.extend(self.p1_rref.remote().parameter_rrefs().to_here())
        remote_params.extend(self.p2_rref.remote().parameter_rrefs().to_here())
        return remote_params

步骤 3:定义训练循环

定义完模型后,我们来实现训练循环。我们使用一个专用的“主”工作器来准备随机输入和标签,并控制分布式反向传播和分布式优化器步骤。它首先创建一个 DistResNet50 模块的实例。它指定每个批次的微批次数,并提供两个 RPC 工作器的名称(即,“worker1”和“worker2”)。然后,它定义损失函数并使用 parameter_rrefs() 辅助函数创建 DistributedOptimizer,以获取参数 RRefs 列表。然后,主训练循环与常规本地训练非常相似,只是它使用 dist_autograd 启动反向传播,并为反向传播和优化器 step() 提供 context_id

import torch.distributed.autograd as dist_autograd
import torch.optim as optim
from torch.distributed.optim import DistributedOptimizer

num_batches = 3
batch_size = 120
image_w = 128
image_h = 128


def run_master(num_split):
    # put the two model parts on worker1 and worker2 respectively
    model = DistResNet50(num_split, ["worker1", "worker2"])
    loss_fn = nn.MSELoss()
    opt = DistributedOptimizer(
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )

    one_hot_indices = torch.LongTensor(batch_size) \
                        .random_(0, num_classes) \
                        .view(batch_size, 1)

    for i in range(num_batches):
        print(f"Processing batch {i}")
        # generate random inputs and labels
        inputs = torch.randn(batch_size, 3, image_w, image_h)
        labels = torch.zeros(batch_size, num_classes) \
                    .scatter_(1, one_hot_indices, 1)

        with dist_autograd.context() as context_id:
            outputs = model(inputs)
            dist_autograd.backward(context_id, [loss_fn(outputs, labels)])
            opt.step(context_id)

步骤 4:启动 RPC 进程

最后,以下代码显示了所有进程的目标函数。主要逻辑定义在 run_master 中。工作器被动地等待主机的命令,因此只需运行 init_rpcshutdown,其中 shutdown 默认情况下会阻塞,直到所有 RPC 参与者完成。

import os
import time

import torch.multiprocessing as mp


def run_worker(rank, world_size, num_split):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=128)

    if rank == 0:
        rpc.init_rpc(
            "master",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        run_master(num_split)
    else:
        rpc.init_rpc(
            f"worker{rank}",
            rank=rank,
            world_size=world_size,
            rpc_backend_options=options
        )
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    world_size = 3
    for num_split in [1, 2, 4, 8]:
        tik = time.time()
        mp.spawn(run_worker, args=(world_size, num_split), nprocs=world_size, join=True)
        tok = time.time()
        print(f"number of splits = {num_split}, execution time = {tok - tik}")

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源