Rendezvous¶
在 Torch 分布式弹性中,我们使用术语 *rendezvous* 来指代一种将 **分布式同步** 原语与 **对等发现** 相结合的特定功能。
Torch 分布式弹性使用它来收集训练作业的参与者(即节点),使它们都同意相同的参与者列表和每个人的角色,并就何时开始/恢复训练做出一致的集体决定。
Torch 分布式弹性 rendezvous 提供以下关键功能
屏障:
执行 rendezvous 的节点将一直阻塞,直到 rendezvous 被认为完成 - 这是当至少 min
个节点加入 rendezvous 屏障(对于同一个作业)时发生的。这也意味着屏障不一定是固定大小的。
达到 min
个节点后,还会有一个额外的短暂等待时间 - 这是为了确保 rendezvous 不会“过快”完成(这可能会排除大约在同一时间尝试加入的额外节点)。
如果在屏障处收集了 max
个节点,则 rendezvous 会立即完成。
还存在一个总体超时时间,如果从未达到 min
个节点,则会导致 rendezvous 失败 - 这是一种简单的故障保护机制,有助于释放部分分配的作业资源,以防资源管理器出现问题,并且应该被解释为不可重试的错误。
排他性:
简单的分布式屏障是不够的,因为我们还需要确保在任何给定时间(对于给定作业)只有一个节点组存在。换句话说,新的节点(即迟到的节点)不应该能够为同一作业形成一个并行的独立工作程序组。
Torch 分布式弹性 rendezvous 确保如果一组节点已经完成了 rendezvous(因此可能已经在训练),那么尝试进行 rendezvous 的额外“迟到”节点只会宣布它们正在等待,并且必须等待先前完成的现有 rendezvous 销毁后才能继续。
一致性:
当 rendezvous 完成时,所有成员将就作业成员资格和每个人在其中的角色达成一致。该角色使用一个称为秩的整数来表示,该整数介于 0 和世界大小之间。
请注意,秩 *不稳定*,因为同一个节点可以在下一次(重新)rendezvous 中被分配不同的秩。
容错性:
Torch 分布式弹性 rendezvous 旨在容忍 rendezvous 过程中的节点故障。如果某个进程在加入 rendezvous 和完成 rendezvous 之间崩溃(或失去网络连接等),则会自动与剩余的健康节点进行重新 rendezvous。
节点也可以在完成 rendezvous 之后失败(或被其他节点观察到已经完成 rendezvous) - 这种情况将由 Torch 分布式弹性 train_loop
处理(它也会触发重新 rendezvous)。
共享键值存储:
当 rendezvous 完成时,会创建一个共享的键值存储并返回。此存储实现 torch.distributed.Store
API(请参阅 分布式通信文档)。
此存储仅由完成 rendezvous 的成员共享。它旨在由 Torch 分布式弹性用来交换初始化作业控制和数据平面所需的信息。
等待中的工作程序和 rendezvous 关闭:
Torch 分布式弹性 rendezvous 处理程序对象提供了一些额外的功能,从技术上讲,这些功能不属于 rendezvous 过程
查询有多少工作程序迟到了屏障,谁可以参与 *下次* rendezvous。
将 rendezvous 设置为 *关闭*,以向所有节点发出信号,表示它们不应参与下次 rendezvous。
DynamicRendezvousHandler:
Torch 分布式弹性附带了 DynamicRendezvousHandler
类,它实现了上面描述的 rendezvous 机制。它是一种与后端无关的类型,它期望在构造期间指定特定的 RendezvousBackend
实例。
Torch 分布式用户可以实现自己的后端类型,也可以使用 PyTorch 附带的以下实现之一
C10dRendezvousBackend
: 使用 C10d 存储(默认情况下为TCPStore
)作为 rendezvous 后端。使用 C10d 存储的主要优点是它不需要任何第三方依赖项(例如 etcd)来建立 rendezvous。EtcdRendezvousBackend
: 替代了旧的EtcdRendezvousHandler
类。将EtcdRendezvousBackend
实例传递给DynamicRendezvousHandler
在功能上等同于实例化一个EtcdRendezvousHandler
.store = TCPStore("localhost") backend = C10dRendezvousBackend(store, "my_run_id") rdzv_handler = DynamicRendezvousHandler.from_backend( run_id="my_run_id", store=store, backend=backend, min_nodes=2, max_nodes=4 )
以下是描述rendezvous工作原理的状态图。

注册表¶
- class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)[source]¶
保存构建
RendezvousHandler
的参数。- 参数
- class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[source]¶
表示
RendezvousHandler
后端的注册表。
处理程序¶
- class torch.distributed.elastic.rendezvous.RendezvousHandler[source]¶
主要的 rendezvous 接口。
注意
分布式 Torch 用户通常**不需要**实现自己的
RendezvousHandler
。基于 C10d Store 的实现已经提供,建议大多数用户使用。- abstract get_run_id()[source]¶
返回 rendezvous 的运行 ID。
运行 ID 是用户定义的 ID,用于唯一标识分布式应用程序的实例。它通常映射到作业 ID,并用于允许节点加入正确的分布式应用程序。
- 返回类型
- abstract is_closed()[source]¶
检查 rendezvous 是否已关闭。
关闭的 rendezvous 表示所有未来尝试在同一作业中重新 rendezvous 都会失败。
is_closed()
和set_closed()
具有最终传播的语义,不应用于同步。意图是如果至少有一个节点决定作业已完成,它将关闭 rendezvous,其他节点很快就会观察到这一点,并停止运行。- 返回类型
- abstract next_rendezvous()[source]¶
rendezvous 障碍的主要入口点。
阻塞直到 rendezvous 完成并且当前进程包含在形成的工作组中,或者超时发生,或者 rendezvous 被标记为关闭。
- 返回值
的实例
RendezvousInfo
.- 引发
RendezvousClosedError – rendezvous 已关闭。
RendezvousConnectionError – 与 rendezvous 后端的连接已失败。
RendezvousStateError – rendezvous 状态已损坏。
RendezvousTimeoutError – rendezvous 未及时完成。
- 返回类型
- abstract num_nodes_waiting()[source]¶
返回到达 rendezvous 障碍过晚的节点数量,因此未包含在当前工作组中。
调用者应定期调用此方法以检查是否有新节点正在等待加入作业,如果有,则通过调用
next_rendezvous()
(重新 rendezvous)来接纳它们。- 返回类型
- abstract shutdown()[source]¶
关闭所有为 rendezvous 打开的资源。
示例
rdzv_handler = ... try: store, rank, world_size = rdzv_handler.next_rendezvous() finally: rdzv_handler.shutdown()
- 返回类型
- property use_agent_store: bool¶
指示由
next_rendezvous()
返回的存储引用可以与用户应用程序共享,并在应用程序生命周期内可用。Rendezous 处理器实现将共享存储详细信息作为
RendezvousStoreInfo
的实例。应用程序按照约定使用 MASTER_ADDR/MASTER_PORT 环境变量来查找存储。
数据类¶
- class torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)[source]¶
保存关于 rendezvous 的信息。
异常¶
- class torch.distributed.elastic.rendezvous.api.RendezvousTimeoutError[source]¶
当 rendezvous 未能在规定时间内完成时引发。
- class torch.distributed.elastic.rendezvous.api.RendezvousConnectionError[source]¶
当与 rendezvous 后端的连接失败时引发。
实现¶
动态 Rendezvous¶
- torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[source]¶
从指定参数创建一个新的
DynamicRendezvousHandler
。- 参数
store (Store) – 要作为 rendezvous 部分返回的 C10d 存储。
backend (RendezvousBackend) – 用于保存 rendezvous 状态的后端。
- 返回类型
参数
描述
join_timeout
rendezvous 预计完成的总时间(以秒为单位)。默认为 600 秒。
last_call_timeout
在达到最小节点数后,在完成 rendezvous 之前等待的额外时间量(以秒为单位)。默认为 30 秒。
close_timeout
在调用
RendezvousHandler.set_closed()
或RendezvousHandler.shutdown()
之后,rendezvous 预计关闭的时间(以秒为单位)。默认为 30 秒。
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[source]¶
表示在节点集之间建立 rendezvous 的处理程序。
- classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None)[source]¶
创建一个新的
DynamicRendezvousHandler
。- 参数
run_id (str) – rendezvous 的运行 ID。
store (Store) – 要作为 rendezvous 部分返回的 C10d 存储。
backend (RendezvousBackend) – 用于保存 rendezvous 状态的后端。
min_nodes (int) – 允许加入rendezvous的最小节点数。
max_nodes (int) – 允许加入rendezvous的最大节点数。
timeout (Optional[RendezvousTimeout]) – rendezvous 的超时配置。
- class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[source]¶
表示保存 rendezvous 状态的后端。
- abstract get_state()[source]¶
获取 rendezvous 状态。
- 返回值
编码 rendezvous 状态及其围栏标记的元组,如果后端中没有状态,则为
None
。- 引发
RendezvousConnectionError – 与后端的连接失败。
RendezvousStateError – rendezvous 状态已损坏。
- 返回类型
- 抽象 set_state(state, token=None)[源代码]¶
设置会合状态。
新的会合状态将有条件地设置
如果指定的
token
与后端存储的围栏令牌匹配,则状态将更新。新的状态将连同其围栏令牌一起返回给调用者。如果指定的
token
与后端存储的围栏令牌不匹配,则状态不会更新;相反,现有的状态及其围栏令牌将返回给调用者。如果指定的
token
为None
,则仅当后端不存在现有状态时才会设置新状态。新的状态或现有状态及其围栏令牌将返回给调用者。
- 参数
state (字节) – 编码后的会合状态。
token (可选[任何]) – 一个可选的围栏令牌,它是在先前的对
get_state()
或set_state()
的调用中检索到的。
- 返回值
序列化后的会合状态、其围栏令牌以及一个布尔值元组,表示我们的设置尝试是否成功。
- 引发
RendezvousConnectionError – 与后端的连接失败。
RendezvousStateError – rendezvous 状态已损坏。
- 返回类型
- 类 torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[源代码]¶
保存会合的超时配置。
- 参数
C10d 后端¶
- torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[源代码]¶
从指定的参数创建一个新的
C10dRendezvousBackend
。参数
描述
store_type
C10d 存储的类型。当前支持的类型是“tcp”和“file”,分别对应于
torch.distributed.TCPStore
和torch.distributed.FileStore
。默认值为“tcp”。read_timeout
存储操作的读取超时时间(以秒为单位)。默认值为 60 秒。
请注意,这仅适用于
torch.distributed.TCPStore
。它与torch.distributed.FileStore
不相关,因为torch.distributed.FileStore
不会将超时时间作为参数。is_host
一个布尔值,表示此后端实例是否将托管 C10d 存储。如果未指定,它将通过将此机器的主机名或 IP 地址与指定的会合端点进行匹配,以启发式方式推断。默认值为
None
。请注意,此配置选项仅适用于
torch.distributed.TCPStore
。在正常情况下,您可以安全地跳过它;仅当无法正确确定其值时才需要它(例如,会合端点的主机名是 CNAME 或与机器的 FQDN 不匹配)。- 返回类型
- 类 torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store, run_id)[源代码]¶
表示一个 C10d 支持的会合后端。
- 参数
store (存储) – 用于与 C10d 存储通信的
torch.distributed.Store
实例。run_id (str) – rendezvous 的运行 ID。
Etcd 后端¶
- torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[源代码]¶
从指定参数创建一个新的
EtcdRendezvousBackend
。参数
描述
read_timeout
etcd 操作的读取超时时间,以秒为单位。默认值为 60 秒。
协议
用于与 etcd 通信的协议。有效值为“http”和“https”。默认值为“http”。
ssl_cert
与 HTTPS 一起使用的 SSL 客户端证书的路径。默认值为
None
。ssl_cert_key
与 HTTPS 一起使用的 SSL 客户端证书的私钥路径。默认值为
None
。ca_cert
rool SSL 颁发机构证书的路径。默认值为
None
。- 返回类型
Etcd Rendezvous(遗留)¶
警告
The DynamicRendezvousHandler
class supersedes the EtcdRendezvousHandler
class, and is recommended for most users. EtcdRendezvousHandler
is in maintenance mode and will be deprecated in the future.
- 类 torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl)[源代码]¶
实现一个
torch.distributed.elastic.rendezvous.RendezvousHandler
接口,该接口由torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous
支持。EtcdRendezvousHandler
使用一个 URL 来配置要使用的 rendezvous 类型,并将特定于实现的配置传递给 rendezvous 模块。基本的 etcd rendezvous 配置 URL 如下所示etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers> # noqa: W605 -- example -- etcd://:2379/1234?min_workers=1&max_workers=3
上面的 URL 解释如下
使用已注册到
etcd
方案的 rendezvous 处理程序要使用的
etcd
端点是localhost:2379
job_id == 1234
用作 etcd 中的前缀(这允许人们为多个作业共享一个公共 etcd 服务器,只要job_ids
保证是唯一的)。请注意,作业 ID 可以是任何字符串(例如,不需要是数字),只要它是唯一的。min_workers=1
和max_workers=3
指定了成员资格大小的范围 - Torch Distributed Elastic 在集群大小大于或等于min_workers
时开始运行作业,并且最多允许max_workers
加入集群。
以下是可传递给 etcd rendezvous 的参数的完整列表
参数
描述
min_workers
rendezvous 有效的最小工作程序数量
max_workers
允许加入的最大工作程序数量
timeout
预期 next_rendezvous 成功完成的总超时时间(默认 600 秒)
last_call_timeout
达到最小工作程序数量后,额外的等待时间(“最后一次调用”)(默认为 30 秒)
etcd_prefix
路径前缀(从 etcd 根目录),所有 etcd 节点将在其中创建(默认值为
/torchelastic/p2p
)
Etcd Store¶
The EtcdStore
is the C10d Store
instance type returned by next_rendezvous()
when etcd is used as the rendezvous backend.
Etcd 服务器¶
EtcdServer
是一个方便的类,使您可以轻松地在子进程上启动和停止 etcd 服务器。这对于测试或单节点(多工作器)部署很有用,在这些部署中,手动在侧面设置 etcd 服务器很麻烦。
警告
对于生产和多节点部署,请考虑正确部署高可用 etcd 服务器,因为它是分布式作业的单点故障。
- class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[source]¶
注意
在 etcd 服务器 v3.4.3 上测试。
在随机空闲端口上启动和停止本地独立 etcd 服务器。适用于单节点、多工作器启动或测试,在这些情况下,与单独设置 etcd 服务器相比,使用 sidecar etcd 服务器更方便。
此类注册了一个终止处理程序,以便在退出时关闭 etcd 子进程。此终止处理程序不能替代调用
stop()
方法。使用以下回退机制来查找 etcd 二进制文件
使用环境变量 TORCHELASTIC_ETCD_BINARY_PATH
如果存在,则使用
<this file root>/bin/etcd
使用
etcd
从PATH
用法
server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd") server.start() client = server.get_client() # use client server.stop()
- 参数
etcd_binary_path – etcd 服务器二进制文件的路径(有关回退路径,请参见上文)