弹性代理¶
服务器¶
弹性代理是 torchelastic 的控制平面。
它是一个进程,用于启动和管理底层工作进程。代理负责
使用分布式 torch:工作进程在启动时会获得所有必要的信息,以便成功且轻松地调用
torch.distributed.init_process_group()
。容错:监控工作进程,并在检测到工作进程故障或不健康时,拆除所有工作进程并重新启动所有进程。
弹性:对成员资格更改做出反应,并使用新成员重新启动工作进程。
最简单的代理按节点部署,并与本地进程一起工作。更高级的代理可以远程启动和管理工作进程。代理可以完全分散,根据其管理的工作进程做出决策。或者可以协调,与管理同一作业中工作进程的其他代理通信以做出集体决策。
下面是管理本地工作进程组的代理的图表。

概念¶
本节介绍与理解 agent
在 torchelastic 中的作用相关的核心类和概念。
- class torch.distributed.elastic.agent.server.ElasticAgent[source]¶
负责管理一个或多个工作进程的代理进程。
假定工作进程是常规的分布式 PyTorch 脚本。当工作进程由代理创建时,代理会提供必要的信息,以便工作进程正确初始化 torch 进程组。
代理到工作进程的确切部署拓扑和比例取决于代理的特定实现以及用户的作业放置偏好。例如,要在 GPU 上运行具有 8 个训练器(每个 GPU 一个)的分布式训练作业,可以使用
8 x 单 GPU 实例,每个实例放置一个代理,每个代理管理 1 个工作进程。
4 x 双 GPU 实例,每个实例放置一个代理,每个代理管理 2 个工作进程。
2 x 四 GPU 实例,每个实例放置一个代理,每个代理管理 4 个工作进程。
1 x 8 GPU 实例,每个实例放置一个代理,每个代理管理 8 个工作进程。
用法
group_result = agent.run() if group_result.is_failed(): # workers failed failure = group_result.failures[0] logger.exception("worker 0 failed with exit code : %s", failure.exit_code) else: return group_result.return_values[0] # return rank 0's results
- class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=0.1, master_port=None, master_addr=None, local_addr=None)[source]¶
特定类型的 worker 的蓝图信息。
对于给定的角色,只能存在一个 worker spec。Worker spec 应该在所有节点(机器)上都是同质的,即每个节点运行相同数量的 worker 来执行特定的 spec。
- 参数
role (str) – 用户为使用此 spec 的 worker 定义的角色
local_world_size (int) – 要运行的本地 worker 数量
args (Tuple) – 传递给
entrypoint
的参数rdzv_handler (RendezvousHandler) – 处理此组 worker 的 rdzv
max_restarts (int) – worker 最大重试次数
monitor_interval (float) – 每
n
秒监控一次 worker 状态master_port (Optional[int]) – 如果未指定,则在 rank 0 上运行 c10d 存储的固定端口,否则将选择一个随机的空闲端口
master_addr (Optional[str]) – 如果未指定,则在 rank 0 上运行 c10d 存储的固定 master_addr,否则将选择 agent rank 0 上的主机名
redirects – 将 std 流重定向到文件,通过传递映射选择性地重定向特定本地等级。
tee – 将指定的 std 流(s)写入控制台 + 文件,通过传递映射选择性地为特定本地等级进行 tee,优先于
redirects
设置。
- class torch.distributed.elastic.agent.server.WorkerState(value)[source]¶
WorkerGroup
的状态。一个 worker group 中的 worker 作为一个单元更改状态。如果一个 worker group 中的一个 worker 失败,则整个集合被视为失败。
UNKNOWN - agent lost track of worker group state, unrecoverable INIT - worker group object created not yet started HEALTHY - workers running and healthy UNHEALTHY - workers running and unhealthy STOPPED - workers stopped (interrupted) by the agent SUCCEEDED - workers finished running (exit 0) FAILED - workers failed to successfully finish (exit !0)
一个 worker group 从初始的
INIT
状态开始,然后进展到HEALTHY
或UNHEALTHY
状态,最后到达一个最终的SUCCEEDED
或FAILED
状态。worker group 可以被中断并暂时由 agent 放入
STOPPED
状态。处于STOPPED
状态的 worker 将由 agent 在不久的将来安排重新启动。worker 被置于STOPPED
状态的一些例子是Worker group 失败 | 观察到不健康
检测到成员资格更改
当 worker group 上的操作(启动、停止、rdzv、重试等)失败,并导致操作仅部分应用于 worker group 时,状态将为
UNKNOWN
。通常,这发生在 agent 状态更改事件期间未捕获/未处理的异常。agent 预计不会恢复处于UNKNOWN
状态的 worker group,最好是自终止并让作业管理器重试节点。
- class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[source]¶
一个 worker 实例。
将此与表示 worker 规格的
WorkerSpec
进行对比。一个Worker
是从一个WorkerSpec
创建的。一个Worker
对于一个WorkerSpec
就像一个对象对于一个类一样。worker 的
id
由ElasticAgent
的特定实现进行解释。对于本地代理,它可以是 worker 的pid (int)
,对于远程代理,它可以被编码为host:port (string)
。
实现¶
以下是 torchelastic 提供的 agent 实现。
- class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[source]¶
torchelastic.agent.server.ElasticAgent
的一个实现,用于处理主机本地工作者。此代理在每个主机上部署,并配置为生成
n
个工作者。 使用 GPU 时,n
映射到主机上可用的 GPU 数量。本地代理不会与部署在其他主机上的其他本地代理通信,即使工作者可能进行主机间通信。 工作者 ID 被解释为本地进程。 代理作为一个单元启动和停止所有工作者进程。
传递给工作者函数的工作者函数和参数必须与 Python 多处理兼容。 要将多处理数据结构传递给工作者,您可以在与指定的
start_method
相同的多处理上下文中创建数据结构,并将其作为函数参数传递。exit_barrier_timeout
指定等待其他代理完成的时间量(以秒为单位)。 这充当安全网,用于处理工作者在不同时间完成的情况,以防止代理将过早完成的工作者视为缩减事件。 强烈建议用户代码处理确保以同步方式终止工作者,而不是依赖于 exit_barrier_timeout。如果环境变量
TORCHELASTIC_ENABLE_FILE_TIMER
在`LocalElasticAgent`
进程中定义并值为 1,则可以在`LocalElasticAgent`
中启用基于命名管道的看门狗。 可选地,可以使用另一个环境变量`TORCHELASTIC_TIMER_FILE`
设置命名管道的唯一文件名。 如果未设置环境变量`TORCHELASTIC_TIMER_FILE`
,`LocalElasticAgent`
将在内部创建一个唯一的文件名并将其设置为环境变量`TORCHELASTIC_TIMER_FILE`
,并且此环境变量将传播到工作者进程,以允许它们连接到`LocalElasticAgent`
使用的相同命名管道。日志将写入指定的日志目录。 默认情况下,每行日志的前缀为
[${role_name}${local_rank}]:
(例如[trainer0]: foobar
)。 可以通过将 模板字符串 作为log_line_prefix_template
参数传递来自定义日志前缀。 以下宏(标识符)在运行时替换:${role_name}, ${local_rank}, ${rank}
。 例如,要使用全局排名而不是本地排名来为每行日志添加前缀,请设置log_line_prefix_template = "[${rank}]:
。示例启动函数
def trainer(args) -> str: return "do train" def main(): start_method="spawn" shared_queue= multiprocessing.get_context(start_method).Queue() spec = WorkerSpec( role="trainer", local_world_size=nproc_per_process, entrypoint=trainer, args=("foobar",), ...<OTHER_PARAMS...>) agent = LocalElasticAgent(spec, start_method) results = agent.run() if results.is_failed(): print("trainer failed") else: print(f"rank 0 return value: {results.return_values[0]}") # prints -> rank 0 return value: do train
示例启动二进制文件
def main(): spec = WorkerSpec( role="trainer", local_world_size=nproc_per_process, entrypoint="/usr/local/bin/trainer", args=("--trainer-args", "foobar"), ...<OTHER_PARAMS...>) agent = LocalElasticAgent(spec) results = agent.run() if not results.is_failed(): print("binary launches do not have return values")
扩展代理¶
要扩展代理,您可以直接实现 `ElasticAgent`
,但是我们建议您扩展 SimpleElasticAgent
,它提供大多数脚手架,并为您留下一些需要实现的特定抽象方法。
- class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[source]¶
一个
ElasticAgent
,用于管理一种特定类型的工人角色。一个
ElasticAgent
,用于管理单个WorkerSpec
的工作者 (WorkerGroup
),例如一种特定类型的工人角色。- _assign_worker_ranks(store, group_rank, group_world_size, spec)[source]¶
确定工人进程的适当排名。
排名分配是根据以下算法进行的
每个代理将其配置(group_rank、group_world_size、num_workers)写入公共存储。
排名为 0 的代理从存储中读取所有 role_info,并确定每个代理的工作者排名。
确定全局排名:工作者的全局排名通过前面所有工作者的 local_world_size 的累积和计算得出。 出于效率原因,每个工作者都被分配了一个基本全局排名,因此其工作者位于 [base_global_rank, base_global_rank + local_world_size) 范围内。
确定角色排名:角色排名是使用点 3 中的算法确定的,区别在于排名是相对于角色名称计算的。
排名为 0 的代理将分配的排名写入存储。
每个代理从存储中读取分配的排名。
时间复杂度:每个工作者 O(1),排名 0 O(n),总体 O(n)
- _exit_barrier()[source]¶
定义一个屏障,使代理进程保持活动状态,直到所有工作者完成。
等待
exit_barrier_timeout
秒,直到所有代理完成执行其本地工作者(成功或不成功)。 这充当对用户脚本的保护措施,这些脚本在不同的时间终止。
- _initialize_workers(worker_group)[source]¶
为 worker_group 启动一组新的工作者。
本质上,这是一个集合,然后是一个
start_workers
。 调用者应首先调用_stop_workers()
来停止正在运行的工作者,然后再调用此方法。乐观地将刚启动的工作者组的状态设置为
HEALTHY
,并将实际状态监控委托给_monitor_workers()
方法
- abstract _shutdown(death_sig=Signals.SIGTERM, is_restart=False)[source]¶
清理代理工作期间分配的任何资源。
- 参数
death_sig (Signals) – 要发送到子进程的信号,SIGTERM 为默认值
- class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[source]¶
返回工作进程执行的结果。
运行结果遵循“全有或全无”的策略,即当且仅当此代理管理的所有本地工作进程都成功完成时,运行才算成功。
如果结果成功(例如
is_failed() = False
),则return_values
字段包含由 THIS 代理管理的工作进程的输出(返回值),按其全局排名映射。也就是说result.return_values[0]
是全局排名为 0 的工作进程的返回值。注意
return_values
仅对工作进程入口点是函数时有意义。指定为二进制入口点的工作进程没有规范的返回值,并且return_values
字段没有意义,可能为空。如果
is_failed()
返回True
,则failures
字段包含失败信息,同样按失败工作进程的全局排名映射。return_values
和failures
中的键是互斥的,也就是说,工作进程的最终状态只能是:成功或失败。根据代理的重启策略,代理故意终止的工作进程,既不在return_values
中,也不在failures
中。
代理中的看门狗¶
如果环境变量 TORCHELASTIC_ENABLE_FILE_TIMER
在 `LocalElasticAgent`
进程中定义并值为 1,则可以在 `LocalElasticAgent`
中启用基于命名管道的看门狗。 可选地,可以使用另一个环境变量 `TORCHELASTIC_TIMER_FILE`
设置命名管道的唯一文件名。 如果未设置环境变量 `TORCHELASTIC_TIMER_FILE`
,`LocalElasticAgent`
将在内部创建一个唯一的文件名并将其设置为环境变量 `TORCHELASTIC_TIMER_FILE`
,并且此环境变量将传播到工作者进程,以允许它们连接到 `LocalElasticAgent`
使用的相同命名管道。
健康检查服务器¶
如果在 `LocalElasticAgent`
进程中定义了环境变量 TORCHELASTIC_HEALTH_CHECK_PORT
,则可以在 `LocalElasticAgent`
中启用健康检查监控服务器。添加健康检查服务器的接口,可以扩展它,在指定的端口号上启动 tcp/http 服务器。此外,健康检查服务器将具有回调功能,用于检查看门狗是否存活。
- class torch.distributed.elastic.agent.server.health_check_server.HealthCheckServer(alive_callback, port, timeout)[source]¶
健康检查监控服务器的接口,可以通过在指定端口上启动 tcp/http 服务器来扩展。
- 参数