弹性代理¶
服务器¶
弹性代理是 torchelastic 的控制平面。
它是一个启动和管理底层工作进程的进程。代理负责
与分布式 torch 协作:工作进程在启动时会获得所有必要的信息,以便成功且轻松地调用
torch.distributed.init_process_group()
。容错:监控工作进程,并在检测到工作进程故障或不健康时,拆除所有工作进程并重新启动所有进程。
弹性:对成员资格更改做出反应,并使用新成员重新启动工作进程。
最简单的代理部署在每个节点上,并与本地进程协作。更高级的代理可以远程启动和管理工作进程。代理可以完全去中心化,根据其管理的工作进程做出决策。或者可以协调,与管理同一作业中工作进程的其他代理进行通信,以做出集体决策。
以下是管理本地工作进程组的代理的示意图。

概念¶
本节介绍与理解 agent
在 torchelastic 中的角色相关的顶级类和概念。
- class torch.distributed.elastic.agent.server.ElasticAgent[source]¶
负责管理一个或多个工作进程的代理进程。
假设工作进程是常规的分布式 PyTorch 脚本。当代理创建工作进程时,代理会提供工作进程正确初始化 torch 进程组所需的信息。
代理到工作进程的具体部署拓扑和比例取决于代理的具体实现和用户的作业放置偏好。例如,要使用 8 个训练器(每个 GPU 一个)在 GPU 上运行分布式训练作业,可以
使用 8 个单 GPU 实例,每个实例放置一个代理,每个代理管理 1 个工作进程。
使用 4 个双 GPU 实例,每个实例放置一个代理,每个代理管理 2 个工作进程。
使用 2 个四 GPU 实例,每个实例放置一个代理,每个代理管理 4 个工作进程。
使用 1 个 8 GPU 实例,每个实例放置一个代理,每个代理管理 8 个工作进程。
用法
group_result = agent.run() if group_result.is_failed(): # workers failed failure = group_result.failures[0] log.exception("worker 0 failed with exit code : %s", failure.exit_code) else: return group_result.return_values[0] # return rank 0's results
- class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=30.0, master_port=None, master_addr=None, local_addr=None)[source]¶
关于特定类型工作者的蓝图信息。
对于给定的角色,只能存在一个工作者规范。工作者规范预计在所有节点(机器)上是同质的,也就是说每个节点为特定规范运行相同数量的工作者。
- 参数
role (str) – 用户为使用此规范的工作者定义的角色
local_world_size (int) – 要运行的本地工作者数量
args (元组) – 传递给
entrypoint
的参数rdzv_handler (RendezvousHandler) – 处理此组工作进程的 rdzv
max_restarts (整数) – 工作进程最大重试次数
monitor_interval (浮点数) – 每
n
秒监控一次工作进程状态master_port (可选[整数]) – 在 rank 0 上运行 c10d 存储的固定端口,如果未指定,则会选择一个随机的空闲端口
master_addr (可选[字符串]) – 在 rank 0 上运行 c10d 存储的固定 master_addr,如果未指定,则会选择 agent rank 0 上的主机名
redirects – 将标准流重定向到文件,通过传递映射选择性地重定向特定本地排名。
tee – 将指定的标准流(s) tee 到控制台 + 文件,通过传递映射选择性地 tee 特定本地排名,优先于
redirects
设置。
- class torch.distributed.elastic.agent.server.WorkerState(value)[source]¶
一个
WorkerGroup
的状态。工作组中的工作进程作为一个单元更改状态。如果工作组中的单个工作进程失败,则整个集合被视为失败
UNKNOWN - agent lost track of worker group state, unrecoverable INIT - worker group object created not yet started HEALTHY - workers running and healthy UNHEALTHY - workers running and unhealthy STOPPED - workers stopped (interrupted) by the agent SUCCEEDED - workers finished running (exit 0) FAILED - workers failed to successfully finish (exit !0)
工作组从初始
INIT
状态开始,然后进展到HEALTHY
或UNHEALTHY
状态,最后达到终端SUCCEEDED
或FAILED
状态。工作组可能会被代理中断并暂时置于
STOPPED
状态。处于STOPPED
状态的 worker 将在不久的将来由代理重新启动。以下是一些将 worker 置于STOPPED
状态的示例:工作组故障 | 观察到不健康
检测到成员资格变更
当对工作组的操作(启动、停止、rdzv、重试等)失败并导致操作仅部分应用于工作组时,状态将变为
UNKNOWN
。通常,这种情况发生在代理状态更改事件期间未捕获/未处理的异常。代理预计不会恢复处于UNKNOWN
状态的工作组,最好是自行终止并允许作业管理器重试节点。
- class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[source]¶
一个 worker 实例。
将此与
WorkerSpec
进行对比,后者代表 worker 的规格。一个Worker
是从一个WorkerSpec
创建的。一个Worker
对于一个WorkerSpec
就像一个对象对于一个类。工作进程的
id
由ElasticAgent
的具体实现来解释。对于本地代理,它可能是工作进程的pid (int)
,对于远程代理,它可以编码为host:port (string)
。
实现¶
以下是 torchelastic 提供的代理实现。
- class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[source]¶
一个
torchelastic.agent.server.ElasticAgent
的实现,用于处理主机本地工作者。此代理在每个主机上部署,并配置为生成
n
个工作者。当使用 GPU 时,n
映射到主机上可用的 GPU 数量。本地代理不会与部署在其他主机上的其他本地代理通信,即使工作者可能在主机之间进行通信。工作者 ID 被解释为本地进程。代理将所有工作者进程作为一个单元启动和停止。
传递给工作者函数的工作者函数和参数必须与 Python 多进程兼容。要将多进程数据结构传递给工作者,您可以在与指定
start_method
相同的多进程上下文中创建数据结构,并将其作为函数参数传递。exit_barrier_timeout
指定其他代理完成等待的时间(以秒为单位)。这充当安全网,用于处理工作者在不同时间完成的情况,以防止代理将提前完成的工作者视为缩减事件。强烈建议用户代码处理确保工作者以同步方式终止,而不是依赖于 exit_barrier_timeout。如果在
`LocalElasticAgent`
中定义了值为 1 的环境变量TORCHELASTIC_ENABLE_FILE_TIMER
,则可以在`LocalElasticAgent`
中启用基于命名管道的看门狗。可选地,可以使用另一个环境变量`TORCHELASTIC_TIMER_FILE`
设置命名管道的唯一文件名。如果未设置环境变量`TORCHELASTIC_TIMER_FILE`
,`LocalElasticAgent`
将在内部创建一个唯一的文件名并将其设置为环境变量`TORCHELASTIC_TIMER_FILE`
,并且此环境变量将传播到工作者进程,以允许它们连接到`LocalElasticAgent`
使用的相同命名管道。日志将写入指定的日志目录。默认情况下,每行日志将以
[${role_name}${local_rank}]:
为前缀(例如[trainer0]: foobar
)。可以通过将 模板字符串 作为log_line_prefix_template
参数传递来自定义日志前缀。以下宏(标识符)将在运行时被替换:${role_name}, ${local_rank}, ${rank}
。例如,要将每行日志以全局排名而不是本地排名为前缀,请设置log_line_prefix_template = "[${rank}]:
。示例启动函数
def trainer(args) -> str: return "do train" def main(): start_method="spawn" shared_queue= multiprocessing.get_context(start_method).Queue() spec = WorkerSpec( role="trainer", local_world_size=nproc_per_process, entrypoint=trainer, args=("foobar",), ...<OTHER_PARAMS...>) agent = LocalElasticAgent(spec, start_method) results = agent.run() if results.is_failed(): print("trainer failed") else: print(f"rank 0 return value: {results.return_values[0]}") # prints -> rank 0 return value: do train
示例启动二进制文件
def main(): spec = WorkerSpec( role="trainer", local_world_size=nproc_per_process, entrypoint="/usr/local/bin/trainer", args=("--trainer-args", "foobar"), ...<OTHER_PARAMS...>) agent = LocalElasticAgent(spec) results = agent.run() if not results.is_failed(): print("binary launches do not have return values")
扩展代理¶
要扩展代理,您可以直接实现 `ElasticAgent`
,但我们建议您扩展 SimpleElasticAgent
,它提供了大部分脚手架,只留下一些特定的抽象方法供您实现。
- class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[source]¶
一个
ElasticAgent
,它管理一种特定类型的 worker 角色。一个
ElasticAgent
,它为单个WorkerSpec
(例如一种特定类型的 worker 角色)管理 worker (WorkerGroup
)。- _assign_worker_ranks(store, group_rank, group_world_size, spec)[source]¶
确定 worker 进程的适当排名。
排名分配是根据以下算法完成的
每个代理将它的配置(group_rank、group_world_size、num_workers)写入公共存储。
每个代理检索所有代理的配置,并使用角色和排名执行两级排序。
确定全局排名:当前代理的 worker 的全局排名是代理 group_rank 之前的 infos 数组的偏移量。偏移量计算为所有排名小于 group_rank 的代理的 local_world_size 的总和。worker 将拥有以下排名:[offset, offset+local_world_size)
确定角色排名:角色排名是使用第 3 点中的算法确定的,不同之处在于偏移量是从具有与当前代理相同角色且具有最小 group_rank 的第一个代理开始的。
- _exit_barrier()[source]¶
定义一个屏障,使代理进程保持活动状态,直到所有工作进程完成。
等待
exit_barrier_timeout
秒,直到所有代理完成执行其本地工作进程(成功或失败)。这充当对在不同时间终止的用户脚本的安全保护。
- _initialize_workers(worker_group)[source]¶
为工作进程组启动一组新的工作进程。
本质上,这是一个集合点,然后执行
start_workers
。调用者应首先调用_stop_workers()
停止正在运行的工作进程,然后再调用此方法。乐观地将刚启动的工作进程组的状态设置为
HEALTHY
,并将实际状态监控委托给_monitor_workers()
方法。
- abstract _shutdown(death_sig=Signals.SIGTERM)[source]¶
清理代理工作期间分配的任何资源。
- 参数
death_sig (Signals) – 发送给子进程的信号,默认值为 SIGTERM
- class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[source]¶
返回工作进程执行的结果。
运行结果遵循“全有或全无”策略,即只有当此代理管理的所有本地工作进程都成功完成时,运行才算成功。
如果结果成功(例如
is_failed() = False
),则return_values
字段包含此代理管理的工作进程的输出(返回值),并按其全局排名映射。也就是说result.return_values[0]
是全局排名为 0 的返回值。注意
return_values
仅在工作进程入口点为函数时才有意义。指定为二进制入口点的 worker 通常没有返回值,return_values
字段没有意义,可以为空。如果
is_failed()
返回True
,则failures
字段包含失败信息,同样,由失败 worker 的 GLOBAL 秩映射。return_values
和failures
中的键是互斥的,也就是说,worker 的最终状态只能是以下之一:成功、失败。根据代理的重启策略,由代理有意终止的 worker 不会在return_values
或failures
中表示。
代理中的看门狗¶
如果在 `LocalElasticAgent`
中定义了值为 1 的环境变量 TORCHELASTIC_ENABLE_FILE_TIMER
,则可以在 `LocalElasticAgent`
中启用基于命名管道的看门狗。可选地,可以使用另一个环境变量 `TORCHELASTIC_TIMER_FILE`
设置命名管道的唯一文件名。如果未设置环境变量 `TORCHELASTIC_TIMER_FILE`
,`LocalElasticAgent`
将在内部创建一个唯一的文件名并将其设置为环境变量 `TORCHELASTIC_TIMER_FILE`
,并且此环境变量将传播到工作者进程,以允许它们连接到 `LocalElasticAgent`
使用的相同命名管道。