使用异步执行实现批量 RPC 处理¶
创建时间:2020 年 7 月 28 日 | 最后更新:2024 年 11 月 13 日 | 最后验证:未验证
作者:Shen Li
注意
在 github 中查看和编辑本教程。
前提条件
本教程演示了如何使用 @rpc.functions.async_execution 装饰器构建批量处理 RPC 应用程序,这有助于通过减少阻塞的 RPC 线程数量和整合 callee 上的 CUDA 操作来加速训练。这与 使用 TorchServe 进行批量推理 有着相同的思路。
注意
本教程需要 PyTorch v1.6.0 或更高版本。
基础知识¶
先前的教程展示了如何使用 torch.distributed.rpc 构建分布式训练应用程序,但没有详细说明在处理 RPC 请求时 callee 端会发生什么。在 PyTorch v1.5 中,每个 RPC 请求都会阻塞 callee 上的一个线程来执行该请求中的函数,直到该函数返回。这适用于许多用例,但有一个注意事项。如果用户函数因 IO(例如,嵌套 RPC 调用)或信号(例如,等待另一个 RPC 请求解除阻塞)而阻塞,callee 上的 RPC 线程将不得不空闲等待直到 IO 完成或信号事件发生。因此,RPC callee 可能会使用比实际需要更多的线程。这个问题的原因是 RPC 将用户函数视为黑盒,对函数内部发生的事情知之甚少。为了让用户函数能够让出并释放 RPC 线程,需要向 RPC 系统提供更多提示。
从 v1.6.0 开始,PyTorch 通过引入两个新概念解决了这个问题
一种封装异步执行的 torch.futures.Future 类型,它还支持安装回调函数。
一个 @rpc.functions.async_execution 装饰器,它允许应用程序告诉 callee 目标函数将返回一个 future,并且可以在执行期间多次暂停和让出。
借助这两个工具,应用程序代码可以将用户函数分解为多个更小的函数,将它们作为回调链式地连接到 Future
对象上,并返回包含最终结果的 Future
。在 callee 端,当获取到 Future
对象时,它也会将后续的 RPC 响应准备和通信作为回调安装上去,这些回调将在最终结果准备好时触发。通过这种方式,callee 不再需要阻塞一个线程并等待最终返回值就绪。请参阅 @rpc.functions.async_execution 的 API 文档获取简单示例。
除了减少 callee 上的空闲线程数量之外,这些工具还有助于使批量 RPC 处理更简单、更快。本教程的以下两部分演示了如何使用 @rpc.functions.async_execution 装饰器构建分布式批量更新参数服务器和批量处理强化学习应用程序。
批量更新参数服务器¶
考虑一个同步参数服务器训练应用程序,它有一个参数服务器 (PS) 和多个 trainer。在此应用程序中,PS 持有参数并等待所有 trainer 报告梯度。在每次迭代中,它都会等待接收到所有 trainer 的梯度,然后一次性更新所有参数。下面的代码展示了 PS 类的实现。update_and_fetch_model
方法使用 @rpc.functions.async_execution
进行装饰,并将由 trainer 调用。每次调用都返回一个 Future
对象,该对象将用更新后的模型填充。大多数 trainer 发起的调用只是将梯度累积到 .grad
字段,立即返回,并让出 PS 上的 RPC 线程。最后到达的 trainer 将触发优化器步骤并消耗所有先前报告的梯度。然后它用更新后的模型设置 future_model
,这反过来又通过 Future
对象通知来自其他 trainer 的所有先前请求,并将更新后的模型发送给所有 trainer。
import threading
import torchvision
import torch
import torch.distributed.rpc as rpc
from torch import optim
num_classes, batch_update_size = 30, 5
class BatchUpdateParameterServer(object):
def __init__(self, batch_update_size=batch_update_size):
self.model = torchvision.models.resnet50(num_classes=num_classes)
self.lock = threading.Lock()
self.future_model = torch.futures.Future()
self.batch_update_size = batch_update_size
self.curr_update_size = 0
self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
for p in self.model.parameters():
p.grad = torch.zeros_like(p)
def get_model(self):
return self.model
@staticmethod
@rpc.functions.async_execution
def update_and_fetch_model(ps_rref, grads):
# Using the RRef to retrieve the local PS instance
self = ps_rref.local_value()
with self.lock:
self.curr_update_size += 1
# accumulate gradients into .grad field
for p, g in zip(self.model.parameters(), grads):
p.grad += g
# Save the current future_model and return it to make sure the
# returned Future object holds the correct model even if another
# thread modifies future_model before this thread returns.
fut = self.future_model
if self.curr_update_size >= self.batch_update_size:
# update the model
for p in self.model.parameters():
p.grad /= self.batch_update_size
self.curr_update_size = 0
self.optimizer.step()
self.optimizer.zero_grad()
# by settiing the result on the Future object, all previous
# requests expecting this updated model will be notified and
# the their responses will be sent accordingly.
fut.set_result(self.model)
self.future_model = torch.futures.Future()
return fut
对于 trainer,它们都使用 PS 中的同一组参数进行初始化。在每次迭代中,每个 trainer 首先运行前向和后向传播以在本地生成梯度。然后,每个 trainer 使用 RPC 将其梯度报告给 PS,并通过同一 RPC 请求的返回值取回更新后的参数。在 trainer 的实现中,目标函数是否标记了 @rpc.functions.async_execution
没有区别。trainer 只需使用 rpc_sync
调用 update_and_fetch_model
,这将在 trainer 上阻塞,直到返回更新后的模型。
batch_size, image_w, image_h = 20, 64, 64
class Trainer(object):
def __init__(self, ps_rref):
self.ps_rref, self.loss_fn = ps_rref, torch.nn.MSELoss()
self.one_hot_indices = torch.LongTensor(batch_size) \
.random_(0, num_classes) \
.view(batch_size, 1)
def get_next_batch(self):
for _ in range(6):
inputs = torch.randn(batch_size, 3, image_w, image_h)
labels = torch.zeros(batch_size, num_classes) \
.scatter_(1, self.one_hot_indices, 1)
yield inputs.cuda(), labels.cuda()
def train(self):
name = rpc.get_worker_info().name
# get initial model parameters
m = self.ps_rref.rpc_sync().get_model().cuda()
# start training
for inputs, labels in self.get_next_batch():
self.loss_fn(m(inputs), labels).backward()
m = rpc.rpc_sync(
self.ps_rref.owner(),
BatchUpdateParameterServer.update_and_fetch_model,
args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]),
).cuda()
本教程跳过了启动多个进程的代码,请参考 examples 仓库获取完整实现。注意,不使用 @rpc.functions.async_execution 装饰器也可以实现批量处理。然而,这将需要在 PS 上阻塞更多 RPC 线程,或者使用另一轮 RPC 来获取更新后的模型,后一种方法会增加代码复杂性和通信开销。
本节使用一个简单的参数服务器训练示例来展示如何使用 @rpc.functions.async_execution 装饰器实现批量 RPC 应用程序。在下一节中,我们将使用批量处理重新实现先前 分布式 RPC 框架入门 教程中的强化学习示例,并演示其对训练速度的影响。
批量处理 CartPole Solver¶
本节使用 OpenAI Gym 中的 CartPole-v1 作为示例,展示批量处理 RPC 对性能的影响。请注意,由于目标是演示 @rpc.functions.async_execution 的用法,而不是构建最好的 CartPole solver 或解决大多数不同的 RL 问题,我们使用非常简单的策略和奖励计算方法,重点关注多 observer 单 agent 的批量 RPC 实现。我们使用与先前教程类似的 Policy
模型,如下所示。与之前的教程相比,区别在于其构造函数接受一个额外的 batch
参数,该参数控制 F.softmax
的 dim
参数,因为批量处理时,forward
函数中的 x
参数包含来自多个 observer 的状态,因此维度需要相应改变。其余部分保持不变。
import argparse
import torch.nn as nn
import torch.nn.functional as F
parser = argparse.ArgumentParser(description='PyTorch RPC Batch RL example')
parser.add_argument('--gamma', type=float, default=1.0, metavar='G',
help='discount factor (default: 1.0)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
help='random seed (default: 543)')
parser.add_argument('--num-episode', type=int, default=10, metavar='E',
help='number of episodes (default: 10)')
args = parser.parse_args()
torch.manual_seed(args.seed)
class Policy(nn.Module):
def __init__(self, batch=True):
super(Policy, self).__init__()
self.affine1 = nn.Linear(4, 128)
self.dropout = nn.Dropout(p=0.6)
self.affine2 = nn.Linear(128, 2)
self.dim = 2 if batch else 1
def forward(self, x):
x = self.affine1(x)
x = self.dropout(x)
x = F.relu(x)
action_scores = self.affine2(x)
return F.softmax(action_scores, dim=self.dim)
Observer
的构造函数也做了相应的调整。它还接受一个 batch
参数,该参数控制它使用哪个 Agent
函数来选择动作。在批量模式下,它会调用 Agent
上的 select_action_batch
函数(稍后会介绍),该函数将使用 @rpc.functions.async_execution 进行装饰。
import gym
import torch.distributed.rpc as rpc
class Observer:
def __init__(self, batch=True):
self.id = rpc.get_worker_info().id - 1
self.env = gym.make('CartPole-v1')
self.env.seed(args.seed)
self.select_action = Agent.select_action_batch if batch else Agent.select_action
与之前的教程 分布式 RPC 框架入门 相比,observer 的行为略有不同。它不是在环境停止时退出,而是在每个 episode 中始终运行 n_steps
次迭代。当环境返回时,observer 只需重置环境并重新开始。通过这种设计,agent 将从每个 observer 接收固定数量的状态,因此可以将它们打包到一个固定大小的张量中。在每一步中,Observer
使用 RPC 将其状态发送给 Agent
,并通过返回值获取动作。在每个 episode 结束时,它将所有步骤的奖励返回给 Agent
。请注意,这个 run_episode
函数将由 Agent
使用 RPC 调用。因此,此函数中的 rpc_sync
调用将是一个嵌套的 RPC 调用。我们也可以将此函数标记为 @rpc.functions.async_execution
,以避免阻塞 Observer
上的一个线程。然而,由于瓶颈在 Agent
而不是 Observer
,阻塞 Observer
进程上的一个线程应该是可以接受的。
import torch
class Observer:
...
def run_episode(self, agent_rref, n_steps):
state, ep_reward = self.env.reset(), NUM_STEPS
rewards = torch.zeros(n_steps)
start_step = 0
for step in range(n_steps):
state = torch.from_numpy(state).float().unsqueeze(0)
# send the state to the agent to get an action
action = rpc.rpc_sync(
agent_rref.owner(),
self.select_action,
args=(agent_rref, self.id, state)
)
# apply the action to the environment, and get the reward
state, reward, done, _ = self.env.step(action)
rewards[step] = reward
if done or step + 1 >= n_steps:
curr_rewards = rewards[start_step:(step + 1)]
R = 0
for i in range(curr_rewards.numel() -1, -1, -1):
R = curr_rewards[i] + args.gamma * R
curr_rewards[i] = R
state = self.env.reset()
if start_step == 0:
ep_reward = min(ep_reward, step - start_step + 1)
start_step = step + 1
return [rewards, ep_reward]
Agent
的构造函数也接受一个 batch
参数,该参数控制动作概率如何进行批量处理。在批量模式下,saved_log_probs
包含一个张量列表,其中每个张量包含所有 observer 在一步中的动作概率。在非批量处理模式下,saved_log_probs
是一个字典,其中键是 observer ID,值是该 observer 的动作概率列表。
import threading
from torch.distributed.rpc import RRef
class Agent:
def __init__(self, world_size, batch=True):
self.ob_rrefs = []
self.agent_rref = RRef(self)
self.rewards = {}
self.policy = Policy(batch).cuda()
self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
self.running_reward = 0
for ob_rank in range(1, world_size):
ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
self.ob_rrefs.append(rpc.remote(ob_info, Observer, args=(batch,)))
self.rewards[ob_info.id] = []
self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
self.batch = batch
self.saved_log_probs = [] if batch else {k:[] for k in range(len(self.ob_rrefs))}
self.future_actions = torch.futures.Future()
self.lock = threading.Lock()
self.pending_states = len(self.ob_rrefs)
非批量处理模式下的 select_acion
简单地通过策略运行状态,保存动作概率,然后立即将动作返回给 observer。
from torch.distributions import Categorical
class Agent:
...
@staticmethod
def select_action(agent_rref, ob_id, state):
self = agent_rref.local_value()
probs = self.policy(state.cuda())
m = Categorical(probs)
action = m.sample()
self.saved_log_probs[ob_id].append(m.log_prob(action))
return action.item()
启用批量处理后,状态存储在一个 2D 张量 self.states
中,使用 observer ID 作为行 ID。然后,它通过在批量生成的 self.future_actions
Future
对象上安装回调函数来链式生成一个 Future
,该 Future
将填充该 observer ID 索引到的特定行。最后到达的 observer 一次性将所有批量状态通过策略运行,并相应地设置 self.future_actions
。当这种情况发生时,安装在 self.future_actions
上的所有回调函数将被触发,它们的返回值将用于填充链式生成的 Future
对象,这反过来又通知 Agent
为来自其他 observer 的所有先前 RPC 请求准备和通信响应。
class Agent:
...
@staticmethod
@rpc.functions.async_execution
def select_action_batch(agent_rref, ob_id, state):
self = agent_rref.local_value()
self.states[ob_id].copy_(state)
future_action = self.future_actions.then(
lambda future_actions: future_actions.wait()[ob_id].item()
)
with self.lock:
self.pending_states -= 1
if self.pending_states == 0:
self.pending_states = len(self.ob_rrefs)
probs = self.policy(self.states.cuda())
m = Categorical(probs)
actions = m.sample()
self.saved_log_probs.append(m.log_prob(actions).t()[0])
future_actions = self.future_actions
self.future_actions = torch.futures.Future()
future_actions.set_result(actions.cpu())
return future_action
现在我们来定义不同的 RPC 函数是如何组合在一起的。Agent
控制每个 episode 的执行。它首先使用 rpc_async
在所有 observer 上启动 episode,并阻塞在返回的 future 上,这些 future 将填充 observer 奖励。请注意,下面的代码使用 RRef 助手 ob_rref.rpc_async()
在 ob_rref
RRef 的所有者上使用提供的参数启动 run_episode
函数。然后它将保存的动作概率和返回的 observer 奖励转换为期望的数据格式,并启动训练步骤。最后,它重置所有状态并返回当前 episode 的奖励。此函数是运行一个 episode 的入口点。
class Agent:
...
def run_episode(self, n_steps=0):
futs = []
for ob_rref in self.ob_rrefs:
# make async RPC to kick off an episode on all observers
futs.append(ob_rref.rpc_async().run_episode(self.agent_rref, n_steps))
# wait until all obervers have finished this episode
rets = torch.futures.wait_all(futs)
rewards = torch.stack([ret[0] for ret in rets]).cuda().t()
ep_rewards = sum([ret[1] for ret in rets]) / len(rets)
# stack saved probs into one tensor
if self.batch:
probs = torch.stack(self.saved_log_probs)
else:
probs = [torch.stack(self.saved_log_probs[i]) for i in range(len(rets))]
probs = torch.stack(probs)
policy_loss = -probs * rewards / len(rets)
policy_loss.sum().backward()
self.optimizer.step()
self.optimizer.zero_grad()
# reset variables
self.saved_log_probs = [] if self.batch else {k:[] for k in range(len(self.ob_rrefs))}
self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
# calculate running rewards
self.running_reward = 0.5 * ep_rewards + 0.5 * self.running_reward
return ep_rewards, self.running_reward
其余代码是正常的进程启动和日志记录,类似于其他 RPC 教程。在本教程中,所有 observer 都被动地等待来自 agent 的命令。请参考 examples 仓库获取完整实现。
def run_worker(rank, world_size, n_episode, batch, print_log=True):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 0:
# rank0 is the agent
rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)
agent = Agent(world_size, batch)
for i_episode in range(n_episode):
last_reward, running_reward = agent.run_episode(n_steps=NUM_STEPS)
if print_log:
print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
i_episode, last_reward, running_reward))
else:
# other ranks are the observer
rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
# observers passively waiting for instructions from agents
rpc.shutdown()
def main():
for world_size in range(2, 12):
delays = []
for batch in [True, False]:
tik = time.time()
mp.spawn(
run_worker,
args=(world_size, args.num_episode, batch),
nprocs=world_size,
join=True
)
tok = time.time()
delays.append(tok - tik)
print(f"{world_size}, {delays[0]}, {delays[1]}")
if __name__ == '__main__':
main()
批量 RPC 有助于将动作推理整合到更少的 CUDA 操作中,从而降低了分摊开销。上面的 main
函数使用不同数量的 observer(从 1 到 10),在批量和非批量模式下运行相同的代码。下图绘制了使用默认参数值在不同 world size 下的执行时间。结果证实了我们的预期,即批量处理有助于加速训练。
