快捷方式

弹性 Agent

服务器

弹性 agent 是 torchelastic 的控制平面。

它是一个启动和管理底层 worker 进程的进程。Agent 负责

  1. 与分布式 torch 协同工作:worker 启动时会携带所有必要信息,以成功且简单地调用 torch.distributed.init_process_group()

  2. 容错:监控 worker,并在检测到 worker 故障或不健康时,关闭所有 worker 并重新启动所有 worker。

  3. 弹性:对成员变更做出反应,并使用新成员重新启动 worker。

最简单的 agent 部署在每个节点上,并与本地进程协同工作。更高级的 agent 可以远程启动和管理 worker。Agent 可以完全去中心化,根据其管理的 worker 做出决策。或者可以是协同的,与其他 agent(管理同一作业中的 worker)通信以做出集体决策。

以下是管理本地 worker 组的 agent 的示意图。

../_images/agent_diagram.jpg

概念

本节介绍与理解 agent 在 torchelastic 中的角色相关的高级类和概念。

class torch.distributed.elastic.agent.server.ElasticAgent[source][source]

负责管理一个或多个 worker 进程的 agent 进程。

worker 进程被假定为常规的分布式 PyTorch 脚本。当 worker 进程由 agent 创建时,agent 会提供必要的信息,以便 worker 进程正确初始化 torch 进程组。

agent 到 worker 的确切部署拓扑和比率取决于 agent 的具体实现和用户的作业放置偏好。例如,要在 GPU 上运行分布式训练作业,使用 8 个训练器(每个 GPU 一个),可以

  1. 使用 8 个单 GPU 实例,每个实例放置一个 agent,每个 agent 管理 1 个 worker。

  2. 使用 4 个双 GPU 实例,每个实例放置一个 agent,每个 agent 管理 2 个 worker。

  3. 使用 2 个四 GPU 实例,每个实例放置一个 agent,每个 agent 管理 4 个 worker。

  4. 使用 1 个 8 GPU 实例,每个实例放置一个 agent,每个 agent 管理 8 个 worker。

用法

group_result = agent.run()
 if group_result.is_failed():
   # workers failed
   failure = group_result.failures[0]
   logger.exception("worker 0 failed with exit code : %s", failure.exit_code)
 else:
   return group_result.return_values[0] # return rank 0's results
abstract get_worker_group(role='default')[source][source]

返回给定 roleWorkerGroup

请注意,worker 组是一个可变对象,因此在多线程/进程环境中,它的状态可能会改变。鼓励(但不要求)实现者返回防御性的只读副本。

返回类型

WorkerGroup

abstract run(role='default')[source][source]

运行 agent。

支持在失败时重试 worker 组,最多 max_restarts 次。

返回

执行结果,其中包含每个 worker 的返回值或失败详情,并按 worker 的全局 rank 映射。

引发

Exception - 任何其他与 worker 进程无关的故障

返回类型

RunResult

class torch.distributed.elastic.agent.server.WorkerSpec(role, local_world_size, rdzv_handler, fn=None, entrypoint=None, args=(), max_restarts=3, monitor_interval=0.1, master_port=None, master_addr=None, local_addr=None)[source][source]

关于特定类型 worker 的蓝图信息。

对于给定的角色,只能存在一个 worker spec。Worker spec 预计在所有节点(机器)上是同构的,即每个节点为特定 spec 运行相同数量的 worker。

参数
  • role (str) – 此 spec 的 worker 的用户定义角色

  • local_world_size (int) – 要运行的本地 worker 数量

  • fn (Optional[Callable]) – (已弃用,请改用 entrypoint)

  • entrypoint (Optional[Union[Callable, str]]) – worker 函数或命令

  • args (Tuple) – 传递给 entrypoint 的参数

  • rdzv_handler (RendezvousHandler) – 处理这组 worker 的 rdzv

  • max_restarts (int) – worker 的最大重试次数

  • monitor_interval (float) – 每隔 n 秒监控 worker 的状态

  • master_port (Optional[int]) – 在 rank 0 上运行 c10d 存储的固定端口,如果未指定,则将选择随机空闲端口

  • master_addr (Optional[str]) – 在 rank 0 上运行 c10d 存储的固定 master_addr,如果未指定,则将选择 agent rank 0 上的主机名

  • redirects – 将标准流重定向到文件,通过传递映射为特定本地 rank 选择性地重定向

  • tee – 将指定的标准流复制到控制台 + 文件,通过传递映射为特定本地 rank 选择性地复制,优先于 redirects 设置。

get_entrypoint_name()[source][source]

获取入口点名称。

如果入口点是一个函数(例如 Callable),则返回其 __qualname__;如果入口点是一个二进制文件(例如 str),则返回二进制文件名。

class torch.distributed.elastic.agent.server.WorkerState(value)[source][source]

WorkerGroup 的状态。

worker 组中的 worker 以单元形式更改状态。如果 worker 组中的单个 worker 失败,则整个集合被视为失败

UNKNOWN - agent lost track of worker group state, unrecoverable
INIT - worker group object created not yet started
HEALTHY - workers running and healthy
UNHEALTHY - workers running and unhealthy
STOPPED - workers stopped (interrupted) by the agent
SUCCEEDED - workers finished running (exit 0)
FAILED - workers failed to successfully finish (exit !0)

worker 组从初始 INIT 状态开始,然后进展到 HEALTHYUNHEALTHY 状态,最终达到终端 SUCCEEDEDFAILED 状态。

worker 组可以被 agent 中断并临时置于 STOPPED 状态。处于 STOPPED 状态的 worker 计划在不久的将来由 agent 重新启动。worker 被置于 STOPPED 状态的一些示例包括

  1. 观察到 worker 组故障|不健康

  2. 检测到成员变更

当 worker 组上的操作(启动、停止、rdzv、重试等)失败并导致操作部分应用于 worker 组时,状态将为 UNKNOWN。通常,这发生在 agent 上的状态更改事件期间发生未捕获/未处理的异常时。Agent 不应恢复处于 UNKNOWN 状态的 worker 组,最好自行终止并允许作业管理器重试节点。

static is_running(state)[source][source]

返回 Worker 的状态。

返回

如果 worker 状态表示 worker 仍在运行(例如,进程存在但不一定健康),则为 True。

返回类型

bool

class torch.distributed.elastic.agent.server.Worker(local_rank, global_rank=-1, role_rank=-1, world_size=-1, role_world_size=-1)[source][source]

worker 实例。

将其与代表 worker 规范的 WorkerSpec 进行对比。Worker 是从 WorkerSpec 创建的。Worker 之于 WorkerSpec,就像对象之于类。

worker 的 idElasticAgent 的特定实现解释。对于本地 agent,它可以是 worker 的 pid (int),对于远程 agent,它可以编码为 host:port (string)

参数
  • id (Any) – 唯一标识 worker(由 agent 解释)

  • local_rank (int) – worker 的本地 rank

  • global_rank (int) – worker 的全局 rank

  • role_rank (int) – 在所有具有相同角色的 worker 中的 worker 的 rank

  • world_size (int) – worker 的数量(全局)

  • role_world_size (int) – 具有相同角色的 worker 的数量

class torch.distributed.elastic.agent.server.WorkerGroup(spec)[source][source]

Worker 实例的集合。

此类定义了由 ElasticAgent 管理的给定 WorkerSpec 的一组 Worker 实例。worker 组是否包含跨实例 worker 取决于 agent 的实现。

实现

以下是 torchelastic 提供的 agent 实现。

class torch.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent(spec, logs_specs, start_method='spawn', exit_barrier_timeout=300, log_line_prefix_template=None)[source][source]

处理主机本地 worker 的 torchelastic.agent.server.ElasticAgent 的实现。

此 agent 部署在每个主机上,并配置为生成 n 个 worker。使用 GPU 时,n 映射到主机上可用的 GPU 数量。

本地 agent 不与其他主机上部署的本地 agent 通信,即使 worker 可能进行跨主机通信。worker id 被解释为本地进程。Agent 作为一个单元启动和停止所有 worker 进程。

传递给 worker 函数的 worker 函数和参数必须与 python 多进程兼容。要将多进程数据结构传递给 worker,您可以在与指定的 start_method 相同的多进程上下文中创建数据结构,并将其作为函数参数传递。

exit_barrier_timeout 指定等待其他 agent 完成的时间量(以秒为单位)。这充当安全网,以处理 worker 在不同时间完成的情况,以防止 agent 将提前完成的 worker 视为缩减事件。强烈建议用户代码处理确保 worker 以同步方式终止,而不是依赖于 exit_barrier_timeout。

如果环境变量 TORCHELASTIC_ENABLE_FILE_TIMER 的值为 1,则可以在 `LocalElasticAgent` 中启用基于命名管道的 watchdog。可选地,可以为命名管道设置另一个环境变量 `TORCHELASTIC_TIMER_FILE`,并使用唯一的文件名。如果未设置环境变量 `TORCHELASTIC_TIMER_FILE`,则 `LocalElasticAgent` 将在内部创建一个唯一的文件名并将其设置为环境变量 `TORCHELASTIC_TIMER_FILE`,并且此环境变量将传播到 worker 进程,以允许它们连接到 `LocalElasticAgent` 使用的同一命名管道。

日志写入指定的日志目录。默认情况下,每行日志将以 [${role_name}${local_rank}]: 为前缀(例如 [trainer0]: foobar)。可以通过传递 模板字符串 作为 log_line_prefix_template 参数来自定义日志前缀。以下宏(标识符)在运行时被替换:${role_name}, ${local_rank}, ${rank}。例如,要使用全局 rank 而不是本地 rank 作为每行日志的前缀,请设置 log_line_prefix_template = "[${rank}]:

启动函数示例

def trainer(args) -> str:
    return "do train"

def main():
    start_method="spawn"
    shared_queue= multiprocessing.get_context(start_method).Queue()
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint=trainer,
                args=("foobar",),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec, start_method)
    results = agent.run()

    if results.is_failed():
        print("trainer failed")
    else:
        print(f"rank 0 return value: {results.return_values[0]}")
        # prints -> rank 0 return value: do train

启动二进制文件示例

def main():
    spec = WorkerSpec(
                role="trainer",
                local_world_size=nproc_per_process,
                entrypoint="/usr/local/bin/trainer",
                args=("--trainer-args", "foobar"),
                ...<OTHER_PARAMS...>)
    agent = LocalElasticAgent(spec)
    results = agent.run()

    if not results.is_failed():
        print("binary launches do not have return values")

扩展 Agent

要扩展 agent,您可以直接实现 `ElasticAgent,但是我们建议您扩展 SimpleElasticAgent,它提供了大部分脚手架,并为您留下了一些特定的抽象方法来实现。

class torch.distributed.elastic.agent.server.SimpleElasticAgent(spec, exit_barrier_timeout=300)[source][source]

管理一种特定类型 worker 角色的 ElasticAgent

管理单个 WorkerSpec(例如一种特定类型的 worker 角色)的 worker (WorkerGroup) 的 ElasticAgent

_assign_worker_ranks(store, group_rank, group_world_size, spec)[source][source]

确定 worker 进程的正确 rank。

快速路径:当所有 worker 具有相同的角色和 world size 时。我们计算全局 rank 为 group_rank * group_world_size + local_rank。role_world_sizeglobal_world_size 相同。在这种情况下不使用 TCP 存储。仅当用户将环境变量 TORCH_ELASTIC_WORKER_IDENTICAL 设置为 1 时才启用此功能。

时间复杂度:每个 worker O(1),总体 O(1)

慢速路径:当 worker 具有不同的角色和 world size 时。我们使用以下算法

  1. 每个 agent 将其配置(group_rank、group_world_size、num_workers)写入公共存储。

  2. Rank 0 agent 从存储中读取所有 role_info,并确定每个 agent 的 worker rank。

  3. 确定全局 rank:worker 的全局 rank 通过累加其前面所有 worker 的 local_world_size 来计算。出于效率原因,为每个 worker 分配一个基本全局 rank,使其 worker 位于 [base_global_rank, base_global_rank + local_world_size) 范围内。

  4. 确定角色 rank:角色 rank 使用第 3 点中的算法确定,但 rank 是相对于角色名称计算的。

  5. Rank 0 agent 将分配的 rank 写入存储。

  6. 每个 agent 从存储中读取分配的 rank。

时间复杂度:每个 worker O(1),rank0 O(n),总体 O(n)

返回类型

List[Worker]

_exit_barrier()[source][source]

定义一个屏障,使 agent 进程保持活动状态,直到所有 worker 完成。

等待 exit_barrier_timeout 秒,以便所有 agent 完成执行其本地 worker(无论是成功还是失败)。这充当安全保护,防止用户脚本在不同时间终止。

_initialize_workers(worker_group)[source][source]

为 worker_group 启动一组全新的 worker。

本质上,这是一个 rendezvous 过程,然后是 start_workers。调用者应先调用 _stop_workers() 停止正在运行的 worker,然后再调用此方法。

乐观地将刚启动的 worker group 的状态设置为 HEALTHY,并将实际的状态监控委托给 _monitor_workers() 方法

abstract _monitor_workers(worker_group)[source][source]

检查 worker_group 的 worker。

此函数还会返回 worker group 的新状态。

返回类型

RunResult

_rendezvous(worker_group)[source][source]

为 worker spec 指定的 worker 运行 rendezvous。

为 worker 分配新的全局 rank 和 world size。更新 worker group 的 rendezvous store。

_restart_workers(worker_group)[source][source]

重启(停止、rendezvous、启动)组中的所有本地 worker。

abstract _shutdown(death_sig=Signals.SIGTERM, is_restart=False)[source][source]

清理 agent 工作期间分配的任何资源。

参数

death_sig (Signals) – 发送给子进程的信号,默认为 SIGTERM

abstract _start_workers(worker_group)[source][source]

启动 worker_group.spec.local_world_size 数量的 worker。

这根据 worker group 的 worker spec。返回从 local_rank 到 worker id 的映射。

返回类型

Dict[int, Any]

abstract _stop_workers(worker_group, is_restart=False)[source][source]

停止给定 worker group 中的所有 worker。

实现者必须处理 WorkerState 定义的所有状态的 worker。也就是说,它必须优雅地处理停止不存在的 worker、不健康的(卡住的)worker 等。

class torch.distributed.elastic.agent.server.api.RunResult(state, return_values=<factory>, failures=<factory>)[source][source]

返回 worker 执行的结果。

运行结果遵循“全有或全无”策略,只有当此 agent 管理的所有本地 worker 都成功完成时,运行才算成功。

如果结果成功(例如 is_failed() = False),则 return_values 字段包含由此 agent 管理的 worker 的输出(返回值),并按其 GLOBAL rank 映射。也就是说,result.return_values[0] 是全局 rank 0 的返回值。

注意

return_values 仅在 worker entrypoint 是函数时才有意义。指定为二进制 entrypoint 的 worker 通常没有返回值,return_values 字段没有意义,可能为空。

如果 is_failed() 返回 True,则 failures 字段包含失败信息,同样,按失败的 worker 的 GLOBAL rank 映射。

return_valuesfailures 中的键是互斥的,也就是说,worker 的最终状态只能是以下之一:成功、失败。根据 agent 的重启策略,agent 有意终止的 worker 不会出现在 return_valuesfailures 中。

Agent 中的 Watchdog

如果环境变量 TORCHELASTIC_ENABLE_FILE_TIMER 的值为 1,则可以在 `LocalElasticAgent` 中启用基于命名管道的 watchdog。可选地,可以为命名管道设置另一个环境变量 `TORCHELASTIC_TIMER_FILE`,并使用唯一的文件名。如果未设置环境变量 `TORCHELASTIC_TIMER_FILE`,则 `LocalElasticAgent` 将在内部创建一个唯一的文件名并将其设置为环境变量 `TORCHELASTIC_TIMER_FILE`,并且此环境变量将传播到 worker 进程,以允许它们连接到 `LocalElasticAgent` 使用的同一命名管道。

健康检查服务器

如果 `LocalElasticAgent` 进程中定义了环境变量 TORCHELASTIC_HEALTH_CHECK_PORT,则可以在 `LocalElasticAgent` 中启用健康检查监控服务器。添加健康检查服务器的接口,可以通过在指定的端口号上启动 tcp/http 服务器来扩展。此外,健康检查服务器将具有回调来检查 watchdog 是否存活。

class torch.distributed.elastic.agent.server.health_check_server.HealthCheckServer(alive_callback, port, timeout)[source][source]

健康检查监控服务器的接口,可以通过在指定的端口上启动 tcp/http 服务器来扩展。

参数
  • alive_callback (Callable[[], int]) – Callable[[], int],agent 上次进度时间的回调

  • port (int) – int,启动 tcp/http 服务器的端口号

  • timeout (int) – int,判断 agent 是否存活/死机的超时秒数

start()[source][source]

Pytorch 不支持此功能,不会启动任何健康检查服务器

stop()[source][source]

停止健康检查服务器的函数

torch.distributed.elastic.agent.server.health_check_server.create_healthcheck_server(alive_callback, port, timeout)[source][source]

创建健康检查服务器对象

返回类型

HealthCheckServer

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发者的深度教程

查看教程

资源

查找开发资源并获得问题解答

查看资源