• 教程 >
  • 分布式 RPC 框架入门
快捷方式

分布式 RPC 框架入门

创建日期:2020 年 1 月 1 日 | 最后更新:2025 年 1 月 21 日 | 最后验证:2024 年 11 月 5 日

作者: Shen Li

注意

editgithub 上查看和编辑本教程。

先决条件

本教程使用两个简单的示例来演示如何使用 torch.distributed.rpc 包构建分布式训练,该包最初在 PyTorch v1.4 中作为实验性功能引入。这两个示例的源代码可在 PyTorch examples 中找到。

之前的教程,分布式数据并行 (DDP) 入门使用 PyTorch 编写分布式应用程序,描述了 DistributedDataParallel,它支持一种特定的训练范式,即模型在多个进程中复制,每个进程处理输入数据的一部分。有时,您可能会遇到需要不同训练范式的场景。例如

  1. 在强化学习中,从环境中获取训练数据的成本可能相对较高,而模型本身可能很小。在这种情况下,并行启动多个 Observer 并共享一个 Agent 可能很有用。在这种情况下,Agent 在本地负责训练,但应用程序仍然需要库来在 Observer 和 Trainer 之间发送和接收数据。

  2. 您的模型可能太大,无法容纳在单机的 GPU 中,因此需要一个库来帮助将模型拆分到多台机器上。或者您可能正在实现一个参数服务器 (parameter server) 训练框架,其中模型参数和 Trainer 位于不同的机器上。

torch.distributed.rpc 包可以帮助解决上述场景。在场景 1 中,RPCRRef 允许在 worker 之间发送数据,同时轻松引用远程数据对象。在场景 2 中,分布式 Autograd分布式 Optimizer 使执行反向传播和 Optimizer 步骤就像本地训练一样。在接下来的两个部分中,我们将使用一个强化学习示例和一个语言模型示例来演示 torch.distributed.rpc 的 API。请注意,本教程并非旨在构建最准确或最高效的模型来解决给定问题,相反,主要目标是展示如何使用 torch.distributed.rpc 包来构建分布式训练应用程序。

使用 RPC 和 RRef 进行分布式强化学习

本节介绍使用 RPC 构建一个玩具分布式强化学习模型以解决 OpenAI Gym 中的 CartPole-v1 的步骤。策略代码主要借鉴了现有的单线程示例,如下所示。我们将跳过 Policy 设计的细节,重点关注 RPC 的使用。

import torch.nn as nn
import torch.nn.functional as F

class Policy(nn.Module):

    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)

    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=1)

我们准备介绍 Observer。在此示例中,每个 Observer 创建自己的环境,并等待 Agent 的命令来运行一个 episode。在每个 episode 中,一个 Observer 最多循环 n_steps 次迭代,并且在每次迭代中,它使用 RPC 将其环境状态传递给 Agent 并获取一个动作。然后,它将该动作应用于其环境,并从环境中获取奖励和下一个状态。之后,Observer 使用另一个 RPC 将奖励报告给 Agent。再次请注意,这显然不是最高效的 Observer 实现。例如,一个简单的优化可以是将在一个 RPC 中打包当前状态和最后奖励,以减少通信开销。然而,目标是演示 RPC API,而不是为 CartPole 构建最佳求解器。因此,在此示例中,我们保持逻辑简单并明确这两个步骤。

import argparse
import gym
import torch.distributed.rpc as rpc

parser = argparse.ArgumentParser(
    description="RPC Reinforcement Learning Example",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

parser.add_argument('--world_size', default=2, type=int, metavar='W',
                    help='number of workers')
parser.add_argument('--log_interval', type=int, default=10, metavar='N',
                    help='interval between training status logs')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
                    help='how much to value future rewards')
parser.add_argument('--seed', type=int, default=1, metavar='S',
                    help='random seed  for reproducibility')
args = parser.parse_args()

class Observer:

    def __init__(self):
        self.id = rpc.get_worker_info().id
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)

    def run_episode(self, agent_rref):
        state, ep_reward = self.env.reset(), 0
        for _ in range(10000):
            # send the state to the agent to get an action
            action = agent_rref.rpc_sync().select_action(self.id, state)

            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)

            # report the reward to the agent for training purpose
            agent_rref.rpc_sync().report_reward(self.id, reward)

            # finishes after the number of self.env._max_episode_steps
            if done:
                break

Agent 的代码稍微复杂一些,我们将分拆成多个部分。在此示例中,Agent 同时充当 Trainer 和 Master,它向多个分布式 Observer 发送命令以运行 episode,并记录所有动作和奖励(本地),这些将在每个 episode 后的训练阶段使用。下面的代码显示了 Agent 的构造函数,其中大部分行都在初始化各种组件。末尾的循环在其他 worker 上远程初始化 Observer,并在本地持有这些 Observer 的 RRefs。Agent 稍后将使用这些 Observer 的 RRefs 发送命令。应用程序无需担心 RRefs 的生命周期。每个 RRef 的所有者维护一个引用计数映射以跟踪其生命周期,并保证只要该 RRef 存在任何活跃用户,远程数据对象就不会被删除。有关详细信息,请参阅 RRef设计文档

import gym
import numpy as np

import torch
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical

class Agent:
    def __init__(self, world_size):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.saved_log_probs = {}
        self.policy = Policy()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.eps = np.finfo(np.float32).eps.item()
        self.running_reward = 0
        self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(remote(ob_info, Observer))
            self.rewards[ob_info.id] = []
            self.saved_log_probs[ob_info.id] = []

接下来,Agent 向 Observer 暴露两个 API,用于选择动作和报告奖励。这些函数仅在 Agent 本地运行,但将由 Observer 通过 RPC 触发。

class Agent:
    ...
    def select_action(self, ob_id, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.policy(state)
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()

    def report_reward(self, ob_id, reward):
        self.rewards[ob_id].append(reward)

让我们在 Agent 上添加一个 run_episode 函数,该函数告诉所有 Observer 执行一个 episode。在此函数中,它首先创建一个列表来收集异步 RPC 的 future,然后遍历所有 Observer 的 RRefs 以进行异步 RPC。在这些 RPC 中,Agent 还将其自身的 RRef 传递给 Observer,以便 Observer 也可以调用 Agent 上的函数。如上所示,每个 Observer 将对 Agent 进行 RPC 调用,这些都是嵌套的 RPC。在每个 episode 之后,saved_log_probsrewards 将包含记录的动作概率和奖励。

class Agent:
    ...
    def run_episode(self):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(
                rpc_async(
                    ob_rref.owner(),
                    ob_rref.rpc_sync().run_episode,
                    args=(self.agent_rref,)
                )
            )

        # wait until all obervers have finished this episode
        for fut in futs:
            fut.wait()

最后,在一个 episode 之后,Agent 需要训练模型,这在下面的 finish_episode 函数中实现。此函数中没有 RPC 调用,并且大部分内容都借鉴了单线程示例。因此,我们跳过对其内容的描述。

class Agent:
    ...
    def finish_episode(self):
      # joins probs and rewards from different observers into lists
      R, probs, rewards = 0, [], []
      for ob_id in self.rewards:
          probs.extend(self.saved_log_probs[ob_id])
          rewards.extend(self.rewards[ob_id])

      # use the minimum observer reward to calculate the running reward
      min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards])
      self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward

      # clear saved probs and rewards
      for ob_id in self.rewards:
          self.rewards[ob_id] = []
          self.saved_log_probs[ob_id] = []

      policy_loss, returns = [], []
      for r in rewards[::-1]:
          R = r + args.gamma * R
          returns.insert(0, R)
      returns = torch.tensor(returns)
      returns = (returns - returns.mean()) / (returns.std() + self.eps)
      for log_prob, R in zip(probs, returns):
          policy_loss.append(-log_prob * R)
      self.optimizer.zero_grad()
      policy_loss = torch.cat(policy_loss).sum()
      policy_loss.backward()
      self.optimizer.step()
      return min_reward

有了 PolicyObserverAgent 类,我们就可以启动多个进程来执行分布式训练了。在此示例中,所有进程都运行相同的 run_worker 函数,并使用 rank 来区分其角色。Rank 0 始终是 Agent,所有其他 rank 都是 Observer。Agent 通过重复调用 run_episodefinish_episode 来充当 Master,直到运行奖励超过环境指定的奖励阈值。所有 Observer 被动地等待来自 Agent 的命令。代码由 rpc.init_rpcrpc.shutdown 封装,它们分别初始化和终止 RPC 实例。更多详细信息可在 API 页面中找到。

import os
from itertools import count

import torch.multiprocessing as mp

AGENT_NAME = "agent"
OBSERVER_NAME="obs{}"

def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank0 is the agent
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)

        agent = Agent(world_size)
        print(f"This will run until reward threshold of {agent.reward_threshold}"
                " is reached. Ctrl+C to exit.")
        for i_episode in count(1):
            agent.run_episode()
            last_reward = agent.finish_episode()

            if i_episode % args.log_interval == 0:
                print(f"Episode {i_episode}\tLast reward: {last_reward:.2f}\tAverage reward: "
                    f"{agent.running_reward:.2f}")
            if agent.running_reward > agent.reward_threshold:
                print(f"Solved! Running reward is now {agent.running_reward}!")
                break
    else:
        # other ranks are the observer
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # observers passively waiting for instructions from the agent

    # block until all rpcs finish, and shutdown the RPC instance
    rpc.shutdown()


mp.spawn(
    run_worker,
    args=(args.world_size, ),
    nprocs=args.world_size,
    join=True
)

以下是在 world_size=2 的情况下进行训练的一些示例输出。

This will run until reward threshold of 475.0 is reached. Ctrl+C to exit.
Episode 10      Last reward: 26.00      Average reward: 10.01
Episode 20      Last reward: 16.00      Average reward: 11.27
Episode 30      Last reward: 49.00      Average reward: 18.62
Episode 40      Last reward: 45.00      Average reward: 26.09
Episode 50      Last reward: 44.00      Average reward: 30.03
Episode 60      Last reward: 111.00     Average reward: 42.23
Episode 70      Last reward: 131.00     Average reward: 70.11
Episode 80      Last reward: 87.00      Average reward: 76.51
Episode 90      Last reward: 86.00      Average reward: 95.93
Episode 100     Last reward: 13.00      Average reward: 123.93
Episode 110     Last reward: 33.00      Average reward: 91.39
Episode 120     Last reward: 73.00      Average reward: 76.38
Episode 130     Last reward: 137.00     Average reward: 88.08
Episode 140     Last reward: 89.00      Average reward: 104.96
Episode 150     Last reward: 97.00      Average reward: 98.74
Episode 160     Last reward: 150.00     Average reward: 100.87
Episode 170     Last reward: 126.00     Average reward: 104.38
Episode 180     Last reward: 500.00     Average reward: 213.74
Episode 190     Last reward: 322.00     Average reward: 300.22
Episode 200     Last reward: 165.00     Average reward: 272.71
Episode 210     Last reward: 168.00     Average reward: 233.11
Episode 220     Last reward: 184.00     Average reward: 195.02
Episode 230     Last reward: 284.00     Average reward: 208.32
Episode 240     Last reward: 395.00     Average reward: 247.37
Episode 250     Last reward: 500.00     Average reward: 335.42
Episode 260     Last reward: 500.00     Average reward: 386.30
Episode 270     Last reward: 500.00     Average reward: 405.29
Episode 280     Last reward: 500.00     Average reward: 443.29
Episode 290     Last reward: 500.00     Average reward: 464.65
Solved! Running reward is now 475.3163778435275!

在此示例中,我们展示了如何使用 RPC 作为通信载体在 worker 之间传递数据,以及如何使用 RRef 引用远程对象。诚然,您可以直接在 ProcessGroupsendrecv API 上构建整个结构,或使用其他通信/RPC 库。然而,通过使用 torch.distributed.rpc,您可以获得原生支持和底层持续优化的性能。

接下来,我们将展示如何将 RPC 和 RRef 与分布式 Autograd 和分布式 Optimizer 结合起来进行分布式模型并行训练。

使用分布式 Autograd 和分布式 Optimizer 进行分布式 RNN

在本节中,我们使用一个 RNN 模型来展示如何使用 RPC API 构建分布式模型并行训练。示例 RNN 模型非常小,可以轻松放入单个 GPU,但我们仍将其层划分到两个不同的 worker 上以演示其思想。开发者可以应用类似的技术将更大的模型分发到多个设备和机器上。

RNN 模型设计借鉴了 PyTorch 示例仓库中的词语语言模型,该模型包含三个主要组件:嵌入表、一个 LSTM 层和一个 Decoder。下面的代码将嵌入表和 Decoder 封装到子模块中,以便将其构造函数传递给 RPC API。在 EmbeddingTable 子模块中,我们故意将 Embedding 层放在 GPU 上以覆盖该用例。在 v1.4 中,RPC 始终在目标 worker 上创建 CPU tensor 参数或返回值。如果函数接受 GPU tensor,则需要将其显式移动到适当的设备上。

class EmbeddingTable(nn.Module):
    r"""
    Encoding layers of the RNNModel
    """
    def __init__(self, ntoken, ninp, dropout):
        super(EmbeddingTable, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.encoder = nn.Embedding(ntoken, ninp).cuda()
        self.encoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, input):
        return self.drop(self.encoder(input.cuda()).cpu()


class Decoder(nn.Module):
    def __init__(self, ntoken, nhid, dropout):
        super(Decoder, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.decoder = nn.Linear(nhid, ntoken)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-0.1, 0.1)

    def forward(self, output):
        return self.decoder(self.drop(output))

有了上述子模块,我们现在可以使用 RPC 将它们组合在一起创建一个 RNN 模型。在下面的代码中,ps 代表一个参数服务器,它托管着嵌入表和 Decoder 的参数。构造函数使用 remote API 在参数服务器上创建一个 EmbeddingTable 对象和一个 Decoder 对象,并在本地创建 LSTM 子模块。在前向传播过程中,Trainer 使用 EmbeddingTableRRef 找到远程子模块,并使用 RPC 将输入数据传递给 EmbeddingTable 并获取查找结果。然后,它将嵌入结果通过本地的 LSTM 层运行,最后使用另一个 RPC 将输出发送到 Decoder 子模块。通常,为了实现分布式模型并行训练,开发者可以将模型划分为子模块,调用 RPC 远程创建子模块实例,并在需要时使用 RRef 查找它们。正如您在下面的代码中看到的,它看起来与单机模型并行训练非常相似。主要区别在于用 RPC 函数替换 Tensor.to(device)

class RNNModel(nn.Module):
    def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
        super(RNNModel, self).__init__()

        # setup embedding table remotely
        self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
        # setup LSTM locally
        self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
        # setup decoder remotely
        self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))

    def forward(self, input, hidden):
        # pass input to the remote embedding table and fetch emb tensor back
        emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
        output, hidden = self.rnn(emb, hidden)
        # pass output to the rremote decoder and get the decoded output back
        decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
        return decoded, hidden

在介绍分布式 Optimizer 之前,让我们添加一个 helper 函数来生成模型参数的 RRefs 列表,该列表将由分布式 Optimizer 消费。在本地训练中,应用程序可以调用 Module.parameters() 获取所有参数 tensor 的引用,并将其传递给本地 Optimizer 以进行后续更新。然而,相同的 API 在分布式训练场景中不起作用,因为某些参数位于远程机器上。因此,分布式 Optimizer 不是接受一个参数 Tensors 列表,而是接受一个 RRefs 列表,本地和远程模型参数的每个参数对应一个 RRef。该 helper 函数非常简单,只需调用 Module.parameters() 并在每个参数上创建一个本地 RRef

def _parameter_rrefs(module):
    param_rrefs = []
    for param in module.parameters():
        param_rrefs.append(RRef(param))
    return param_rrefs

然后,由于 RNNModel 包含三个子模块,我们需要调用 _parameter_rrefs 三次,并将其封装到另一个 helper 函数中。

class RNNModel(nn.Module):
    ...
    def parameter_rrefs(self):
        remote_params = []
        # get RRefs of embedding table
        remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
        # create RRefs for local parameters
        remote_params.extend(_parameter_rrefs(self.rnn))
        # get RRefs of decoder
        remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
        return remote_params

现在,我们准备实现训练循环。初始化模型参数后,我们创建 RNNModelDistributedOptimizer。分布式 optimizer 将接收一个参数 RRefs 列表,查找所有不同的 owner worker,并使用给定参数 (即此处为 lr=0.05) 在每个 owner worker 上创建给定的本地 optimizer (即此处为 SGD,您也可以使用其他本地 optimizer)。

在训练循环中,它首先创建一个分布式 Autograd context,这将帮助分布式 Autograd 引擎找到梯度和涉及的 RPC 发送/接收函数。分布式 Autograd 引擎的设计细节可在其设计说明中找到。然后,它启动前向传播(就像本地模型一样),并运行分布式反向传播。对于分布式反向传播,您只需要指定一个 roots 列表,在本例中是 loss Tensor。分布式 Autograd 引擎将自动遍历分布式图并正确写入梯度。接下来,它在分布式 optimizer 上运行 step 函数,该函数将联系所有涉及的本地 optimizer 以更新模型参数。与本地训练相比,一个细微的区别是您不需要运行 zero_grad(),因为每个 Autograd context 都有专门的空间来存储梯度,并且由于我们为每次迭代创建一个 context,因此不同迭代中的这些梯度不会累积到同一组 Tensors 中。

def run_trainer():
    batch = 5
    ntoken = 10
    ninp = 2

    nhid = 3
    nindices = 3
    nlayers = 4
    hidden = (
        torch.randn(nlayers, nindices, nhid),
        torch.randn(nlayers, nindices, nhid)
    )

    model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)

    # setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )

    criterion = torch.nn.CrossEntropyLoss()

    def get_next_batch():
        for _ in range(5):
            data = torch.LongTensor(batch, nindices) % ntoken
            target = torch.LongTensor(batch, ntoken) % nindices
            yield data, target

    # train for 10 iterations
    for epoch in range(10):
        for data, target in get_next_batch():
            # create distributed autograd context
            with dist_autograd.context() as context_id:
                hidden[0].detach_()
                hidden[1].detach_()
                output, hidden = model(data, hidden)
                loss = criterion(output, target)
                # run distributed backward pass
                dist_autograd.backward(context_id, [loss])
                # run distributed optimizer
                opt.step(context_id)
                # not necessary to zero grads since they are
                # accumulated into the distributed autograd context
                # which is reset every iteration.
        print("Training epoch {}".format(epoch))

最后,让我们添加一些粘合代码来启动参数服务器和 Trainer 进程。

def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 1:
        rpc.init_rpc("trainer", rank=rank, world_size=world_size)
        _run_trainer()
    else:
        rpc.init_rpc("ps", rank=rank, world_size=world_size)
        # parameter server do nothing
        pass

    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)

文档

访问 PyTorch 全面开发者文档

查看文档

教程

获取面向初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并解答您的疑问

查看资源