• 教程 >
  • 使用异步执行实现批量 RPC 处理
快捷方式

使用异步执行实现批量 RPC 处理

创建时间:2020 年 7 月 28 日 | 最后更新:2024 年 11 月 13 日 | 最后验证:未验证

作者Shen Li

注意

编辑github 中查看和编辑本教程。

前提条件

本教程演示了如何使用 @rpc.functions.async_execution 装饰器构建批量处理 RPC 应用程序,这有助于通过减少阻塞的 RPC 线程数量和整合 callee 上的 CUDA 操作来加速训练。这与 使用 TorchServe 进行批量推理 有着相同的思路。

注意

本教程需要 PyTorch v1.6.0 或更高版本。

基础知识

先前的教程展示了如何使用 torch.distributed.rpc 构建分布式训练应用程序,但没有详细说明在处理 RPC 请求时 callee 端会发生什么。在 PyTorch v1.5 中,每个 RPC 请求都会阻塞 callee 上的一个线程来执行该请求中的函数,直到该函数返回。这适用于许多用例,但有一个注意事项。如果用户函数因 IO(例如,嵌套 RPC 调用)或信号(例如,等待另一个 RPC 请求解除阻塞)而阻塞,callee 上的 RPC 线程将不得不空闲等待直到 IO 完成或信号事件发生。因此,RPC callee 可能会使用比实际需要更多的线程。这个问题的原因是 RPC 将用户函数视为黑盒,对函数内部发生的事情知之甚少。为了让用户函数能够让出并释放 RPC 线程,需要向 RPC 系统提供更多提示。

从 v1.6.0 开始,PyTorch 通过引入两个新概念解决了这个问题

  • 一种封装异步执行的 torch.futures.Future 类型,它还支持安装回调函数。

  • 一个 @rpc.functions.async_execution 装饰器,它允许应用程序告诉 callee 目标函数将返回一个 future,并且可以在执行期间多次暂停和让出。

借助这两个工具,应用程序代码可以将用户函数分解为多个更小的函数,将它们作为回调链式地连接到 Future 对象上,并返回包含最终结果的 Future。在 callee 端,当获取到 Future 对象时,它也会将后续的 RPC 响应准备和通信作为回调安装上去,这些回调将在最终结果准备好时触发。通过这种方式,callee 不再需要阻塞一个线程并等待最终返回值就绪。请参阅 @rpc.functions.async_execution 的 API 文档获取简单示例。

除了减少 callee 上的空闲线程数量之外,这些工具还有助于使批量 RPC 处理更简单、更快。本教程的以下两部分演示了如何使用 @rpc.functions.async_execution 装饰器构建分布式批量更新参数服务器和批量处理强化学习应用程序。

批量更新参数服务器

考虑一个同步参数服务器训练应用程序,它有一个参数服务器 (PS) 和多个 trainer。在此应用程序中,PS 持有参数并等待所有 trainer 报告梯度。在每次迭代中,它都会等待接收到所有 trainer 的梯度,然后一次性更新所有参数。下面的代码展示了 PS 类的实现。update_and_fetch_model 方法使用 @rpc.functions.async_execution 进行装饰,并将由 trainer 调用。每次调用都返回一个 Future 对象,该对象将用更新后的模型填充。大多数 trainer 发起的调用只是将梯度累积到 .grad 字段,立即返回,并让出 PS 上的 RPC 线程。最后到达的 trainer 将触发优化器步骤并消耗所有先前报告的梯度。然后它用更新后的模型设置 future_model,这反过来又通过 Future 对象通知来自其他 trainer 的所有先前请求,并将更新后的模型发送给所有 trainer。

import threading
import torchvision
import torch
import torch.distributed.rpc as rpc
from torch import optim

num_classes, batch_update_size = 30, 5

class BatchUpdateParameterServer(object):
    def __init__(self, batch_update_size=batch_update_size):
        self.model = torchvision.models.resnet50(num_classes=num_classes)
        self.lock = threading.Lock()
        self.future_model = torch.futures.Future()
        self.batch_update_size = batch_update_size
        self.curr_update_size = 0
        self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
        for p in self.model.parameters():
            p.grad = torch.zeros_like(p)

    def get_model(self):
        return self.model

    @staticmethod
    @rpc.functions.async_execution
    def update_and_fetch_model(ps_rref, grads):
        # Using the RRef to retrieve the local PS instance
        self = ps_rref.local_value()
        with self.lock:
            self.curr_update_size += 1
            # accumulate gradients into .grad field
            for p, g in zip(self.model.parameters(), grads):
                p.grad += g

            # Save the current future_model and return it to make sure the
            # returned Future object holds the correct model even if another
            # thread modifies future_model before this thread returns.
            fut = self.future_model

            if self.curr_update_size >= self.batch_update_size:
                # update the model
                for p in self.model.parameters():
                    p.grad /= self.batch_update_size
                self.curr_update_size = 0
                self.optimizer.step()
                self.optimizer.zero_grad()
                # by settiing the result on the Future object, all previous
                # requests expecting this updated model will be notified and
                # the their responses will be sent accordingly.
                fut.set_result(self.model)
                self.future_model = torch.futures.Future()

        return fut

对于 trainer,它们都使用 PS 中的同一组参数进行初始化。在每次迭代中,每个 trainer 首先运行前向和后向传播以在本地生成梯度。然后,每个 trainer 使用 RPC 将其梯度报告给 PS,并通过同一 RPC 请求的返回值取回更新后的参数。在 trainer 的实现中,目标函数是否标记了 @rpc.functions.async_execution 没有区别。trainer 只需使用 rpc_sync 调用 update_and_fetch_model,这将在 trainer 上阻塞,直到返回更新后的模型。

batch_size, image_w, image_h  = 20, 64, 64

class Trainer(object):
    def __init__(self, ps_rref):
        self.ps_rref, self.loss_fn = ps_rref, torch.nn.MSELoss()
        self.one_hot_indices = torch.LongTensor(batch_size) \
                                    .random_(0, num_classes) \
                                    .view(batch_size, 1)

    def get_next_batch(self):
        for _ in range(6):
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                        .scatter_(1, self.one_hot_indices, 1)
            yield inputs.cuda(), labels.cuda()

    def train(self):
        name = rpc.get_worker_info().name
        # get initial model parameters
        m = self.ps_rref.rpc_sync().get_model().cuda()
        # start training
        for inputs, labels in self.get_next_batch():
            self.loss_fn(m(inputs), labels).backward()
            m = rpc.rpc_sync(
                self.ps_rref.owner(),
                BatchUpdateParameterServer.update_and_fetch_model,
                args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]),
            ).cuda()

本教程跳过了启动多个进程的代码,请参考 examples 仓库获取完整实现。注意,不使用 @rpc.functions.async_execution 装饰器也可以实现批量处理。然而,这将需要在 PS 上阻塞更多 RPC 线程,或者使用另一轮 RPC 来获取更新后的模型,后一种方法会增加代码复杂性和通信开销。

本节使用一个简单的参数服务器训练示例来展示如何使用 @rpc.functions.async_execution 装饰器实现批量 RPC 应用程序。在下一节中,我们将使用批量处理重新实现先前 分布式 RPC 框架入门 教程中的强化学习示例,并演示其对训练速度的影响。

批量处理 CartPole Solver

本节使用 OpenAI Gym 中的 CartPole-v1 作为示例,展示批量处理 RPC 对性能的影响。请注意,由于目标是演示 @rpc.functions.async_execution 的用法,而不是构建最好的 CartPole solver 或解决大多数不同的 RL 问题,我们使用非常简单的策略和奖励计算方法,重点关注多 observer 单 agent 的批量 RPC 实现。我们使用与先前教程类似的 Policy 模型,如下所示。与之前的教程相比,区别在于其构造函数接受一个额外的 batch 参数,该参数控制 F.softmaxdim 参数,因为批量处理时,forward 函数中的 x 参数包含来自多个 observer 的状态,因此维度需要相应改变。其余部分保持不变。

import argparse
import torch.nn as nn
import torch.nn.functional as F

parser = argparse.ArgumentParser(description='PyTorch RPC Batch RL example')
parser.add_argument('--gamma', type=float, default=1.0, metavar='G',
                    help='discount factor (default: 1.0)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
                    help='random seed (default: 543)')
parser.add_argument('--num-episode', type=int, default=10, metavar='E',
                    help='number of episodes (default: 10)')
args = parser.parse_args()

torch.manual_seed(args.seed)

class Policy(nn.Module):
    def __init__(self, batch=True):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)
        self.dim = 2 if batch else 1

    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=self.dim)

Observer 的构造函数也做了相应的调整。它还接受一个 batch 参数,该参数控制它使用哪个 Agent 函数来选择动作。在批量模式下,它会调用 Agent 上的 select_action_batch 函数(稍后会介绍),该函数将使用 @rpc.functions.async_execution 进行装饰。

import gym
import torch.distributed.rpc as rpc

class Observer:
    def __init__(self, batch=True):
        self.id = rpc.get_worker_info().id - 1
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)
        self.select_action = Agent.select_action_batch if batch else Agent.select_action

与之前的教程 分布式 RPC 框架入门 相比,observer 的行为略有不同。它不是在环境停止时退出,而是在每个 episode 中始终运行 n_steps 次迭代。当环境返回时,observer 只需重置环境并重新开始。通过这种设计,agent 将从每个 observer 接收固定数量的状态,因此可以将它们打包到一个固定大小的张量中。在每一步中,Observer 使用 RPC 将其状态发送给 Agent,并通过返回值获取动作。在每个 episode 结束时,它将所有步骤的奖励返回给 Agent。请注意,这个 run_episode 函数将由 Agent 使用 RPC 调用。因此,此函数中的 rpc_sync 调用将是一个嵌套的 RPC 调用。我们也可以将此函数标记为 @rpc.functions.async_execution,以避免阻塞 Observer 上的一个线程。然而,由于瓶颈在 Agent 而不是 Observer,阻塞 Observer 进程上的一个线程应该是可以接受的。

import torch

class Observer:
    ...

    def run_episode(self, agent_rref, n_steps):
        state, ep_reward = self.env.reset(), NUM_STEPS
        rewards = torch.zeros(n_steps)
        start_step = 0
        for step in range(n_steps):
            state = torch.from_numpy(state).float().unsqueeze(0)
            # send the state to the agent to get an action
            action = rpc.rpc_sync(
                agent_rref.owner(),
                self.select_action,
                args=(agent_rref, self.id, state)
            )

            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)
            rewards[step] = reward

            if done or step + 1 >= n_steps:
                curr_rewards = rewards[start_step:(step + 1)]
                R = 0
                for i in range(curr_rewards.numel() -1, -1, -1):
                    R = curr_rewards[i] + args.gamma * R
                    curr_rewards[i] = R
                state = self.env.reset()
                if start_step == 0:
                    ep_reward = min(ep_reward, step - start_step + 1)
                start_step = step + 1

        return [rewards, ep_reward]

Agent 的构造函数也接受一个 batch 参数,该参数控制动作概率如何进行批量处理。在批量模式下,saved_log_probs 包含一个张量列表,其中每个张量包含所有 observer 在一步中的动作概率。在非批量处理模式下,saved_log_probs 是一个字典,其中键是 observer ID,值是该 observer 的动作概率列表。

import threading
from torch.distributed.rpc import RRef

class Agent:
    def __init__(self, world_size, batch=True):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.policy = Policy(batch).cuda()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.running_reward = 0

        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(rpc.remote(ob_info, Observer, args=(batch,)))
            self.rewards[ob_info.id] = []

        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
        self.batch = batch
        self.saved_log_probs = [] if batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.future_actions = torch.futures.Future()
        self.lock = threading.Lock()
        self.pending_states = len(self.ob_rrefs)

非批量处理模式下的 select_acion 简单地通过策略运行状态,保存动作概率,然后立即将动作返回给 observer。

from torch.distributions import Categorical

class Agent:
    ...

    @staticmethod
    def select_action(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        probs = self.policy(state.cuda())
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()

启用批量处理后,状态存储在一个 2D 张量 self.states 中,使用 observer ID 作为行 ID。然后,它通过在批量生成的 self.future_actions Future 对象上安装回调函数来链式生成一个 Future,该 Future 将填充该 observer ID 索引到的特定行。最后到达的 observer 一次性将所有批量状态通过策略运行,并相应地设置 self.future_actions。当这种情况发生时,安装在 self.future_actions 上的所有回调函数将被触发,它们的返回值将用于填充链式生成的 Future 对象,这反过来又通知 Agent 为来自其他 observer 的所有先前 RPC 请求准备和通信响应。

class Agent:
    ...

    @staticmethod
    @rpc.functions.async_execution
    def select_action_batch(agent_rref, ob_id, state):
        self = agent_rref.local_value()
        self.states[ob_id].copy_(state)
        future_action = self.future_actions.then(
            lambda future_actions: future_actions.wait()[ob_id].item()
        )

        with self.lock:
            self.pending_states -= 1
            if self.pending_states == 0:
                self.pending_states = len(self.ob_rrefs)
                probs = self.policy(self.states.cuda())
                m = Categorical(probs)
                actions = m.sample()
                self.saved_log_probs.append(m.log_prob(actions).t()[0])
                future_actions = self.future_actions
                self.future_actions = torch.futures.Future()
                future_actions.set_result(actions.cpu())
        return future_action

现在我们来定义不同的 RPC 函数是如何组合在一起的。Agent 控制每个 episode 的执行。它首先使用 rpc_async 在所有 observer 上启动 episode,并阻塞在返回的 future 上,这些 future 将填充 observer 奖励。请注意,下面的代码使用 RRef 助手 ob_rref.rpc_async()ob_rref RRef 的所有者上使用提供的参数启动 run_episode 函数。然后它将保存的动作概率和返回的 observer 奖励转换为期望的数据格式,并启动训练步骤。最后,它重置所有状态并返回当前 episode 的奖励。此函数是运行一个 episode 的入口点。

class Agent:
    ...

    def run_episode(self, n_steps=0):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(ob_rref.rpc_async().run_episode(self.agent_rref, n_steps))

        # wait until all obervers have finished this episode
        rets = torch.futures.wait_all(futs)
        rewards = torch.stack([ret[0] for ret in rets]).cuda().t()
        ep_rewards = sum([ret[1] for ret in rets]) / len(rets)

        # stack saved probs into one tensor
        if self.batch:
            probs = torch.stack(self.saved_log_probs)
        else:
            probs = [torch.stack(self.saved_log_probs[i]) for i in range(len(rets))]
            probs = torch.stack(probs)

        policy_loss = -probs * rewards / len(rets)
        policy_loss.sum().backward()
        self.optimizer.step()
        self.optimizer.zero_grad()

        # reset variables
        self.saved_log_probs = [] if self.batch else {k:[] for k in range(len(self.ob_rrefs))}
        self.states = torch.zeros(len(self.ob_rrefs), 1, 4)

        # calculate running rewards
        self.running_reward = 0.5 * ep_rewards + 0.5 * self.running_reward
        return ep_rewards, self.running_reward

其余代码是正常的进程启动和日志记录,类似于其他 RPC 教程。在本教程中,所有 observer 都被动地等待来自 agent 的命令。请参考 examples 仓库获取完整实现。

def run_worker(rank, world_size, n_episode, batch, print_log=True):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank0 is the agent
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)

        agent = Agent(world_size, batch)
        for i_episode in range(n_episode):
            last_reward, running_reward = agent.run_episode(n_steps=NUM_STEPS)

            if print_log:
                print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                    i_episode, last_reward, running_reward))
    else:
        # other ranks are the observer
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # observers passively waiting for instructions from agents
    rpc.shutdown()


def main():
    for world_size in range(2, 12):
        delays = []
        for batch in [True, False]:
            tik = time.time()
            mp.spawn(
                run_worker,
                args=(world_size, args.num_episode, batch),
                nprocs=world_size,
                join=True
            )
            tok = time.time()
            delays.append(tok - tik)

        print(f"{world_size}, {delays[0]}, {delays[1]}")


if __name__ == '__main__':
    main()

批量 RPC 有助于将动作推理整合到更少的 CUDA 操作中,从而降低了分摊开销。上面的 main 函数使用不同数量的 observer(从 1 到 10),在批量和非批量模式下运行相同的代码。下图绘制了使用默认参数值在不同 world size 下的执行时间。结果证实了我们的预期,即批量处理有助于加速训练。

文档

查阅 PyTorch 的完整开发者文档

查看文档

教程

获取面向初学者和高级开发者的深度教程

查看教程

资源

查找开发资源并获取问题解答

查看资源