快捷方式

使用 TensorPipe CUDA RPC 实现设备间直接通信

注意

设备间直接 RPC (CUDA RPC) 在 PyTorch 1.8 中作为原型功能引入。此 API 可能会发生更改。

在本食谱中,您将学习

  • CUDA RPC 的高级思想。

  • 如何使用 CUDA RPC。

要求

什么是 CUDA RPC?

CUDA RPC 支持直接将张量从本地 CUDA 内存发送到远程 CUDA 内存。在 v1.8 版本发布之前,PyTorch RPC 仅接受 CPU 张量。因此,当应用程序需要通过 RPC 发送 CUDA 张量时,它必须首先将张量移动到调用方的 CPU 上,通过 RPC 发送它,然后将其移动到被调用方目标设备上,这会产生不必要的同步以及 D2H 和 H2D 复制。从 v1.8 版本开始,RPC 允许用户使用 set_device_map API 配置每个进程的全局设备映射,指定如何将本地设备映射到远程设备。更具体地说,如果 worker0 的设备映射中有一项 "worker1" : {"cuda:0" : "cuda:1"},来自 worker0"cuda:0" 的所有 RPC 参数将直接发送到 worker1 上的 "cuda:1"。RPC 的响应将使用调用方设备映射的反向映射,即,如果 worker1"cuda:1" 上返回一个张量,它将直接发送到 worker0 上的 "cuda:0"。所有预期的设备到设备的直接通信必须在每个进程的设备映射中指定。否则,只允许 CPU 张量。

在底层,PyTorch RPC 依赖于 TensorPipe 作为通信后端。PyTorch RPC 将每个请求或响应中的所有张量提取到一个列表中,并将其他所有内容打包到一个二进制有效负载中。然后,TensorPipe 将根据张量设备类型以及调用方和被调用方上的通道可用性自动为每个张量选择一个通信通道。现有的 TensorPipe 通道涵盖 NVLink、InfiniBand、SHM、CMA、TCP 等。

如何使用 CUDA RPC?

下面的代码展示了如何使用 CUDA RPC。该模型包含两个线性层,并被分成两个分片。这两个分片分别放置在 worker0worker1 上,worker0 充当驱动前向和反向传递的主节点。请注意,我们有意跳过了 DistributedOptimizer 以突出显示使用 CUDA RPC 时的性能改进。实验重复进行前向和反向传递 10 次,并测量总执行时间。它将使用 CUDA RPC 与手动分阶段到 CPU 内存和使用 CPU RPC 进行比较。

import torch
import torch.distributed.autograd as autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn

import os
import time


class MyModule(nn.Module):
    def __init__(self, device, comm_mode):
        super().__init__()
        self.device = device
        self.linear = nn.Linear(1000, 1000).to(device)
        self.comm_mode = comm_mode

    def forward(self, x):
        # x.to() is a no-op if x is already on self.device
        y = self.linear(x.to(self.device))
        return y.cpu() if self.comm_mode == "cpu" else y

    def parameter_rrefs(self):
        return [rpc.RRef(p) for p in self.parameters()]


def measure(comm_mode):
    # local module on "worker0/cuda:0"
    lm = MyModule("cuda:0", comm_mode)
    # remote module on "worker1/cuda:1"
    rm = rpc.remote("worker1", MyModule, args=("cuda:1", comm_mode))
    # prepare random inputs
    x = torch.randn(1000, 1000).cuda(0)

    tik = time.time()
    for _ in range(10):
        with autograd.context() as ctx:
            y = rm.rpc_sync().forward(lm(x))
            autograd.backward(ctx, [y.sum()])
    # synchronize on "cuda:0" to make sure that all pending CUDA ops are
    # included in the measurements
    torch.cuda.current_stream("cuda:0").synchronize()
    tok = time.time()
    print(f"{comm_mode} RPC total execution time: {tok - tik}")


def run_worker(rank):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=128)

    if rank == 0:
        options.set_device_map("worker1", {0: 1})
        rpc.init_rpc(
            f"worker{rank}",
            rank=rank,
            world_size=2,
            rpc_backend_options=options
        )
        measure(comm_mode="cpu")
        measure(comm_mode="cuda")
    else:
        rpc.init_rpc(
            f"worker{rank}",
            rank=rank,
            world_size=2,
            rpc_backend_options=options
        )

    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, nprocs=world_size, join=True)

输出显示在下面,表明在本次实验中,CUDA RPC 可以帮助实现比 CPU RPC 快 34 倍的速度提升。

cpu RPC total execution time: 2.3145179748535156 Seconds
cuda RPC total execution time: 0.06867480278015137 Seconds

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取适合初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源