快捷方式

Rendezvous

在 Torch 分布式弹性环境中,我们使用“Rendezvous”来指代一种特定功能,它结合了**分布式同步**原语和**对等发现**。

Torch 分布式弹性使用它来收集训练作业的参与者(即节点),以便它们都就参与者列表和每个人的角色达成一致,并就训练何时可以开始/恢复做出一致的集体决定。

Torch 分布式弹性 Rendezvous 提供以下关键功能

屏障:

执行 Rendezvous 的节点将全部阻塞,直到 Rendezvous 被认为已完成 - 当至少 min 个节点加入 Rendezvous 屏障(对于同一个作业)时,就会发生这种情况。这也意味着屏障的大小不一定是固定的。

达到 min 个节点后,还会有一段额外的短时间等待 - 这是为了确保 Rendezvous 不会“过快”完成(这可能会排除大约在同一时间尝试加入的额外节点)。

如果在屏障处收集了 max 个节点,则 Rendezvous 会立即完成。

还有一个总体超时,如果从未达到 min 个节点,则会导致 Rendezvous 失败 - 这旨在成为一个简单的故障安全措施,以帮助释放部分分配的作业资源,以防资源管理器出现问题,并且应该被解释为不可重试的。

排他性:

一个简单的分布式屏障将不足够,因为我们还需要确保在任何给定时间只有一个节点组存在(对于给定作业)。换句话说,新的节点(即延迟加入)不应该能够为同一个作业形成一个独立的并行工作组。

Torch 分布式弹性 Rendezvous 确保如果一组节点已经完成了 Rendezvous(因此可能已经在训练),那么尝试进行 Rendezvous 的其他“延迟”节点只会宣布自己处于等待状态,并且必须等到之前完成的现有 Rendezvous 首先被销毁。

一致性:

当 Rendezvous 完成时,所有成员都将就作业成员资格和每个人的角色达成一致。该角色使用一个整数表示,称为等级,它介于 0 和世界大小之间。

请注意,等级是不稳定的,因为同一个节点可以在下一次(重新)Rendezvous 中被分配一个不同的等级。

容错性:

Torch 分布式弹性 Rendezvous 旨在容忍 Rendezvous 过程中的节点故障。如果一个进程在加入 Rendezvous 和 Rendezvous 完成之间崩溃(或失去网络连接等),那么对剩余的健康节点进行重新 Rendezvous 将会自动发生。

一个节点也可以在完成(或被其他节点观察到已完成)Rendezvous 后失败 - 此场景将由 Torch 分布式弹性 train_loop 处理(它也将触发重新 Rendezvous)。

共享键值存储:

当 Rendezvous 完成时,将创建并返回一个共享键值存储。此存储实现 torch.distributed.Store API(请参阅分布式通信文档)。

此存储仅由已完成的 Rendezvous 的成员共享。它旨在由 Torch 分布式弹性用于交换初始化作业控制和数据平面所需的信息。

等待的工作进程和 Rendezvous 关闭:

Torch 分布式弹性 Rendezvous 处理程序对象提供其他功能,这些功能从技术上来说不属于 Rendezvous 过程

  1. 查询有多少工作进程在屏障处延迟到达,哪些工作进程可以参与下一次 Rendezvous。

  2. 设置 Rendezvous 已关闭,以向所有节点发出信号,不要参与下一次 Rendezvous。

DynamicRendezvousHandler:

Torch 分布式弹性附带了 DynamicRendezvousHandler 类,它实现了上面描述的 Rendezvous 机制。它是一种与后端无关的类型,它期望在构造期间指定一个特定的 RendezvousBackend 实例。

Torch 分布式用户可以实现自己的后端类型,也可以使用 PyTorch 附带的以下实现之一

下面是一个描述 rendezvous 工作原理的状态图。

../_images/etcd_rdzv_diagram.png

注册表

class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)[source]

保存用于构建 RendezvousHandler 的参数。

参数
  • backend (str) – 用于处理 rendezvous 的后端名称。

  • endpoint (str) – rendezvous 的端点,通常为 <hostname>[:<port>] 形式。

  • run_id (str) – rendezvous 的 id。

  • min_nodes (int) – 加入 rendezvous 的节点数的最小值。

  • max_nodes (int) – 加入 rendezvous 的节点数的最大值。

  • local_addr (Optional[str]) – 本地节点的地址。

  • **kwargs – 指定后端的其他参数。

get(key, default=None)[source]

如果 key 存在,则返回 key 的值,否则返回 default

返回类型

Any

get_as_bool(key, default=None)[source]

bool 形式返回 key 的值。

返回类型

Optional[bool]

get_as_int(key, default=None)[source]

int 形式返回 key 的值。

返回类型

Optional[int]

class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry[source]

表示 RendezvousHandler 后端的注册表。

处理程序

class torch.distributed.elastic.rendezvous.RendezvousHandler[source]

主要的 rendezvous 接口。

注意

分布式 Torch 用户通常 **不需要** 实现自己的 RendezvousHandler。基于 C10d Store 的实现已经提供,并且推荐大多数用户使用。

abstract get_backend()[source]

返回 rendezvous 后端的名称。

返回类型

str

abstract get_run_id()[source]

返回 rendezvous 的运行 ID。

运行 ID 是用户定义的 ID,用于唯一标识分布式应用程序的实例。它通常映射到作业 ID,并用于允许节点加入正确的分布式应用程序。

返回类型

str

abstract is_closed()[source]

检查 rendezvous 是否已关闭。

关闭的 rendezvous 意味着所有未来在同一作业内重新进行 rendezvous 的尝试都将失败。

is_closed()set_closed() 具有最终传播的语义,不应用于同步。意图是,如果至少一个节点决定作业已完成,它将关闭 rendezvous,并且其他节点很快会观察到这一点并停止运行。

返回类型

bool

abstract next_rendezvous()[source]

进入 rendezvous 障碍的主要入口。

阻塞直到 rendezvous 完成且当前进程包含在已形成的 worker 组中,或者超时发生,或者 rendezvous 被标记为已关闭。

返回

的实例 RendezvousInfo.

引发
返回类型

RendezvousInfo

abstract num_nodes_waiting()[source]

返回在 rendezvous 障碍到达迟到的节点数量,因此未包含在当前 worker 组中。

调用者应定期调用此方法以检查是否有新的节点正在等待加入作业,如果有,则通过调用 next_rendezvous() (重新 rendezvous) 承认它们。

返回类型

int

abstract set_closed()[source]

将 rendezvous 标记为已关闭。

abstract shutdown()[source]

关闭与 rendezvous 相关的所有资源。

示例

rdzv_handler = ...
try:
    store, rank, world_size = rdzv_handler.next_rendezvous()
finally:
    rdzv_handler.shutdown()
返回类型

bool

property use_agent_store: bool

指示由 next_rendezvous() 返回的存储引用可以与用户应用程序共享,并在应用程序生命周期中可用。

Rendezvous 处理器实现将以 RendezvousStoreInfo 实例的形式共享存储详细信息。作为约定,应用程序使用 MASTER_ADDR/MASTER_PORT 环境变量来查找存储。

数据类

class torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)[source]

保存有关 rendezvous 的信息。

class torch.distributed.elastic.rendezvous.api.RendezvousStoreInfo(master_addr, master_port)[source]

用于启动训练器分布式通信的存储地址和端口

static build(rank, store)[source]

工厂方法,在 rank0 主机上查找未使用的端口,并获取所有 rank 的地址/端口信息。

如果知道 master_addr/master_port(在共享现有 tcp 存储服务器时有用),则使用构造函数。

参数
  • rank (int) – 当前节点的 rank

  • store (Store) – 用于 rendezvous 的存储

  • local_addr (Optional[str]) – 当前节点的地址,如果没有提供,将从主机名解析

返回类型

RendezvousStoreInfo

异常

class torch.distributed.elastic.rendezvous.api.RendezvousError[source]

表示 rendezvous 错误的基本类型。

class torch.distributed.elastic.rendezvous.api.RendezvousClosedError[source]

当 rendezvous 关闭时引发。

class torch.distributed.elastic.rendezvous.api.RendezvousTimeoutError[source]

当 rendezvous 未能在指定时间内完成时引发。

class torch.distributed.elastic.rendezvous.api.RendezvousConnectionError[source]

当与 rendezvous 后端连接失败时引发。

class torch.distributed.elastic.rendezvous.api.RendezvousStateError[source]

当 rendezvous 的状态损坏时引发。

class torch.distributed.elastic.rendezvous.api.RendezvousGracefulExitError[source]

当节点未包含在 rendezvous 中并优雅地退出时引发。

异常是一种退出堆栈的机制,但这并不意味着失败。

实现

动态 Rendezvous

torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)[source]

从指定的参数创建新的 DynamicRendezvousHandler

参数
  • store (Store) – 作为 rendezvous 部分返回的 C10d 存储。

  • backend (RendezvousBackend) – 用于保存 rendezvous 状态的后端。

返回类型

DynamicRendezvousHandler

参数

描述

join_timeout

预计 rendezvous 完成的总时间,以秒为单位。默认值为 600 秒。

last_call_timeout

在达到最少节点数后,在完成 rendezvous 之前的额外等待时间,以秒为单位。默认值为 30 秒。

close_timeout

在调用 RendezvousHandler.set_closed()RendezvousHandler.shutdown() 之后,预计 rendezvous 关闭的时间,以秒为单位。默认值为 30 秒。

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler[source]

表示在节点集中建立 rendezvous 的处理程序。

classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None)[source]

创建一个新的 DynamicRendezvousHandler

参数
  • run_id (str) – rendezvous 的运行 ID。

  • store (Store) – 作为 rendezvous 部分返回的 C10d 存储。

  • backend (RendezvousBackend) – 用于保存 rendezvous 状态的后端。

  • min_nodes (int) – 加入 rendezvous 的节点数的最小值。

  • max_nodes (int) – 加入 rendezvous 的节点数的最大值。

  • local_addr (Optional[str]) – 本地节点地址。

  • timeout (Optional[RendezvousTimeout]) – rendezvous 的超时配置。

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend[source]

表示保存 rendezvous 状态的后端。

abstract get_state()[source]

获取 Rendezvous 状态。

返回

返回一个元组,包含编码后的 Rendezvous 状态和其栅栏令牌,如果后端没有找到状态,则返回 None

引发
返回类型

Optional[Tuple[bytes, Any]]

abstract property name: str

获取后端名称。

abstract set_state(state, token=None)[source]

设置 Rendezvous 状态。

新的 Rendezvous 状态将被有条件地设置

  • 如果指定的 token 与后端中存储的栅栏令牌匹配,状态将被更新。新的状态将与它的栅栏令牌一起返回给调用者。

  • 如果指定的 token 与后端中存储的栅栏令牌不匹配,状态将不会更新;相反,现有的状态及其栅栏令牌将被返回给调用者。

  • 如果指定的 tokenNone,新的状态将只在后端中没有现有状态的情况下设置。新的状态或现有的状态及其栅栏令牌将被返回给调用者。

参数
  • state (bytes) – 编码后的 Rendezvous 状态。

  • token (Optional[Any]) – 一个可选的栅栏令牌,它是由之前的 get_state()set_state() 调用获取的。

返回

返回一个元组,包含序列化后的 Rendezvous 状态、其栅栏令牌,以及一个布尔值,指示我们的设置尝试是否成功。

引发
返回类型

Optional[Tuple[bytes, Any, bool]]

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)[source]

保存 Rendezvous 的超时配置。

参数
  • join (Optional[timedelta]) – 预计 Rendezvous 完成的时间。

  • last_call (Optional[timedelta]) – Rendezvous 达到最少所需参与者数量后,额外的等待时间,在 Rendezvous 完成前。

  • close (Optional[timedelta]) – 调用 RendezvousHandler.set_closed()RendezvousHandler.shutdown() 后,预计 Rendezvous 关闭的时间。

  • keep_alive – 预计保活心跳完成的时间。

property close: timedelta

获取关闭超时。

property heartbeat: timedelta

获取保活心跳超时。

property join: timedelta

获取加入超时。

property last_call: timedelta

获取最后调用超时。

C10d 后端

torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)[source]

从指定的参数创建一个新的 C10dRendezvousBackend

参数

描述

store_type

C10d 存储的类型。当前支持的类型是 “tcp” 和 “file”,分别对应于 torch.distributed.TCPStoretorch.distributed.FileStore。默认为 “tcp”。

read_timeout

存储操作的读取超时时间(以秒为单位)。默认为 60 秒。

注意,这仅适用于 torch.distributed.TCPStore。它与 torch.distributed.FileStore 不相关,因为 torch.distributed.FileStore 不接受超时作为参数。

is_host

一个布尔值,指示此后端实例是否将托管 C10d 存储。如果未指定,它将通过将此机器的主机名或 IP 地址与指定的 Rendezvous 端点进行匹配来进行启发式推断。默认为 None

注意,此配置选项仅适用于 torch.distributed.TCPStore。在正常情况下,您可以安全地跳过它;它仅在无法正确确定其值时(例如,Rendezvous 端点使用 CNAME 作为主机名或与机器的 FQDN 不匹配)才需要。

返回类型

Tuple[C10dRendezvousBackend, Store]

class torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store, run_id)[source]

表示一个 C10d 支持的 Rendezvous 后端。

参数
get_state()[source]

参见基类。

返回类型

Optional[Tuple[bytes, Any]]

property name: str

参见基类。

set_state(state, token=None)[source]

参见基类。

返回类型

Optional[Tuple[bytes, Any, bool]]

Etcd 后端

torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)[source]

从指定的参数创建新的 EtcdRendezvousBackend

参数

描述

read_timeout

etcd 操作的读取超时时间(以秒为单位)。默认值为 60 秒。

协议

与 etcd 通信时使用的协议。有效值为“http”和“https”。默认值为“http”。

ssl_cert

与 HTTPS 一起使用的 SSL 客户端证书的路径。默认值为 None

ssl_cert_key

与 HTTPS 一起使用的 SSL 客户端证书的私钥的路径。默认值为 None

ca_cert

root SSL 权威证书的路径。默认值为 None

返回类型

Tuple[EtcdRendezvousBackend, Store]

class torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.EtcdRendezvousBackend(client, run_id, key_prefix=None, ttl=None)[source]

表示基于 etcd 的 rendezvous 后端。

参数
  • client (Client) – 用于与 etcd 通信的 etcd.Client 实例。

  • run_id (str) – rendezvous 的运行 ID。

  • key_prefix (Optional[str]) – 在 etcd 中存储 rendezvous 状态的路径。

  • ttl (Optional[int]) – rendezvous 状态的 TTL。如果未指定,则默认为 2 小时。

get_state()[source]

参见基类。

返回类型

Optional[Tuple[bytes, Any]]

property name: str

参见基类。

set_state(state, token=None)[source]

参见基类。

返回类型

Optional[Tuple[bytes, Any, bool]]

Etcd Rendezvous(遗留)

警告

DynamicRendezvousHandler 类取代了 EtcdRendezvousHandler 类,并且建议大多数用户使用。 EtcdRendezvousHandler 处于维护模式,将在未来弃用。

class torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl, local_addr)[source]

通过 torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous 实现 torch.distributed.elastic.rendezvous.RendezvousHandler 接口。 EtcdRendezvousHandler 使用 URL 来配置要使用的 rendezvous 类型,并将特定于实现的配置传递给 rendezvous 模块。基本的 etcd rendezvous 配置 URL 看起来像下面这样

etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>  # noqa: W605

-- example --

etcd://localhost:2379/1234?min_workers=1&max_workers=3

上面的 URL 解释如下

  1. 使用注册到 etcd 方案的 rendezvous 处理程序

  2. 要使用的 etcd 端点为 localhost:2379

  3. job_id == 1234 用作 etcd 中的前缀(这允许人们为多个作业共享一个公共 etcd 服务器,只要 job_ids 保证是唯一的)。请注意,只要作业 ID 是唯一的,它可以是任何字符串(例如,不需要是数字)。

  4. min_workers=1max_workers=3 指定了成员资格大小的范围 - Torch Distributed Elastic 只要集群大小大于或等于 min_workers 就会开始运行作业,并将最多 max_workers 加入到集群中。

以下是可传递给 etcd rendezvous 的参数的完整列表

参数

描述

min_workers

rendezvous 有效的最小工作程序数量

max_workers

要允许加入的最大工作程序数量

timeout

预期 next_rendezvous 成功的时间总超时时间(默认 600 秒)

last_call_timeout

达到最小工作程序数量后,额外的等待量(“最后一次调用”)(默认为 30 秒)

etcd_prefix

路径前缀(从 etcd 根目录开始),所有 etcd 节点将在该路径前缀内创建(默认为 /torchelastic/p2p

Etcd 存储

EtcdStore 是当 etcd 用作 rendezvous 后端时,由 next_rendezvous() 返回的 C10d Store 实例类型。

class torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)[source]

通过搭载 rendezvous etcd 实例来实现 c10 Store 接口。

这是由 EtcdRendezvous 返回的存储对象。

add(key, num)[source]

以原子方式将值增加一个整数。

整数以基数 10 的字符串形式表示。如果 key 不存在,则将假定默认值为 0

返回

新的(增量的)值

返回类型

int

check(keys)[source]

检查所有 key 是否立即存在(不等待)。

返回类型

bool

get(key)[source]

通过 key 获取值,可能进行阻塞等待。

如果 key 不立即存在,将进行阻塞等待,最多持续 timeout 时间,或者直到 key 发布为止。

返回

(bytes)

引发

LookupError - 如果超时后 key 仍未发布

返回类型

字节

set(key, value)[source]

将键值对写入 EtcdStore

key 和 value 都可以是 Python strbytes

wait(keys, override_timeout=None)[source]

等待所有键发布,或超时。

引发

LookupError - 如果超时发生

Etcd 服务器

EtcdServer 类是一个便捷类,它使您能够轻松地在子进程上启动和停止 etcd 服务器。这对于测试或单节点(多工作进程)部署非常有用,在这些部署中,手动在旁边设置 etcd 服务器会很麻烦。

警告

对于生产环境和多节点部署,请考虑正确部署一个高可用性 etcd 服务器,因为它是您分布式作业的单点故障。

class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)[source]

注意

在 etcd 服务器 v3.4.3 上测试。

在随机空闲端口上启动和停止本地独立 etcd 服务器。对于单节点、多工作进程启动或测试很有用,在这些情况下,与单独设置 etcd 服务器相比,侧边车 etcd 服务器更方便。

此类注册一个终止处理程序,以便在退出时关闭 etcd 子进程。此终止处理程序不是调用 stop() 方法的替代品。

以下回退机制用于查找 etcd 二进制文件

  1. 使用环境变量 TORCHELASTIC_ETCD_BINARY_PATH

  2. 使用 <this file root>/bin/etcd(如果存在)

  3. 使用 etcd 来自 PATH

用法

server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
server.start()
client = server.get_client()
# use client
server.stop()
参数

etcd_binary_path – etcd 服务器二进制文件的路径(有关回退路径,请参阅上面)

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源