快捷方式

过期定时器

过期定时器与 Agent 在同一进程中设置,并在您的脚本中使用,以处理卡住的工作进程。当您进入可能卡住的代码块时,您可以获取一个过期定时器,该定时器指示定时器服务器在工作进程未能在自行设定的过期截止时间前释放定时器时终止该进程。

用法

import torchelastic.timer as timer
import torchelastic.agent.server as agent

def main():
    start_method = "spawn"
    message_queue = mp.get_context(start_method).Queue()
    server = timer.LocalTimerServer(message, max_interval=0.01)
    server.start() # non-blocking

    spec = WorkerSpec(
                fn=trainer_func,
                args=(message_queue,),
                ...<OTHER_PARAMS...>)
    agent = agent.LocalElasticAgent(spec, start_method)
    agent.run()

def trainer_func(message_queue):
    timer.configure(timer.LocalTimerClient(message_queue))
    with timer.expires(after=60): # 60 second expiry
        # do some work

在上面的示例中,如果 trainer_func 的完成时间超过 60 秒,则工作进程将被终止,Agent 将重试工作进程组。

客户端方法

torch.distributed.elastic.timer.configure(timer_client)[source][source]

配置定时器客户端。必须在使用 expires 之前调用。

torch.distributed.elastic.timer.expires(after, scope=None, client=None)[source][source]

获取一个倒计时定时器,该定时器将在从现在起 after 秒后过期,除非它包装的代码块在该时间范围内完成。当定时器过期时,此工作进程有资格被回收。“回收”的确切含义取决于客户端实现。在大多数情况下,回收意味着终止工作进程。请注意,不保证工作进程会在正好 time.now() + after 时被回收,而是工作进程“有资格”被回收,并且客户端与之通信的 TimerServer 将最终决定何时以及如何回收具有过期定时器的工作进程。

用法

torch.distributed.elastic.timer.configure(LocalTimerClient())
with expires(after=10):
    torch.distributed.all_reduce(...)

服务器/客户端实现

以下是 torchelastic 提供的定时器服务器和客户端对。

注意

定时器服务器和客户端必须始终成对实现和使用,因为服务器和客户端之间存在消息传递协议。

以下是一对基于 multiprocess.Queue 实现的定时器服务器和客户端。

class torch.distributed.elastic.timer.LocalTimerServer(mp_queue, max_interval=60, daemon=True)[source][source]

LocalTimerClient 配合使用的服务器。客户端应为运行此服务器的父进程的子进程。作业中的每个主机都应在本地启动自己的定时器服务器,并且每个服务器实例管理本地工作进程(在同一主机上的进程上运行)的定时器。

class torch.distributed.elastic.timer.LocalTimerClient(mp_queue)[source][source]

LocalTimerServer 的客户端侧。此客户端旨在与运行 LocalTimerServer 的同一主机上使用,并使用 pid 唯一标识工作进程。这在每台具有多个 GPU 设备的 GPU 主机上生成一个子进程(训练器)的情况下特别有用。

以下是另一对基于命名管道实现的定时器服务器和客户端。

class torch.distributed.elastic.timer.FileTimerServer(file_path, run_id, max_interval=10, daemon=True, log_event=None)[source][source]

FileTimerClient 配合使用的服务器。客户端应与运行此服务器的进程在同一主机上运行。作业中的每个主机都应在本地启动自己的定时器服务器,并且每个服务器实例管理本地工作进程(在同一主机上的进程上运行)的定时器。

参数
  • file_path (str) – str,要创建的 FIFO 特殊文件的路径。

  • max_interval (float) – float,每个 watchdog 循环的最大间隔秒数。

  • daemon (bool) – bool,是否在守护程序模式下运行 watchdog 线程。守护程序线程不会阻止进程停止。

  • log_event (Optional[Callable[[str, Optional[FileTimerRequest]], None]]) – Callable[[Dict[str, str]], None],用于以 JSON 格式记录事件的可选回调函数。

class torch.distributed.elastic.timer.FileTimerClient(file_path, signal=Signals.SIGKILL)[source][source]

FileTimerServer 的客户端侧。此客户端旨在与运行 FileTimerServer 的同一主机上使用,并使用 pid 唯一标识工作进程。此客户端使用命名管道向 FileTimerServer 发送定时器请求。此客户端是生产者,而 FileTimerServer 是消费者。多个客户端可以与同一个 FileTimerServer 一起工作。

参数
  • file_path (str) – str,FIFO 特殊文件的路径。FileTimerServer 必须通过调用 os.mkfifo() 创建它。

  • signal – signal,用于终止进程的信号。使用负信号或零信号将不会终止进程。

编写自定义定时器服务器/客户端

要编写您自己的定时器服务器和客户端,请扩展服务器的 torch.distributed.elastic.timer.TimerServer 和客户端的 torch.distributed.elastic.timer.TimerClientTimerRequest 对象用于在服务器和客户端之间传递消息。

class torch.distributed.elastic.timer.TimerRequest(worker_id, scope_id, expiration_time)[source][source]

数据对象,表示 TimerClientTimerServer 之间使用的倒计时定时器的获取和释放。负的 expiration_time 应解释为“释放”请求。

注意

worker_id 的类型是特定于实现的。它是 TimerServer 和 TimerClient 实现用来唯一标识工作进程的任何内容。

class torch.distributed.elastic.timer.TimerServer(request_queue, max_interval, daemon=True)[source][source]

监视活动定时器并及时使其过期的实体。此服务器负责回收具有过期定时器的工作进程。

abstract clear_timers(worker_ids)[source][source]

清除给定 worker_ids 的所有定时器。

abstract get_expired_timers(deadline)[source][source]

返回每个 worker_id 的所有过期定时器。过期定时器是指 expiration_time 小于或等于提供的截止时间的定时器。

返回类型

Dict[str, List[TimerRequest]]

abstract register_timers(timer_requests)[source][source]

处理传入的定时器请求并将它们注册到服务器。定时器请求可以是获取定时器请求或释放定时器请求。expiration_time 为负的定时器请求应解释为释放定时器请求。

class torch.distributed.elastic.timer.TimerClient[source][source]

客户端库,用于通过与 TimerServer 通信来获取和释放倒计时定时器。

abstract acquire(scope_id, expiration_time)[source][source]

为持有此客户端对象的工作进程获取定时器,给定 scope_id 和 expiration_time。通常向 TimerServer 注册定时器。

abstract release(scope_id)[source][source]

在此客户端代表的工作进程上,释放 scope_id 的定时器。调用此方法后,scope 上的倒计时定时器不再有效。

调试信息日志记录

torch.distributed.elastic.timer.debug_info_logging.log_debug_info_for_expired_timers(run_id, expired_timers)[source][source]

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发者的深度教程

查看教程

资源

查找开发资源并获得问题解答

查看资源