过期计时器¶
过期计时器与代理在同一进程中设置,并从脚本中使用以处理卡住的 worker。当进入可能卡住的代码块时,可以获取一个过期计时器,该计时器指示计时器服务器在未在自设过期截止时间内释放计时器时杀死进程。
用法
import torchelastic.timer as timer
import torchelastic.agent.server as agent
def main():
    start_method = "spawn"
    message_queue = mp.get_context(start_method).Queue()
    server = timer.LocalTimerServer(message, max_interval=0.01)
    server.start() # non-blocking
    spec = WorkerSpec(
                fn=trainer_func,
                args=(message_queue,),
                ...<OTHER_PARAMS...>)
    agent = agent.LocalElasticAgent(spec, start_method)
    agent.run()
def trainer_func(message_queue):
    timer.configure(timer.LocalTimerClient(message_queue))
    with timer.expires(after=60): # 60 second expiry
        # do some work
在上面的示例中,如果 trainer_func 花费超过 60 秒才能完成,则 worker 进程将被杀死,并且代理将重试 worker 组。
客户端方法¶
- torch.distributed.elastic.timer.expires(after, scope=None, client=None)[source]¶
- 获取一个倒计时器,该倒计时器在从现在起 - after秒后到期,除非它包装的代码块在时间范围内完成。当计时器到期时,此工作进程有资格被回收。回收的确切含义取决于客户端实现。在大多数情况下,回收意味着终止工作进程。请注意,工作进程不会保证在- time.now() + after时刻被回收,而是工作进程“有资格”被回收,并且客户端与之通信的- TimerServer最终将决定何时以及如何回收具有已过期计时器的工作进程。- 用法 - torch.distributed.elastic.timer.configure(LocalTimerClient()) with expires(after=10): torch.distributed.all_reduce(...) 
服务器/客户端实现¶
以下是 torchelastic 提供的计时器服务器和客户端对。
注意
计时器服务器和客户端始终必须成对实现和使用,因为服务器和客户端之间存在消息传递协议。
下面是一对基于 multiprocess.Queue 实现的计时器服务器和客户端。
- class torch.distributed.elastic.timer.LocalTimerServer(mp_queue, max_interval=60, daemon=True)[source]¶
- 与 - LocalTimerClient一起工作的服务器。客户端应是正在运行此服务器的父进程的子进程。作业中的每个主机应在本地启动自己的计时器服务器,并且每个服务器实例管理本地工作进程(在同一主机上的进程上运行)的计时器。
- class torch.distributed.elastic.timer.LocalTimerClient(mp_queue)[source]¶
- LocalTimerServer- LocalTimerServer
以下是基于命名管道实现的另一对计时器服务器和客户端。
- class torch.distributed.elastic.timer.FileTimerServer(file_path, max_interval=10, daemon=True, log_event=None)[source]¶
- 与 - FileTimerClient
- 类 torch.distributed.elastic.timer.FileTimerClient(file_path, signal=Signals.SIGKILL)[源代码]¶
- FileTimerServer的客户端端。此客户端旨在与运行- FileTimerServer的同一主机上使用,并使用 pid 唯一标识工作进程。此客户端使用命名管道向- FileTimerServer发送计时器请求。此客户端是生产者,而- FileTimerServer是消费者。多个客户端可以与同一个- FileTimerServer一起工作。- 参数
- file_path (str) – str,FIFO 特殊文件的路径。 - FileTimerServer必须通过调用 os.mkfifo() 来创建它。
- signal – 信号,用于杀死进程的信号。使用负数或零信号不会杀死进程。 
 
 
编写自定义计时器服务器/客户端¶
要编写自己的计时器服务器和客户端,请为服务器扩展 torch.distributed.elastic.timer.TimerServer,为客户端扩展 torch.distributed.elastic.timer.TimerClient。 TimerRequest 对象用于在服务器和客户端之间传递消息。
- 类 torch.distributed.elastic.timer.TimerRequest(worker_id, scope_id, expiration_time)[源代码]¶
- 表示倒计时计时器获取和释放的数据对象,用于 - TimerClient和- TimerServer之间。负- expiration_time应解释为“释放”请求。- 注意 - worker_id的类型取决于实现。它由 TimerServer 和 TimerClient 实现用于唯一标识工作进程。