快捷方式

弹性 Agent

服务器

弹性 Agent 是 torchelastic 的控制平面。

它是一个启动和管理底层工作进程的进程。Agent 负责以下职责:

  1. 与分布式 torch 协作:工作进程启动时会获得所有必要信息,以便成功且简单地调用 torch.distributed.init_process_group()

  2. 容错:监视工作进程,并在检测到工作进程故障或不健康时,关闭所有工作进程并重启所有进程。

  3. 弹性:对成员变更做出反应,并用新成员重启工作进程。

最简单的 Agent 是按节点部署的,并与本地进程一起工作。更高级的 Agent 可以远程启动和管理工作进程。Agent 可以完全去中心化,根据其管理的工作进程做出决策;也可以是协调一致的,与其他管理同一作业中工作进程的 Agent 通信,以做出集体决策。

下面是一个管理本地工作进程组的 Agent 示意图。

../_images/agent_diagram.jpg

概念

本节描述了与理解 agent 在 torchelastic 中的作用相关的高级类和概念。

class torch.distributed.elastic.agent.server.ElasticAgent[source][source]

负责管理一个或多个工作进程的 Agent 进程。

这些工作进程被假定为常规的分布式 PyTorch 脚本。当 Agent 创建工作进程时,Agent 会提供必要的信息,使工作进程能够正确初始化 torch 进程组。

Agent 的具体实现以及用户的作业 placement 偏好决定了 Agent 与工作进程的确切部署拓扑和比例。例如,要在 GPU 上运行一个包含 8 个训练器(每个 GPU 一个)的分布式训练作业,可以采用以下方式:

  1. 使用 8 个单 GPU 实例,每个实例放置一个 Agent,每个 Agent 管理 1 个工作进程。

  2. 使用 4 个双 GPU 实例,每个实例放置一个 Agent,每个 Agent 管理 2 个工作进程。

  3. 使用 2 个四 GPU 实例,每个实例放置一个 Agent,每个 Agent 管理 4 个工作进程。

  4. 使用 1 个八 GPU 实例,每个实例放置一个 Agent,每个 Agent 管理 8 个工作进程。

用法

group_result = agent.run()
 if group_result.is_failed():
   # workers failed
   failure = group_result.failures[0]
   logger.exception("worker 0 failed with exit code : %s", failure.exit_code)
 else:
   return group_result.return_values[0] # return rank 0's results
abstract get_worker_group(role='default')[source][source]

返回给定 role 对应的 WorkerGroup

请注意,工作组是一个可变对象,因此在多线程/多进程环境中,其状态可能会改变。鼓励(但不强制要求)实现者返回一个防御性的只读副本。

返回类型

WorkerGroup

abstract run(role='default')[source][source]

运行 Agent。

支持在故障时重试工作组,最多 max_restarts 次。

返回值

执行结果,包含按工作进程全局 rank 映射的每个工作进程的返回值或故障详细信息。

抛出

Exception - 任何与工作进程无关的其他故障

返回类型

RunResult

class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=0.1, master_port=None, master_addr=None, local_addr=None)[source][source]

关于特定类型工作进程的蓝图信息。

对于给定的 role,只能存在一个 worker spec。Worker spec 在所有节点(机器)上应是同质的,即每个节点针对特定的 spec 运行相同数量的工作进程。

参数
  • role (str) – 具有此 spec 的工作进程的用户定义 role

  • local_world_size (int) – 要运行的本地工作进程数量

  • fn (Optional[Callable]) – (已弃用,请改用 entrypoint)

  • entrypoint (Optional[Union[Callable, str]]) – 工作进程函数或命令

  • args (tuple) – 传递给 entrypoint 的参数

  • rdzv_handler (RendezvousHandler) – 处理这组工作进程的 rdzv

  • max_restarts (int) – 工作进程的最大重试次数

  • monitor_interval (float) – 每隔 n 秒监控工作进程状态

  • master_port (Optional[int]) – 在 rank 0 上运行 c10d store 的固定端口,如果未指定,则选择一个随机的空闲端口

  • master_addr (Optional[str]) – 在 rank 0 上运行 c10d store 的固定 master_addr,如果未指定,则选择 Agent rank 0 上的主机名

  • redirects – 将标准流重定向到文件,通过传递一个 map 有选择地重定向特定 local rank 的流

  • tee – 将指定的标准流(s) 同时输出到控制台和文件,通过传递一个 map 有选择地对特定 local rank 进行 tee 操作,其优先级高于 redirects 设置。

get_entrypoint_name()[source][source]

获取入口点名称。

如果入口点是函数(例如 Callable),则返回其 __qualname__;如果入口点是二进制文件(例如 str),则返回二进制文件名称。

class torch.distributed.elastic.agent.server.WorkerState(value)[source][source]

WorkerGroup 的状态。

工作组中的工作进程作为一个单元改变状态。如果工作组中的一个工作进程失败,则整个集合被视为失败。

UNKNOWN - agent lost track of worker group state, unrecoverable
INIT - worker group object created not yet started
HEALTHY - workers running and healthy
UNHEALTHY - workers running and unhealthy
STOPPED - workers stopped (interrupted) by the agent
SUCCEEDED - workers finished running (exit 0)
FAILED - workers failed to successfully finish (exit !0)

工作组从初始的 INIT 状态开始,然后进展到 HEALTHY(健康)或 UNHEALTHY(不健康)状态,最终达到终止的 SUCCEEDED(成功)或 FAILED(失败)状态。

Agent 可以中断工作组并暂时将其置于 STOPPED(停止)状态。处于 STOPPED 状态的工作进程计划在不久的将来由 Agent 重启。将工作进程置于 STOPPED 状态的一些示例包括:

  1. 观察到工作组失败 | 不健康

  2. 检测到成员变更

当对工作组执行操作(启动、停止、rdzv、重试等)失败,并且该操作部分应用于工作组时,状态将变为 UNKNOWN(未知)。这通常发生在 Agent 上状态变更事件期间出现未捕获/未处理的异常时。Agent 不期望恢复处于 UNKNOWN 状态的工作组,最好自行终止并允许作业管理器重试节点。

static is_running(state)[source][source]

返回 Worker 的状态。

返回值

如果工作进程状态表示工作进程仍在运行(例如,进程存在但不一定健康),则为 True。

返回类型

bool

class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[source][source]

一个工作进程实例。

将其与表示工作进程规范的 WorkerSpec 进行对比。一个 Worker 是从一个 WorkerSpec 创建的。Worker 之于 WorkerSpec 就像对象之于类。

工作进程的 idElasticAgent 的具体实现来解释。对于本地 Agent,它可以是工作进程的 pid (int);对于远程 Agent,它可以编码为 host:port (string)

参数
  • id (Any) – 唯一标识一个工作进程(由 Agent 解释)

  • local_rank (int) – 工作进程的本地 rank

  • global_rank (int) – 工作进程的全局 rank

  • role_rank (int) – 在所有具有相同 role 的工作进程中的 rank

  • world_size (int) – 工作进程数量(全局)

  • role_world_size (int) – 具有相同 role 的工作进程数量

class torch.distributed.elastic.agent.server.WorkerGroup(spec)[source][source]

一组 Worker 实例。

该类定义了由 ElasticAgent 管理的给定 WorkerSpec 对应的一组 Worker 实例。工作组是否包含跨实例工作进程取决于 Agent 的实现。

实现

以下是 torchelastic 提供的 Agent 实现。

class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[source][source]

这是 torchelastic.agent.server.ElasticAgent 的一个实现,用于处理主机本地的工作进程。

此 Agent 按主机部署,配置为生成 n 个工作进程。使用 GPU 时,n 对应于主机上可用的 GPU 数量。

本地 Agent 不会与部署在其他主机上的其他本地 Agent 通信,即使工作进程之间可能进行跨主机通信。工作进程 id 被解释为本地进程。Agent 将所有工作进程作为一个单元启动和停止。

传递给工作进程函数及其参数必须与 python multiprocessing 兼容。要将多进程数据结构传递给工作进程,可以在与指定的 start_method 相同的多进程上下文中创建该数据结构,并将其作为函数参数传递。

exit_barrier_timeout 指定等待其他 Agent 完成的时间量(以秒为单位)。这作为一个安全网,用于处理工作进程在不同时间完成的情况,以防止 Agent 将提前完成的工作进程视为缩减事件。强烈建议用户代码处理确保工作进程同步终止,而不是依赖于 exit_barrier_timeout。

如果在于 `LocalElasticAgent` 进程中定义了环境变量 TORCHELASTIC_ENABLE_FILE_TIMER 且其值为 1,则可以在 `LocalElasticAgent` 中启用基于命名管道的看门狗。可选地,可以设置另一个环境变量 `TORCHELASTIC_TIMER_FILE`,为其指定命名管道的唯一文件名。如果未设置环境变量 `TORCHELASTIC_TIMER_FILE``LocalElasticAgent` 将在内部创建一个唯一的文件名,并将其设置为环境变量 `TORCHELASTIC_TIMER_FILE`,此环境变量将传播到工作进程,使其能够连接到 `LocalElasticAgent` 使用的同一命名管道。

日志会写入指定的日志目录。默认情况下,每行日志都会以 [${role_name}${local_rank}]: 作为前缀(例如 [trainer0]: foobar)。日志前缀可以通过传递一个 模板字符串 作为 log_line_prefix_template 参数来定制。运行时会替换以下宏(标识符):${role_name}, ${local_rank}, ${rank}。例如,要用全局 rank 代替本地 rank 作为每行日志的前缀,请设置 log_line_prefix_template = "[${rank}]:

启动函数示例

def trainer(args) -> str:
    return "do train"

def main():
    start_method="spawn"
    shared_queue= multiprocessing.get_context(start_method).Queue()
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint=trainer,
                args=("foobar",),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec, start_method)
    results = agent.run()

    if results.is_failed():
        print("trainer failed")
    else:
        print(f"rank 0 return value: {results.return_values[0]}")
        # prints -> rank 0 return value: do train

启动二进制文件示例

def main():
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint="/usr/local/bin/trainer",
                args=("--trainer-args", "foobar"),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec)
    results = agent.run()

    if not results.is_failed():
        print("binary launches do not have return values")

扩展 Agent

要扩展 Agent,可以直接实现 `ElasticAgent`,但我们建议改为扩展 SimpleElasticAgent,它提供了大部分基础结构,只需您实现一些特定的抽象方法。

class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[source][source]

管理特定类型工作进程 role 的 ElasticAgent

管理单个 WorkerSpec(例如特定类型的工作进程 role)对应的工作进程(WorkerGroup)的 ElasticAgent

_assign_worker_ranks(store, group_rank, group_world_size, spec)[source][source]

确定工作进程的正确 rank。

快速路径:当所有工作进程具有相同的 role 和 world size 时。我们将全局 rank 计算为 group_rank * group_world_size + local_rank。role_world_sizeglobal_world_size 相同。在这种情况下不使用 TCP store。此模式仅在用户将环境变量 TORCH_ELASTIC_WORKER_IDENTICAL 设置为 1 时启用。

时间复杂度:每个工作进程 O(1),总体 O(1)

慢速路径:当工作进程具有不同的 role 和 world size 时。我们使用以下算法:

  1. 每个 Agent 将其配置(group_rank, group_world_size, num_workers)写入公共 store。

  2. Rank 0 的 Agent 从 store 读取所有 role_info,并确定每个 Agent 的工作进程 rank。

  3. 确定全局 rank:worker 的全局 rank 是通过在其前面所有 worker 的 local_world_size 的累加和计算得出的。出于效率原因,每个 worker 都被分配一个基础全局 rank,使得其 worker 位于 [base_global_rank, base_global_rank + local_world_size) 范围内。

  4. 确定角色 rank:角色 rank 是使用点 3 中的算法确定的,但 rank 是相对于角色名称计算的。

  5. rank 0 代理将分配的 rank 写入 store。

  6. 每个代理从 store 读取分配的 rank。

时间复杂度:每个 worker O(1),rank0 O(n),总体 O(n)

返回类型

列表[torch.distributed.elastic.agent.server.api.Worker]

_exit_barrier()[source][source]

定义一个屏障,保持代理进程存活直到所有 worker 完成。

等待 exit_barrier_timeout 秒,直到所有代理完成执行其本地 worker(无论成功与否)。这作为一道安全防护,防止用户脚本在不同时间终止。

_initialize_workers(worker_group)[source][source]

为 `worker_group` 启动一组新的 worker。

本质上是一个 rendezvous,然后是 start_workers。调用者应在此方法之前先调用 _stop_workers() 来停止正在运行的 worker。

乐观地将刚启动的 worker 组的状态设置为 HEALTHY,并将实际的状态监控委托给 _monitor_workers() 方法

abstract _monitor_workers(worker_group)[source][source]

检查 worker_group 中的 worker。

此函数也返回 worker 组的新状态。

返回类型

RunResult

_rendezvous(worker_group)[source][source]

为 worker 规范指定的 worker 运行 rendezvous。

为 worker 分配新的全局 rank 和 world size。更新 worker 组的 rendezvous store。

_restart_workers(worker_group)[source][source]

重启(停止、rendezvous、启动)组中的所有本地 worker。

abstract _shutdown(death_sig=Signals.SIGTERM, is_restart=False)[source][source]

清理代理工作期间分配的任何资源。

参数

death_sig (Signals) – 发送给子进程的信号,默认为 SIGTERM

abstract _start_workers(worker_group)[source][source]

启动数量为 worker_group.spec.local_world_size 的 worker。

这是根据 worker 组的 worker 规范进行的。返回一个 local_rank 到 worker id 的映射。

返回类型

字典[整型, 任何类型]

abstract _stop_workers(worker_group, is_restart=False)[source][source]

停止给定 worker 组中的所有 worker。

实现者必须处理 WorkerState 定义的所有状态下的 worker。也就是说,必须优雅地处理停止不存在的 worker、不健康(卡住)的 worker 等情况。

class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[source][source]

返回 worker 执行的结果。

运行结果遵循“全有或全无”策略,即当且仅当此代理管理的所有本地 worker 都成功完成时,运行才算成功。

如果结果成功(例如 is_failed() = False),则 return_values 字段包含由此代理管理的 worker 的输出(返回值),按其全局 rank 进行映射。即 result.return_values[0] 是全局 rank 0 的返回值。

注意

return_values 仅在 worker 入口点是函数时才有意义。指定为二进制入口点的 worker 通常没有返回值,因此 return_values 字段没有意义,可能为空。

如果 is_failed() 返回 True,则 failures 字段包含失败信息,同样按失败 worker 的全局 rank 进行映射。

return_valuesfailures 中的键是互斥的,即 worker 的最终状态只能是以下之一:成功、失败。由代理根据其重启策略有意终止的 worker 不会出现在 return_valuesfailures 中。

代理中的看门狗

如果在于 `LocalElasticAgent` 进程中定义了环境变量 TORCHELASTIC_ENABLE_FILE_TIMER 且其值为 1,则可以在 `LocalElasticAgent` 中启用基于命名管道的看门狗。可选地,可以设置另一个环境变量 `TORCHELASTIC_TIMER_FILE`,为其指定命名管道的唯一文件名。如果未设置环境变量 `TORCHELASTIC_TIMER_FILE``LocalElasticAgent` 将在内部创建一个唯一的文件名,并将其设置为环境变量 `TORCHELASTIC_TIMER_FILE`,此环境变量将传播到工作进程,使其能够连接到 `LocalElasticAgent` 使用的同一命名管道。

健康检查服务器

如果在 ``LocalElasticAgent`` 进程中定义了环境变量 ``TORCHELASTIC_HEALTH_CHECK_PORT``,则可以在 ``LocalElasticAgent`` 中启用健康检查监控服务器。添加健康检查服务器接口,可以通过在指定端口号上启动 tcp/http 服务器来扩展。此外,健康检查服务器将有一个回调函数来检查看门狗是否存活。

class torch.distributed.elastic.agent.server.health_check_server.HealthCheckServer(alive_callback, port, timeout)[source][source]

健康检查监控服务器接口,可以通过在指定端口上启动 tcp/http 服务器来扩展。

参数
  • alive_callback (Callable[[], int]) – Callable[[], int],代理最后进度时间的回调函数。

  • port (int) – int,启动 tcp/http 服务器的端口号。

  • timeout (int) – int,判断代理是否存活/死亡的超时秒数。

start()[source][source]

Pytorch 不支持此功能,不启动任何健康检查服务器。

stop()[source][source]

停止健康检查服务器的函数。

torch.distributed.elastic.agent.server.health_check_server.create_healthcheck_server(alive_callback, port, timeout)[source][source]

创建健康检查服务器对象。

返回类型

HealthCheckServer

文档

访问 PyTorch 的完整开发者文档

查看文档

教程

获取面向初学者和高级开发者的深入教程。

查看教程

资源

查找开发资源并获得问题解答。

查看资源