设备到设备直接通信,使用 TensorPipe CUDA RPC¶
创建于:2021 年 3 月 19 日 | 最后更新:2021 年 3 月 19 日 | 最后验证:2024 年 11 月 5 日
注意
设备到设备直接 RPC (CUDA RPC) 作为原型功能在 PyTorch 1.8 中引入。此 API 可能随时更改。
在本秘籍中,你将学到
CUDA RPC 的高层概念。
如何使用 CUDA RPC。
要求¶
PyTorch 1.8 及更高版本
什么是 CUDA RPC?¶
CUDA RPC 支持直接将张量从本地 CUDA 显存发送到远程 CUDA 显存。在 v1.8 版本之前,PyTorch RPC 只接受 CPU 张量。因此,当应用程序需要通过 RPC 发送 CUDA 张量时,必须先在调用端将张量移动到 CPU,通过 RPC 发送,然后在被调用端将其移动到目标设备,这会导致不必要的同步以及 D2H 和 H2D 复制。从 v1.8 开始,RPC 允许用户使用 set_device_map API 配置每个进程的全局设备映射,指定如何将本地设备映射到远程设备。更具体地说,如果 worker0
的设备映射包含条目 "worker1" : {"cuda:0" : "cuda:1"}
,则来自 worker0
的所有位于 "cuda:0"
上的 RPC 参数都将直接发送到 worker1
上的 "cuda:1"
。RPC 的响应将使用调用方设备映射的逆映射,即如果 worker1
返回位于 "cuda:1"
上的张量,它将直接发送到 worker0
上的 "cuda:0"
。所有预期的设备到设备直接通信都必须在每个进程的设备映射中指定。否则,只允许使用 CPU 张量。
在底层,PyTorch RPC 依赖 TensorPipe 作为通信后端。PyTorch RPC 从每个请求或响应中提取所有张量到一个列表中,并将其他所有内容打包成二进制负载。然后,TensorPipe 将根据张量设备类型以及调用方和被调用方上的通道可用性,自动为每个张量选择一个通信通道。现有的 TensorPipe 通道包括 NVLink、InfiniBand、SHM、CMA、TCP 等。
如何使用 CUDA RPC?¶
下面的代码展示了如何使用 CUDA RPC。模型包含两个线性层,并被分成两个分片。这两个分片分别放置在 worker0
和 worker1
上,其中 worker0
作为主节点驱动前向和后向传播。注意,我们有意跳过了 DistributedOptimizer,以突出使用 CUDA RPC 时的性能提升。实验重复进行前向和后向传播 10 次,并测量总执行时间。它将使用 CUDA RPC 与手动中转到 CPU 内存并使用 CPU RPC 的方法进行比较。
import torch
import torch.distributed.autograd as autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
import os
import time
class MyModule(nn.Module):
def __init__(self, device, comm_mode):
super().__init__()
self.device = device
self.linear = nn.Linear(1000, 1000).to(device)
self.comm_mode = comm_mode
def forward(self, x):
# x.to() is a no-op if x is already on self.device
y = self.linear(x.to(self.device))
return y.cpu() if self.comm_mode == "cpu" else y
def parameter_rrefs(self):
return [rpc.RRef(p) for p in self.parameters()]
def measure(comm_mode):
# local module on "worker0/cuda:0"
lm = MyModule("cuda:0", comm_mode)
# remote module on "worker1/cuda:1"
rm = rpc.remote("worker1", MyModule, args=("cuda:1", comm_mode))
# prepare random inputs
x = torch.randn(1000, 1000).cuda(0)
tik = time.time()
for _ in range(10):
with autograd.context() as ctx:
y = rm.rpc_sync().forward(lm(x))
autograd.backward(ctx, [y.sum()])
# synchronize on "cuda:0" to make sure that all pending CUDA ops are
# included in the measurements
torch.cuda.current_stream("cuda:0").synchronize()
tok = time.time()
print(f"{comm_mode} RPC total execution time: {tok - tik}")
def run_worker(rank):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=128)
if rank == 0:
options.set_device_map("worker1", {0: 1})
rpc.init_rpc(
f"worker{rank}",
rank=rank,
world_size=2,
rpc_backend_options=options
)
measure(comm_mode="cpu")
measure(comm_mode="cuda")
else:
rpc.init_rpc(
f"worker{rank}",
rank=rank,
world_size=2,
rpc_backend_options=options
)
# block until all rpcs finish
rpc.shutdown()
if __name__=="__main__":
world_size = 2
mp.spawn(run_worker, nprocs=world_size, join=True)
输出如下所示,这表明在本实验中,与 CPU RPC 相比,CUDA RPC 可以帮助实现 34 倍的加速。
cpu RPC total execution time: 2.3145179748535156 Seconds
cuda RPC total execution time: 0.06867480278015137 Seconds