快捷方式

远程引用协议

本说明介绍了远程引用协议的设计细节,并介绍了不同场景中的消息流。在继续之前,请确保你熟悉分布式 RPC 框架

背景

RRef 表示远程引用。它是位于本地或远程工作程序上的对象的引用,并且在底层透明地处理引用计数。从概念上讲,它可以被视为分布式共享指针。应用程序可以通过调用 remote() 来创建 RRef。每个 RRef 都归属于 remote() 调用的被调用者工作程序(即所有者),并且可以被多个用户使用。所有者存储真实数据并跟踪全局引用计数。每个 RRef 都可以通过全局 RRefId 唯一标识,该标识符是在 remote() 调用的调用者上创建时分配的。

在所有者工作程序上,只有一个 OwnerRRef 实例,其中包含真实数据,而在用户工作程序上,可以根据需要创建任意多个 UserRRefs,并且 UserRRef 不包含数据。所有者上的所有用法都将使用全局唯一的 RRefId 检索唯一的 OwnerRRef 实例。UserRRef 将在 rpc_sync()rpc_async()remote() 调用中用作参数或返回值时创建,并且会根据更新引用计数通知所有者。OwnerRRef 及其数据将在全局没有 UserRRef 实例且所有者上也没有对 OwnerRRef 的引用时删除。

假设

RRef 协议在以下假设下设计。

  • 瞬态网络故障:RRef 设计通过重试消息来处理瞬态网络故障。它无法处理节点崩溃或永久网络分区。当发生这些事件时,应用程序应关闭所有工作进程,还原到上一个检查点并恢复训练。

  • 非幂等 UDF:我们假设提供给 rpc_sync()rpc_async()remote() 的用户函数 (UDF) 不是幂等的,因此无法重试。但是,内部 RRef 控制消息是幂等的,并且在消息故障时重试。

  • 无序消息传递:我们不假设任何一对节点之间的消息传递顺序,因为发送方和接收方都在使用多个线程。无法保证哪条消息将首先处理。

RRef 生命周期

该协议的目标是在适当的时候删除 OwnerRRef。删除 OwnerRRef 的正确时间是没有活动 UserRRef 实例且用户代码也没有引用 OwnerRRef 的时候。棘手之处在于确定是否存在任何活动 UserRRef 实例。

设计原理

用户可以在三种情况下获取 UserRRef

  1. 从所有者接收 UserRRef

  2. 从另一个用户接收 UserRRef

  3. 创建由另一个工作进程拥有的新 UserRRef

案例 1 最简单,其中所有者将其 RRef 传递给用户,所有者调用 rpc_sync()rpc_async()remote(),并将其 RRef 用作参数。在这种情况下,将在用户上创建一个新的 UserRRef。由于所有者是调用者,因此它可以轻松地在 OwnerRRef 上更新其本地引用计数。

唯一的要求是任何 UserRRef 都必须在销毁时通知所有者。因此,我们需要第一个保证

G1. 在删除任何 UserRRef 时,将通知所有者。

由于消息可能会延迟或乱序到达,因此我们需要另一个保证来确保删除消息不会过早处理。如果 A 向 B 发送涉及 RRef 的消息,我们称 A 上的 RRef(父 RRef)和 B 上的 RRef(子 RRef)。

G2. 在所有者确认子 RRef 之前,不会删除父 RRef。

在案例 2 和 3 中,所有者可能只对 RRef 分叉图有部分了解,甚至完全不了解。例如,可以在用户上构造 RRef,并且在所有者收到任何 RPC 调用之前,创建者用户可能已与其他用户共享 RRef,而这些用户又可以进一步共享 RRef。一个不变式是任何 RRef 的分叉图始终是一棵树,因为分叉 RRef 始终在被调用者上创建一个新的 UserRRef 实例(除非被调用者是所有者),因此每个 RRef 都只有一个父级。

所有者对树中的任何 UserRRef 的看法有三个阶段

1) unknown -> 2) known -> 3) deleted.

所有者对整个树的看法不断变化。当所有者认为没有活动的 UserRRef 实例时,它会删除其 OwnerRRef 实例,即当 OwnerRRef 被删除时,所有 UserRRef 实例都可以被实际删除或未知。危险的情况是,当某些分叉未知而其他分叉已删除时。

G2 显然保证在所有者知道其所有子级 UserRRef 实例之前,不会删除任何父级 UserRRef。但是,在所有者知道其父级 UserRRef 之前,可能会删除子级 UserRRef

考虑以下示例,其中 OwnerRRef 分叉到 A,然后 A 分叉到 Y,Y 分叉到 Z

OwnerRRef -> A -> Y -> Z

如果所有者在 Y 的消息之前处理了 Z 的所有消息,包括删除消息。所有者将在知道 Y 存在之前了解 Z 的删除。不过,这不会造成任何问题。因为 Y 的至少一个祖先(A)将保持活动状态,并且它将阻止所有者删除 OwnerRRef。更具体地说,如果所有者不知道 Y,则由于 G2,无法删除 A,并且所有者知道 A,因为它就是 A 的父级。

如果 RRef 是在用户上创建的,事情会变得有点棘手

OwnerRRef
    ^
    |
    A -> Y -> Z

如果 Z 在 UserRRef 上调用 to_here(),则在 Z 被删除时,所有者至少知道 A,否则,to_here() 不会完成。如果 Z 不调用 to_here(),则所有者有可能在收到来自 A 和 Y 的任何消息之前收到来自 Z 的所有消息。在这种情况下,由于 OwnerRRef 的真实数据尚未创建,因此也没有要删除的内容。这与 Z 根本不存在相同。因此,仍然可以。

实现

G1 通过在 UserRRef 析构函数中发送删除消息来实现。为了提供 G2,每当分叉时,父级 UserRRef 都会被放入上下文中,其索引为新的 ForkId。仅当父级 UserRRef 从子级收到确认消息 (ACK) 时,才会从上下文中删除该父级,并且只有在所有者确认后,子级才会发送 ACK。

协议场景

现在,我们来讨论上述设计如何转化为四种场景中的协议。

用户与所有者共享 RRef 作为返回值

import torch
import torch.distributed.rpc as rpc

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rref.to_here()

在这种情况下,UserRRef 在用户工作进程 A 上创建,然后与远程消息一起传递给所有者工作进程 B,然后 B 创建 OwnerRRef。方法 remote() 立即返回,这意味着在所有者知道 UserRRef 之前,可以分叉/使用它。

在所有者收到 remote() 调用时,它将创建 OwnerRRef,并返回 ACK 以确认 {100, 1} (RRefIdForkId)。只有在收到此 ACK 后,A 才能删除其 UserRRef。这涉及 G1G2G1 很明显。对于 G2OwnerRRefUserRRef 的子级,并且在收到所有者的 ACK 之前,不会删除 UserRRef

user_to_owner_ret.png

上图显示了消息流,其中实线箭头包含用户函数,而虚线箭头是内置消息。请注意,从 A 到 B 的前两条消息(remote()to_here())可能会以任何顺序到达 B,但只有在

  • B 确认 UserRRef {100, 1} (G2),

  • Python GC 同意删除本地 UserRRef 实例。当 RRef 不再处于作用域中并且符合垃圾回收条件时,就会发生这种情况。

用户将 RRef 与所有者作为参数共享

import torch
import torch.distributed.rpc as rpc

# on worker A and worker B
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('B', func, args=(rref, ))

在这种情况下,在 A 上创建 UserRRef 之后,A 将其用作对 B 的后续 RPC 调用中的参数。A 将保持 UserRRef {100, 1} 存活,直到收到 B 的确认(G2,而不是 RPC 调用的返回值)。这是必要的,因为在收到所有先前消息之前,A 不应发送删除消息,否则,OwnerRRef 可能会在使用前被删除,因为我们不保证消息传递顺序。这是通过创建 RRef 的子 ForkId 来完成的,在映射中保存它们,直到收到所有者确认子 ForkId。下图显示了消息流。

user_to_owner_arg.png

请注意,UserRRef 可以在 func 完成甚至开始之前在 B 上被删除。但是,这是可以的,因为当 B 为子 ForkId 发送 ACK 时,它已经获取了 OwnerRRef 实例,这将防止它过早被删除。

所有者与用户共享 RRef

所有者到用户是最简单的情况,所有者可以在本地更新引用计数,并且不需要任何其他控制消息来通知其他人。关于 G2,它与父级立即从所有者处收到 ACK 相同,因为父级是所有者。

import torch
import torch.distributed.rpc as RRef, rpc

# on worker B and worker C
def func(rref):
  pass

# on worker B, creating a local RRef
rref = RRef("data")
# say the rref has RRefId 100
dist.rpc_async('C', func, args=(rref, ))
owner_to_user.png

上图显示了消息流。请注意,当 OwnerRRef 在 rpc_async 调用后退出作用域时,它不会被删除,因为内部有一个映射来保持它存活,如果存在任何已知的 fork,在这种情况下是 UserRRef {100, 1}。 (G2)

用户与用户共享 RRef

这是最复杂的情况,其中调用方用户(父级 UserRRef)、被调用方用户(子级 UserRRef)和所有者都需要参与。

import torch
import torch.distributed.rpc as rpc

# on worker A and worker C
def func(rref):
  pass

# on worker A
rref = rpc.remote('B', torch.add, args=(torch.ones(2), 1))
# say the rref has RRefId 100 and ForkId 1
rpc.rpc_async('C', func, args=(rref, ))
user_to_user.png

当 C 从 A 接收子级 UserRRef 时,它会向所有者 B 发送一个 fork 请求。稍后,当 B 在 C 上确认 UserRRef 时,C 将并行执行两个操作:1) 向 A 发送子级 ACK,2) 运行用户提供的函数。在此期间,父级 (A) 将保持其 UserRRef {100, 1} 处于活动状态以实现G2

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源