使用分布式 RPC 框架实现参数服务器¶
创建日期:2020 年 4 月 6 日 | 最后更新:2024 年 5 月 7 日 | 最后验证:未验证
作者:Rohan Varma
注意
在 github 上查看和编辑此教程。
先决条件
本教程将通过一个简单的例子,讲解如何使用 PyTorch 的分布式 RPC 框架实现参数服务器。参数服务器框架是一种范式,其中一组服务器存储参数(例如大型嵌入表),而多个训练器查询参数服务器以检索最新的参数。这些训练器可以在本地运行训练循环,并偶尔与参数服务器同步以获取最新参数。有关参数服务器方法的更多信息,请参阅这篇论文。
使用分布式 RPC 框架,我们将构建一个示例,其中多个训练器使用 RPC 与同一参数服务器通信,并使用RRef访问远程参数服务器实例上的状态。每个训练器都将通过使用分布式自动微分将自动微分图连接到多个节点,从而以分布式方式启动其专用的反向传播。
注意:本教程涵盖了分布式 RPC 框架的使用,这对于将模型拆分到多台机器上,或实现参数服务器训练策略(其中网络训练器获取托管在不同机器上的参数)非常有用。如果你正在寻找在多个 GPU 上复制模型的方法,请参阅分布式数据并行教程。还有一个关于强化学习和 RNN 用例的RPC 教程。
首先从熟悉的内容开始:导入所需的模块并定义一个简单的 ConvNet,它将在 MNIST 数据集上进行训练。以下网络主要采用自pytorch/examples 仓库中定义的网络。
import argparse
import os
import time
from threading import Lock
import torch
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.distributed.optim import DistributedOptimizer
from torchvision import datasets, transforms
# --------- MNIST Network to train, from pytorch/examples -----
class Net(nn.Module):
def __init__(self, num_gpus=0):
super(Net, self).__init__()
print(f"Using {num_gpus} GPUs to train")
self.num_gpus = num_gpus
device = torch.device(
"cuda:0" if torch.cuda.is_available() and self.num_gpus > 0 else "cpu")
print(f"Putting first 2 convs on {str(device)}")
# Put conv layers on the first cuda device, or CPU if no cuda device
self.conv1 = nn.Conv2d(1, 32, 3, 1).to(device)
self.conv2 = nn.Conv2d(32, 64, 3, 1).to(device)
# Put rest of the network on the 2nd cuda device, if there is one
if "cuda" in str(device) and num_gpus > 1:
device = torch.device("cuda:1")
print(f"Putting rest of layers on {str(device)}")
self.dropout1 = nn.Dropout2d(0.25).to(device)
self.dropout2 = nn.Dropout2d(0.5).to(device)
self.fc1 = nn.Linear(9216, 128).to(device)
self.fc2 = nn.Linear(128, 10).to(device)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.conv2(x)
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
# Move tensor to next device if necessary
next_device = next(self.fc1.parameters()).device
x = x.to(next_device)
x = self.fc1(x)
x = F.relu(x)
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output
接下来,我们将定义一些对脚本其余部分有用的辅助函数。以下使用rpc_sync和RRef来定义一个函数,该函数在远程节点上调用给定对象的方法。下面,远程对象的句柄由rref
参数给出,我们将其在其所属节点上运行:rref.owner()
。在调用节点上,我们通过使用rpc_sync
同步运行此命令,这意味着我们将阻塞直到收到响应。
# --------- Helper Methods --------------------
# On the local node, call a method with first arg as the value held by the
# RRef. Other args are passed in as arguments to the function called.
# Useful for calling instance methods. method could be any matching function, including
# class methods.
def call_method(method, rref, *args, **kwargs):
return method(rref.local_value(), *args, **kwargs)
# Given an RRef, return the result of calling the passed in method on the value
# held by the RRef. This call is done on the remote node that owns
# the RRef and passes along the given argument.
# Example: If the value held by the RRef is of type Foo, then
# remote_method(Foo.bar, rref, arg1, arg2) is equivalent to calling
# <foo_instance>.bar(arg1, arg2) on the remote node and getting the result
# back.
def remote_method(method, rref, *args, **kwargs):
args = [method, rref] + list(args)
return rpc.rpc_sync(rref.owner(), call_method, args=args, kwargs=kwargs)
现在,我们准备定义参数服务器。我们将继承自nn.Module
,并保存我们上面定义的网络的句柄。我们还将保存一个输入设备,该设备是将我们的输入在调用模型之前传输到的设备。
# --------- Parameter Server --------------------
class ParameterServer(nn.Module):
def __init__(self, num_gpus=0):
super().__init__()
model = Net(num_gpus=num_gpus)
self.model = model
self.input_device = torch.device(
"cuda:0" if torch.cuda.is_available() and num_gpus > 0 else "cpu")
接下来,我们将定义一些用于训练和验证目的的杂项函数。第一个函数 get_dist_gradients
将接受一个分布式自动微分上下文 ID,并调用 dist_autograd.get_gradients
API 以检索由分布式自动微分计算的梯度。更多信息可以在分布式自动微分文档中找到。请注意,我们还遍历结果字典并将每个张量转换为 CPU 张量,因为框架目前只支持通过 RPC 发送 CPU 张量。我们特意禁用了通过 RPC 发送 CUDA 张量,因为调用者/被调用者上可能存在不同的设备 (CPU/GPU),但在未来的版本中可能会支持此功能。接下来,get_param_rrefs
将遍历我们的模型参数,并将它们包装成一个(本地)RRef。此方法将由训练器节点通过 RPC 调用,并将返回一个需要优化的参数列表。这是分布式优化器的必需输入,它需要将所有需要优化的参数作为 RRef
列表。
class ParameterServer(nn.Module):
...
def forward(self, inp):
inp = inp.to(self.input_device)
out = self.model(inp)
# This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
# Tensors must be moved in and out of GPU memory due to this.
out = out.to("cpu")
return out
接下来,我们将定义一些用于训练和验证目的的杂项函数。第一个函数 get_dist_gradients
将接受一个分布式自动微分上下文 ID,并调用 dist_autograd.get_gradients
API 以检索由分布式自动微分计算的梯度。更多信息可以在分布式自动微分文档中找到。请注意,我们还遍历结果字典并将每个张量转换为 CPU 张量,因为框架目前只支持通过 RPC 发送张量。我们特意禁用了通过 RPC 发送 CUDA 张量,因为调用者/被调用者上可能存在不同的设备 (CPU/GPU),但在未来的版本中可能会支持此功能。接下来,get_param_rrefs
将遍历我们的模型参数,并将它们包装成一个(本地)RRef。此方法将由训练器节点通过 RPC 调用,并将返回一个需要优化的参数列表。这是分布式优化器的必需输入,它需要将所有需要优化的参数作为 RRef
列表。
# Use dist autograd to retrieve gradients accumulated for this model.
# Primarily used for verification.
def get_dist_gradients(self, cid):
grads = dist_autograd.get_gradients(cid)
# This output is forwarded over RPC, which as of 1.5.0 only accepts CPU tensors.
# Tensors must be moved in and out of GPU memory due to this.
cpu_grads = {}
for k, v in grads.items():
k_cpu, v_cpu = k.to("cpu"), v.to("cpu")
cpu_grads[k_cpu] = v_cpu
return cpu_grads
# Wrap local parameters in a RRef. Needed for building the
# DistributedOptimizer which optimizes paramters remotely.
def get_param_rrefs(self):
param_rrefs = [rpc.RRef(param) for param in self.model.parameters()]
return param_rrefs
最后,我们将创建初始化参数服务器的方法。请注意,所有进程中只会有一个参数服务器实例,并且所有训练器都将与同一个参数服务器通信并更新相同的存储模型。如 run_parameter_server
所示,服务器本身不执行任何独立操作;它等待来自训练器(尚未定义)的请求,并通过运行请求的函数进行响应。
# The global parameter server instance.
param_server = None
# A lock to ensure we only have one parameter server.
global_lock = Lock()
def get_parameter_server(num_gpus=0):
"""
Returns a singleton parameter server to all trainer processes
"""
global param_server
# Ensure that we get only one handle to the ParameterServer.
with global_lock:
if not param_server:
# construct it once
param_server = ParameterServer(num_gpus=num_gpus)
return param_server
def run_parameter_server(rank, world_size):
# The parameter server just acts as a host for the model and responds to
# requests from trainers.
# rpc.shutdown() will wait for all workers to complete by default, which
# in this case means that the parameter server will wait for all trainers
# to complete, and then exit.
print("PS master initializing RPC")
rpc.init_rpc(name="parameter_server", rank=rank, world_size=world_size)
print("RPC initialized! Running parameter server...")
rpc.shutdown()
print("RPC shutdown on parameter server.")
请注意,上面提到的 rpc.shutdown()
不会立即关闭参数服务器。相反,它会等待所有工作节点(在本例中为训练器)也调用 rpc.shutdown()
。这确保了参数服务器不会在所有训练器(尚未定义)完成其训练过程之前离线。
接下来,我们将定义 TrainerNet
类。它也将是 nn.Module
的子类,并且我们的 __init__
方法将使用 rpc.remote
API 获取一个 RRef
(即远程引用)到我们的参数服务器。请注意,在这里我们不是将参数服务器复制到我们的本地进程,而是可以将 self.param_server_rref
视为一个分布式共享指针,指向位于另一个进程上的参数服务器。
# --------- Trainers --------------------
# nn.Module corresponding to the network trained by this trainer. The
# forward() method simply invokes the network on the given parameter
# server.
class TrainerNet(nn.Module):
def __init__(self, num_gpus=0):
super().__init__()
self.num_gpus = num_gpus
self.param_server_rref = rpc.remote(
"parameter_server", get_parameter_server, args=(num_gpus,))
接下来,我们将定义一个名为 get_global_param_rrefs
的方法。为了解释这个方法的需求,值得阅读DistributedOptimizer的文档,特别是 API 签名。优化器必须传入一个 RRef
列表,对应于需要优化的远程参数,所以这里我们获取必要的 RRef
。由于给定 TrainerNet
交互的唯一远程工作节点是 ParameterServer
,我们只需在 ParameterServer
上调用 remote_method
。我们使用在 ParameterServer
类中定义的 get_param_rrefs
方法。此方法将返回一个需要优化的参数的 RRef
列表。请注意,在这种情况下,我们的 TrainerNet
不定义自己的参数;如果定义了,我们也需要将每个参数包装在 RRef
中,并将其包含在我们对 DistributedOptimizer
的输入中。
class TrainerNet(nn.Module):
...
def get_global_param_rrefs(self):
remote_params = remote_method(
ParameterServer.get_param_rrefs,
self.param_server_rref)
return remote_params
现在,我们准备定义我们的 forward
方法,它将调用(同步)RPC 来运行在 ParameterServer
上定义的网络的前向传播。请注意,我们将 self.param_server_rref
(它是我们 ParameterServer
的远程句柄)传递给我们的 RPC 调用。这个调用将向运行 ParameterServer
的节点发送一个 RPC,调用 forward
传递,并返回与模型输出对应的 Tensor
。
class TrainerNet(nn.Module):
...
def forward(self, x):
model_output = remote_method(
ParameterServer.forward, self.param_server_rref, x)
return model_output
定义好训练器后,现在是时候编写我们的神经网络训练循环了,该循环将创建我们的网络和优化器,运行一些输入通过网络并计算损失。训练循环看起来很像本地训练程序,但由于我们的网络分布在多台机器上而有所修改。
下面,我们初始化 TrainerNet
并构建 DistributedOptimizer
。注意,如上所述,我们必须传入所有全局(跨所有参与分布式训练的节点)需要优化的参数。此外,我们还传入要使用的本地优化器,在本例中为 SGD。注意,我们可以像创建本地优化器一样配置底层优化器算法 - optimizer.SGD
的所有参数都将正确转发。例如,我们传入一个自定义学习率,它将用作所有本地优化器的学习率。
def run_training_loop(rank, num_gpus, train_loader, test_loader):
# Runs the typical nueral network forward + backward + optimizer step, but
# in a distributed fashion.
net = TrainerNet(num_gpus=num_gpus)
# Build DistributedOptimizer.
param_rrefs = net.get_global_param_rrefs()
opt = DistributedOptimizer(optim.SGD, param_rrefs, lr=0.03)
接下来,我们定义主要的训练循环。我们遍历由 PyTorch 的DataLoader提供的可迭代对象。在编写典型的 forward/backward/optimizer 循环之前,我们首先将逻辑包装在分布式自动微分上下文中。请注意,这是记录模型前向传播中调用的 RPC 所必需的,以便可以构建一个包含所有参与分布式工作节点的适当图,用于反向传播。分布式自动微分上下文返回一个 context_id
,它用作累积和优化与特定迭代相对应的梯度的标识符。
与调用通常的 loss.backward()
(它将在本地工作节点上启动反向传播)不同,我们调用 dist_autograd.backward()
并传入我们的 context_id
以及 loss
,后者是我们要开始反向传播的根节点。此外,我们将此 context_id
传入我们的优化器调用中,这是查找此特定反向传播在所有节点上计算出的相应梯度所必需的。
def run_training_loop(rank, num_gpus, train_loader, test_loader):
...
for i, (data, target) in enumerate(train_loader):
with dist_autograd.context() as cid:
model_output = net(data)
target = target.to(model_output.device)
loss = F.nll_loss(model_output, target)
if i % 5 == 0:
print(f"Rank {rank} training batch {i} loss {loss.item()}")
dist_autograd.backward(cid, [loss])
# Ensure that dist autograd ran successfully and gradients were
# returned.
assert remote_method(
ParameterServer.get_dist_gradients,
net.param_server_rref,
cid) != {}
opt.step(cid)
print("Training complete!")
print("Getting accuracy....")
get_accuracy(test_loader, net)
接下来,我们定义我们的 forward
方法,它将调用(同步)RPC 来运行在 ParameterServer
上定义的网络的前向传播。请注意,我们将 self.param_server_rref
(它是我们 ParameterServer
的远程句柄)传递给我们的 RPC 调用。这个调用将向运行 ParameterServer
的节点发送一个 RPC,调用 forward
传递,并返回与模型输出对应的 Tensor
。
def get_accuracy(test_loader, model):
model.eval()
correct_sum = 0
# Use GPU to evaluate if possible
device = torch.device("cuda:0" if model.num_gpus > 0
and torch.cuda.is_available() else "cpu")
with torch.no_grad():
for i, (data, target) in enumerate(test_loader):
out = model(data, -1)
pred = out.argmax(dim=1, keepdim=True)
pred, target = pred.to(device), target.to(device)
correct = pred.eq(target.view_as(pred)).sum().item()
correct_sum += correct
print(f"Accuracy {correct_sum / len(test_loader.dataset)}")
接下来,我们将定义我们的 forward
方法,它将调用(同步)RPC 来运行在 ParameterServer
上定义的网络的前向传播。请注意,我们将 self.param_server_rref
(它是我们 ParameterServer
的远程句柄)传递给我们的 RPC 调用。这个调用将向运行 ParameterServer
的节点发送一个 RPC,调用 forward
传递,并返回与模型输出对应的 Tensor
。
# Main loop for trainers.
def run_worker(rank, world_size, num_gpus, train_loader, test_loader):
print(f"Worker rank {rank} initializing RPC")
rpc.init_rpc(
name=f"trainer_{rank}",
rank=rank,
world_size=world_size)
print(f"Worker {rank} done initializing RPC")
run_training_loop(rank, num_gpus, train_loader, test_loader)
rpc.shutdown()
注意,与 run_parameter_server
类似,rpc.shutdown()
在此节点退出之前,默认会等待所有工作节点(训练器和 ParameterServer)都调用 rpc.shutdown()
。这确保了节点能够优雅地终止,并且在其他节点期望其在线时不会离线。
现在我们已经完成了训练器和参数服务器的特定代码,剩下要做的就是添加用于启动训练器和参数服务器的代码。首先,我们必须接收适用于我们的参数服务器和训练器的各种参数。world_size
对应于将参与训练的节点总数,是所有训练器和参数服务器的总和。我们还必须为每个单独的进程传入一个唯一的 rank
,范围从 0(我们运行单个参数服务器的位置)到 world_size - 1
。master_addr
和 master_port
是可用于标识 rank 0 进程运行位置的参数,并将由各个节点用于相互发现。要在本地测试此示例,只需向所有生成的实例传入 localhost
和相同的 master_port
。请注意,为了演示目的,此示例仅支持 0-2 个 GPU,尽管可以将模式扩展以利用其他 GPU。
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="Parameter-Server RPC based training")
parser.add_argument(
"--world_size",
type=int,
default=4,
help="""Total number of participating processes. Should be the sum of
master node and all training nodes.""")
parser.add_argument(
"--rank",
type=int,
default=None,
help="Global rank of this process. Pass in 0 for master.")
parser.add_argument(
"--num_gpus",
type=int,
default=0,
help="""Number of GPUs to use for training, Currently supports between 0
and 2 GPUs. Note that this argument will be passed to the parameter servers.""")
parser.add_argument(
"--master_addr",
type=str,
default="localhost",
help="""Address of master, will default to localhost if not provided.
Master must be able to accept network traffic on the address + port.""")
parser.add_argument(
"--master_port",
type=str,
default="29500",
help="""Port that master is listening on, will default to 29500 if not
provided. Master must be able to accept network traffic on the host and port.""")
args = parser.parse_args()
assert args.rank is not None, "must provide rank argument."
assert args.num_gpus <= 3, f"Only 0-2 GPUs currently supported (got {args.num_gpus})."
os.environ['MASTER_ADDR'] = args.master_addr
os.environ["MASTER_PORT"] = args.master_port
现在,我们将根据命令行参数创建一个对应于参数服务器或训练器的进程。如果传入的 rank 为 0,我们将创建一个 ParameterServer
,否则创建一个 TrainerNet
。请注意,我们正在使用 torch.multiprocessing
来启动一个对应于我们希望执行的函数的子进程,并通过 p.join()
从主线程等待此进程完成。在初始化训练器的情况下,我们还使用 PyTorch 的dataloaders来指定 MNIST 数据集上的训练和测试数据加载器。
processes = []
world_size = args.world_size
if args.rank == 0:
p = mp.Process(target=run_parameter_server, args=(0, world_size))
p.start()
processes.append(p)
else:
# Get data to train on
train_loader = torch.utils.data.DataLoader(
datasets.MNIST('../data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=32, shuffle=True,)
test_loader = torch.utils.data.DataLoader(
datasets.MNIST(
'../data',
train=False,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
])),
batch_size=32,
shuffle=True,
)
# start training worker on this node
p = mp.Process(
target=run_worker,
args=(
args.rank,
world_size, args.num_gpus,
train_loader,
test_loader))
p.start()
processes.append(p)
for p in processes:
p.join()
要在本地运行该示例,请在不同的终端窗口中为服务器和每个您希望生成的 worker 运行以下命令:python rpc_parameter_server.py --world_size=WORLD_SIZE --rank=RANK
。例如,对于 world size 为 2 的主节点,命令将是 python rpc_parameter_server.py --world_size=2 --rank=0
。然后可以在单独的窗口中运行命令 python rpc_parameter_server.py --world_size=2 --rank=1
启动训练器,这将开始使用一个服务器和一个训练器进行训练。请注意,本教程假定训练使用 0 到 2 个 GPU,此参数可以通过在训练脚本中传入 --num_gpus=N
进行配置。
你可以传入命令行参数 --master_addr=ADDRESS
和 --master_port=PORT
来指定主 worker 监听的地址和端口,例如,测试训练器和主节点运行在不同机器上的功能。