快捷方式

到期计时器

到期计时器是在与代理相同的进程上设置的,并从您的脚本中使用,以处理卡死的 worker。当您进入可能卡死的代码块时,您可以获取到期计时器,该计时器指示计时器服务器在它未在自我强加的到期截止日期前释放计时器的情况下终止进程。

用法

import torchelastic.timer as timer
import torchelastic.agent.server as agent

def main():
    start_method = "spawn"
    message_queue = mp.get_context(start_method).Queue()
    server = timer.LocalTimerServer(message, max_interval=0.01)
    server.start() # non-blocking

    spec = WorkerSpec(
                fn=trainer_func,
                args=(message_queue,),
                ...<OTHER_PARAMS...>)
    agent = agent.LocalElasticAgent(spec, start_method)
    agent.run()

def trainer_func(message_queue):
    timer.configure(timer.LocalTimerClient(message_queue))
    with timer.expires(after=60): # 60 second expiry
        # do some work

在上面的示例中,如果 trainer_func 耗时超过 60 秒才能完成,则 worker 进程将被终止,代理将重试 worker 组。

客户端方法

torch.distributed.elastic.timer.configure(timer_client)[source]

配置计时器客户端。必须在使用 expires 之前调用。

torch.distributed.elastic.timer.expires(after, scope=None, client=None)[source]

获取一个倒计时器,该计时器将在 after 秒后到期,除非它包装的代码块在该时间段内完成。计时器到期后,该 worker 将有资格被收割。 “收割”的确切含义取决于客户端实现。在大多数情况下,收割意味着终止 worker 进程。请注意,worker 不保证在 exactly time.now() + after 时被收割,而是 worker “有资格”被收割,客户端与之通信的 TimerServer 最终将决定何时以及如何收割具有已到期计时器的 worker。

用法

torch.distributed.elastic.timer.configure(LocalTimerClient())
with expires(after=10):
    torch.distributed.all_reduce(...)

服务器/客户端实现

以下是 torchelastic 提供的计时器服务器和客户端对。

注意

计时器服务器和客户端必须成对实现和使用,因为服务器和客户端之间存在消息协议。

以下是一对计时器服务器和客户端,它们是基于 multiprocess.Queue 实现的。

class torch.distributed.elastic.timer.LocalTimerServer(mp_queue, max_interval=60, daemon=True)[source]

LocalTimerClient 协同工作的服务器。客户端预计是运行此服务器的父进程的子进程。作业中的每个主机都预计会本地启动自己的计时器服务器,并且每个服务器实例都会管理本地 worker(在同一主机上的进程上运行)的计时器。

class torch.distributed.elastic.timer.LocalTimerClient(mp_queue)[source]

LocalTimerServer 的客户端端。此客户端旨在用于与 LocalTimerServer 运行在同一主机上,并使用 pid 来唯一标识 worker。这在每个主机上都有多个 GPU 设备的情况下,每个 GPU 都会产生一个子进程(训练器)的情况下尤其有用。

以下是一对计时器服务器和客户端,它们是基于命名管道实现的。

class torch.distributed.elastic.timer.FileTimerServer(file_path, run_id, max_interval=10, daemon=True, log_event=None)[source]

FileTimerClient 协同工作的服务器。 客户端应与运行此服务器的进程位于同一主机上。 作业中的每个主机都应该在本地启动自己的计时器服务器,并且每个服务器实例管理本地工作者的计时器(在同一主机上的进程上运行)。

参数
  • file_path (str) – str,要创建的 FIFO 特殊文件的路径。

  • max_interval (float) – float,每个看门狗循环的最大间隔时间(以秒为单位)。

  • daemon (bool) – bool,是否以守护进程模式运行看门狗线程。 守护进程不会阻止进程停止。

  • log_event (Optional[Callable[[str, Optional[FileTimerRequest]], None]]) – Callable[[Dict[str, str]], None],用于以 JSON 格式记录事件的可选回调函数。

class torch.distributed.elastic.timer.FileTimerClient(file_path, signal=Signals.SIGKILL)[source]

FileTimerServer 的客户端。 此客户端旨在与运行 FileTimerServer 的同一主机一起使用,并使用 pid 来唯一标识工作者。 此客户端使用命名管道将计时器请求发送到 FileTimerServer。 此客户端是生产者,而 FileTimerServer 是消费者。 多个客户端可以与同一个 FileTimerServer 协同工作。

参数
  • file_path (str) – str,FIFO 特殊文件的路径。 FileTimerServer 必须通过调用 os.mkfifo() 来创建它。

  • signal – signal,用于杀死进程的信号。 使用负数或零信号不会杀死进程。

编写自定义计时器服务器/客户端

要编写自己的计时器服务器和客户端,请扩展 torch.distributed.elastic.timer.TimerServer(用于服务器)和 torch.distributed.elastic.timer.TimerClient(用于客户端)。 TimerRequest 对象用于在服务器和客户端之间传递消息。

class torch.distributed.elastic.timer.TimerRequest(worker_id, scope_id, expiration_time)[source]

表示在 TimerClientTimerServer 之间使用的倒计时计时器获取和释放的数据对象。 负数的 expiration_time 应被解释为“释放”请求。

注意

worker_id 的类型是特定于实现的。 它是 TimerServer 和 TimerClient 实现用来唯一标识工作者的任何内容。

class torch.distributed.elastic.timer.TimerServer(request_queue, max_interval, daemon=True)[source]

监控活动计时器并在适当的时候将其到期的实体。 此服务器负责回收计时器到期的工作者。

abstract clear_timers(worker_ids)[source]

清除给定 worker_ids 的所有计时器。

abstract get_expired_timers(deadline)[source]

返回每个 worker_id 的所有已到期计时器。 已到期计时器是指其 expiration_time 小于或等于所提供期限的计时器。

返回类型

Dict[str, List[TimerRequest]]

abstract register_timers(timer_requests)[source]

处理传入的计时器请求并将其注册到服务器。 计时器请求可以是获取计时器或释放计时器请求。 具有负数 expiration_time 的计时器请求应被解释为释放计时器请求。

class torch.distributed.elastic.timer.TimerClient[source]

通过与 TimerServer 通信来获取和释放倒计时计时器的客户端库。

abstract acquire(scope_id, expiration_time)[source]

获取持有此客户端对象的 worker 的计时器,给定 scope_id 和 expiration_time。 通常将计时器注册到 TimerServer。

abstract release(scope_id)[source]

释放此客户端代表的 worker 上的 scope_id 的计时器。 调用此方法后,作用域上的倒计时计时器将不再有效。

调试信息记录

torch.distributed.elastic.timer.debug_info_logging.log_debug_info_for_expired_timers(run_id, expired_timers)[source]

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得答案

查看资源