快捷方式

Elastic Agent

服务器

弹性代理是 torchelastic 的控制平面。

它是一个启动和管理底层工作进程的进程。代理负责

  1. 与分布式 torch 协同工作:工作进程在启动时会获得所有必要的信息,以便成功且轻松地调用 torch.distributed.init_process_group()

  2. 容错:监控工作进程,并在检测到工作进程故障或不健康时,拆除所有工作进程并重新启动所有进程。

  3. 弹性:对成员资格更改做出反应,并使用新成员重新启动工作进程。

最简单的代理按节点部署,并与本地进程协同工作。更高级的代理可以远程启动和管理工作进程。代理可以完全去中心化,根据其管理的工作进程做出决策。或者可以协调,与其他代理(管理同一作业中的工作进程)通信以做出集体决策。

以下是管理本地工作进程组的代理的示意图。

../_images/agent_diagram.jpg

概念

本节介绍与理解 agent 在 torchelastic 中的角色相关的顶级类和概念。

class torch.distributed.elastic.agent.server.ElasticAgent[source]

负责管理一个或多个工作进程的代理进程。

假定工作进程是常规的分布式 PyTorch 脚本。当代理创建工作进程时,代理会提供必要的信息,以便工作进程正确初始化 torch 进程组。

代理与工作进程的确切部署拓扑和比率取决于代理的具体实现和用户的作业放置偏好。例如,要在具有 8 个训练器(每个 GPU 一个)的 GPU 上运行分布式训练作业,可以

  1. 使用 8 个单 GPU 实例,每个实例放置一个代理,每个代理管理 1 个工作进程。

  2. 使用 4 个双 GPU 实例,每个实例放置一个代理,每个代理管理 2 个工作进程。

  3. 使用 2 个四 GPU 实例,每个实例放置一个代理,每个代理管理 4 个工作进程。

  4. 使用 1 个 8 GPU 实例,每个实例放置一个代理,每个代理管理 8 个工作进程。

用法

group_result = agent.run()
 if group_result.is_failed():
   # workers failed
   failure = group_result.failures[0]
   logger.exception("worker 0 failed with exit code : %s", failure.exit_code)
 else:
   return group_result.return_values[0] # return rank 0's results
abstract get_worker_group(role='default')[source]

返回给定 roleWorkerGroup

请注意,工作进程组是可变对象,因此在多线程/进程环境中,其状态可能会发生变化。鼓励(但不是必需)实现者返回一个防御性的只读副本。

返回类型

WorkerGroup

abstract run(role='default')[source]

运行代理。

支持在失败时重新尝试工作进程组,最多 max_restarts 次。

返回值

执行结果,包含每个工作进程的返回值或失败详细信息,按工作进程的全局排名映射。

引发

异常 - 与工作进程无关的其他任何故障

返回类型

RunResult

class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=0.1, master_port=None, master_addr=None, local_addr=None)[source]

特定类型工作程序的蓝图信息。

对于给定的角色,只能存在一个工作程序规范。工作程序规范预计在所有节点(机器)上都是同质的,也就是说每个节点为特定规范运行相同数量的工作程序。

参数
  • role (str) – 用户为具有此规范的工作程序定义的角色

  • local_world_size (int) – 要运行的本地工作程序数量

  • fn (Optional[Callable]) – (已弃用,请改用 entrypoint)

  • entrypoint (Optional[Union[Callable, str]]) – 工作程序函数或命令

  • args (Tuple) – 传递给 entrypoint 的参数

  • rdzv_handler (RendezvousHandler) – 处理此组工作程序的 rdzv

  • max_restarts (int) – 工作程序的最大重试次数

  • monitor_interval (float) – 每隔 n 秒监控工作程序的状态

  • master_port (Optional[int]) – 如果未指定,则在 rank 0 上运行 c10d 存储的固定端口,否则将选择一个随机的空闲端口

  • master_addr (Optional[str]) – 如果未指定,则在 rank 0 上运行 c10d 存储的固定 master_addr,否则将选择 agent rank 0 上的主机名

  • redirects – 将标准流重定向到文件,通过传递映射选择性地为特定本地等级重定向。

  • tee – 将指定的标准流(s)输出到控制台和文件,通过传递映射选择性地为特定本地等级输出,优先于 redirects 设置。

get_entrypoint_name()[source]

获取入口点名称。

如果入口点是函数(例如 Callable),则返回其 __qualname__;否则,如果入口点是二进制文件(例如 str),则返回二进制文件名。

class torch.distributed.elastic.agent.server.WorkerState(value)[source]

WorkerGroup 的状态。

工作组中的工作程序作为单元更改状态。如果工作组中的单个工作程序失败,则整个集合被视为失败。

UNKNOWN - agent lost track of worker group state, unrecoverable
INIT - worker group object created not yet started
HEALTHY - workers running and healthy
UNHEALTHY - workers running and unhealthy
STOPPED - workers stopped (interrupted) by the agent
SUCCEEDED - workers finished running (exit 0)
FAILED - workers failed to successfully finish (exit !0)

工作组从初始 INIT 状态开始,然后进展到 HEALTHYUNHEALTHY 状态,最后达到最终的 SUCCEEDEDFAILED 状态。

工作组可以被中断,并由代理暂时置于 STOPPED 状态。处于 STOPPED 状态的工作程序将由代理在不久的将来安排重新启动。一些将工作程序置于 STOPPED 状态的示例是

  1. 工作组失败 | 观察到不健康

  2. 检测到成员资格更改

当工作组上的操作(启动、停止、rdzv、重试等)失败并导致操作被部分应用于工作组时,状态将为 UNKNOWN。通常,这种情况发生在代理上状态更改事件期间未捕获/未处理的异常。代理预计不会恢复处于 UNKNOWN 状态的工作组,最好是自行终止并允许作业管理器重试节点。

static is_running(state)[source]

返回工作程序的状态。

返回值

如果工作程序状态表示工作程序仍在运行(例如,进程存在但不一定健康),则为 True。

返回类型

布尔值

class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[source]

工作程序实例。

将其与表示工作程序规范的 WorkerSpec 进行对比。 Worker 是从 WorkerSpec 创建的。 Worker 对于 WorkerSpec 就像对象对于类。

工作程序的 idElasticAgent 的特定实现来解释。对于本地代理,它可能是工作程序的 pid (int),对于远程代理,它可以编码为 host:port (string)

参数
  • id (Any) – 唯一标识工作程序(由代理解释)

  • local_rank (int) – 工作程序的本地等级

  • global_rank (int) – 工作程序的全局等级

  • role_rank (int) – 所有具有相同角色的工作程序中工作程序的等级

  • world_size (int) – 工作程序的数量(全局)

  • role_world_size (int) – 具有相同角色的工作程序的数量

class torch.distributed.elastic.agent.server.WorkerGroup(spec)[source]

一组 Worker 实例。

该类为给定的 WorkerSpec 定义了一组由 ElasticAgent 管理的 Worker 实例。工作组是否包含跨实例工作程序取决于代理的实现。

实现

以下是 torchelastic 提供的代理实现。

class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[source]

用于处理主机本地工作进程的 torchelastic.agent.server.ElasticAgent 实现。

此代理在每个主机上部署,并配置为生成 n 个工作进程。使用 GPU 时,n 对应于主机上可用 GPU 的数量。

即使工作进程可能进行主机间通信,本地代理也不会与部署在其他主机上的其他本地代理通信。工作进程 ID 被解释为本地进程。代理将所有工作进程作为一个单元启动和停止。

传递给工作进程函数的工作进程函数和参数必须与 Python 多进程兼容。要将多进程数据结构传递给工作进程,可以在与指定的 start_method 相同的多进程上下文中创建数据结构,并将其作为函数参数传递。

exit_barrier_timeout 指定等待其他代理完成的时间量(以秒为单位)。这充当安全网,用于处理工作进程在不同时间完成的情况,以防止代理将提前完成的工作进程视为缩减事件。强烈建议用户代码处理以同步方式确保工作进程终止,而不是依赖于 exit_barrier_timeout。

如果在 `LocalElasticAgent` 进程中定义了值为 1 的环境变量 TORCHELASTIC_ENABLE_FILE_TIMER,则可以在 `LocalElasticAgent` 中启用基于命名管道的看门狗。可选地,可以使用唯一的文件名为命名管道设置另一个环境变量 `TORCHELASTIC_TIMER_FILE`。如果未设置环境变量 `TORCHELASTIC_TIMER_FILE`,则 `LocalElasticAgent` 将在内部创建一个唯一的文件名并将其设置为环境变量 `TORCHELASTIC_TIMER_FILE`,并且此环境变量将传播到工作进程,以允许它们连接到 `LocalElasticAgent` 使用的相同命名管道。

日志写入指定的日志目录。默认情况下,每行日志的前缀为 [${role_name}${local_rank}]:(例如 [trainer0]: foobar)。可以通过将 模板字符串 作为 log_line_prefix_template 参数传递来自定义日志前缀。以下宏(标识符)在运行时被替换:${role_name}, ${local_rank}, ${rank}。例如,要以全局排名而不是本地排名作为每行日志的前缀,请设置 log_line_prefix_template = "[${rank}]:

启动函数示例

def trainer(args) -> str:
    return "do train"

def main():
    start_method="spawn"
    shared_queue= multiprocessing.get_context(start_method).Queue()
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint=trainer,
                args=("foobar",),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec, start_method)
    results = agent.run()

    if results.is_failed():
        print("trainer failed")
    else:
        print(f"rank 0 return value: {results.return_values[0]}")
        # prints -> rank 0 return value: do train

启动二进制文件示例

def main():
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint="/usr/local/bin/trainer",
                args=("--trainer-args", "foobar"),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec)
    results = agent.run()

    if not results.is_failed():
        print("binary launches do not have return values")

扩展代理

要扩展代理,您可以直接实现 `ElasticAgent`,但是我们建议您改为扩展 SimpleElasticAgent,它提供了大部分脚手架,并为您留下了一些需要实现的特定抽象方法。

class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[source]

管理一种特定类型的工作进程角色的 ElasticAgent

管理单个 WorkerSpec(例如一种特定类型的工作进程角色)的工作进程(WorkerGroup)的 ElasticAgent

_assign_worker_ranks(store, group_rank, group_world_size, spec)[source]

确定工作进程的正确排名。

排名分配根据以下算法进行

  1. 每个代理将其配置(group_rank、group_world_size、num_workers)写入公共存储。

  2. 排名为 0 的代理从存储中读取所有 role_info 并确定每个代理的工作进程排名。

  3. 确定全局排名:工作进程的全局排名通过其前面所有工作进程的 local_world_size 的累积和计算得出。出于效率原因,每个工作进程都被分配一个基本全局排名,以便其工作进程位于 [base_global_rank, base_global_rank + local_world_size) 范围内。

  4. 确定角色排名:角色排名使用步骤 3 中的算法确定,区别在于排名是相对于角色名称计算的。

  5. 排名为 0 的代理将分配的排名写入存储。

  6. 每个代理从存储中读取分配的排名。

时间复杂度:每个工作进程 O(1),排名 0 为 O(n),总体为 O(n)

返回类型

List[Worker]

_exit_barrier()[source]

定义一个屏障,使代理进程保持活动状态,直到所有工作进程完成。

等待 exit_barrier_timeout 秒,直到所有代理完成其本地工作进程的执行(成功或失败)。这充当对用户脚本在不同时间终止的安全防护。

_initialize_workers(worker_group)[source]

为工作进程组启动一组新的工作进程。

从本质上讲,这是一个会合,然后是 start_workers。调用方应首先调用 _stop_workers() 停止正在运行的工作进程,然后再调用此方法。

乐观地将刚刚启动的工作进程组的状态设置为 HEALTHY,并将状态的实际监控委托给 _monitor_workers() 方法

abstract _monitor_workers(worker_group)[source]

检查 worker_group 中的工作进程。

此函数还返回工作进程组的新状态。

返回类型

RunResult

_rendezvous(worker_group)[source]

为工作进程规范指定的工作进程运行会合。

为工作进程分配新的全局排名和世界大小。更新工作进程组的会合存储。

_restart_workers(worker_group)[source]

重新启动(停止、会合、启动)组中的所有本地工作进程。

abstract _shutdown(death_sig=Signals.SIGTERM, is_restart=False)[source]

清理代理工作期间分配的任何资源。

参数

death_sig (Signals) – 要发送到子进程的信号,默认值为 SIGTERM

abstract _start_workers(worker_group)[source]

启动 worker_group.spec.local_world_size 个工作进程。

这根据工作进程组的工作进程规范进行。返回一个 local_rank 到工作进程 id 的映射。

返回类型

Dict[int, Any]

abstract _stop_workers(worker_group, is_restart=False)[source]

停止给定工作进程组中的所有工作进程。

实现者必须处理 WorkerState 定义的所有状态下的工作进程。也就是说,它必须优雅地处理停止不存在的工作进程、不健康(卡住)的工作进程等。

class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[source]

返回工作进程执行的结果。

运行结果遵循“全有或全无”策略,即当且仅当此代理管理的所有本地工作进程都成功完成时,运行才算成功。

如果结果成功(例如 is_failed() = False),则 return_values 字段包含此代理管理的工作进程的输出(返回值),并按其全局排名映射。也就是说,result.return_values[0] 是全局排名为 0 的工作进程的返回值。

注意

return_values 仅在工作进程入口点为函数时才有意义。指定为二进制入口点的工作进程通常没有返回值,并且 return_values 字段没有意义并且可能为空。

如果 is_failed() 返回 True,则 failures 字段包含故障信息,同样,也按发生故障的工作进程的全局排名映射。

return_valuesfailures 中的键是互斥的,也就是说,工作进程的最终状态只能是成功或失败之一。根据代理的重启策略由代理有意终止的工作进程,既不显示在 return_values 中,也不显示在 failures 中。

代理中的看门狗

如果在 `LocalElasticAgent` 进程中定义了值为 1 的环境变量 TORCHELASTIC_ENABLE_FILE_TIMER,则可以在 `LocalElasticAgent` 中启用基于命名管道的看门狗。可选地,可以使用唯一的文件名为命名管道设置另一个环境变量 `TORCHELASTIC_TIMER_FILE`。如果未设置环境变量 `TORCHELASTIC_TIMER_FILE`,则 `LocalElasticAgent` 将在内部创建一个唯一的文件名并将其设置为环境变量 `TORCHELASTIC_TIMER_FILE`,并且此环境变量将传播到工作进程,以允许它们连接到 `LocalElasticAgent` 使用的相同命名管道。

健康检查服务器

如果在 `LocalElasticAgent` 进程中定义了环境变量 TORCHELASTIC_HEALTH_CHECK_PORT,则可以在 `LocalElasticAgent` 中启用健康检查监控服务器。添加健康检查服务器的接口,可以通过在指定的端口号上启动 tcp/http 服务器来扩展该接口。此外,健康检查服务器将具有回调函数来检查看门狗是否处于活动状态。

class torch.distributed.elastic.agent.server.health_check_server.HealthCheckServer(alive_callback, port, timeout)[source]

健康检查监控服务器的接口,可以通过在指定的端口上启动 tcp/http 服务器来扩展该接口。

参数
  • alive_callback (Callable[[], int]) – Callable[[], int],代理上次进度时间的回调函数

  • port (int) – int,启动 tcp/http 服务器的端口号

  • timeout (int) – int,确定代理处于活动状态/死机状态的超时秒数

start()[source]

Pytorch 中不支持的功能,不启动任何健康检查服务器

stop()[source]

停止健康检查服务器的函数

torch.distributed.elastic.agent.server.health_check_server.create_healthcheck_server(alive_callback, port, timeout)[source]

创建健康检查服务器对象

返回类型

HealthCheckServer

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源