到期计时器¶
到期计时器与代理程序在同一个进程上设置,并可在你的脚本中使用,以处理停滞的 worker。当你进入一个可能停滞的代码块时,可以获取一个到期计时器,它会指示计时器服务器在 worker 未能在自行设定的到期截止日期前释放计时器时终止该进程。
用法
import torchelastic.timer as timer
import torchelastic.agent.server as agent
def main():
start_method = "spawn"
message_queue = mp.get_context(start_method).Queue()
server = timer.LocalTimerServer(message, max_interval=0.01)
server.start() # non-blocking
spec = WorkerSpec(
fn=trainer_func,
args=(message_queue,),
...<OTHER_PARAMS...>)
agent = agent.LocalElasticAgent(spec, start_method)
agent.run()
def trainer_func(message_queue):
timer.configure(timer.LocalTimerClient(message_queue))
with timer.expires(after=60): # 60 second expiry
# do some work
在上面的示例中,如果 trainer_func
执行时间超过 60 秒,则 worker 进程将被终止,并且代理程序会重试 worker 组。
客户端方法¶
- torch.distributed.elastic.timer.configure(timer_client)[source][source]¶
配置计时器客户端。必须在使用
expires
之前调用。
- torch.distributed.elastic.timer.expires(after, scope=None, client=None)[source][source]¶
获取一个倒计时计时器,它将在距现在
after
秒后到期,除非其所包裹的代码块在该时间范围内完成。当计时器到期时,此 worker 有资格被“收割”(reaped)。“收割”的确切含义取决于客户端实现。在大多数情况下,“收割”意味着终止 worker 进程。请注意,worker **不**保证在time.now() + after
时刻精确地被收割,而是 worker “有资格”被收割,并且客户端与之通信的TimerServer
最终将决定何时以及如何收割具有到期计时器的 worker。用法
torch.distributed.elastic.timer.configure(LocalTimerClient()) with expires(after=10): torch.distributed.all_reduce(...)
服务器/客户端实现¶
以下是 torchelastic 提供的计时器服务器和客户端对。
注意
计时器服务器和客户端必须始终成对实现和使用,因为服务器和客户端之间存在消息协议。
以下是基于 multiprocess.Queue
实现的一对计时器服务器和客户端。
- class torch.distributed.elastic.timer.LocalTimerServer(mp_queue, max_interval=60, daemon=True)[source][source]¶
与
LocalTimerClient
配合使用的服务器。客户端预计是运行此服务器的父进程的子进程。作业中的每个主机都应在本地启动自己的计时器服务器,每个服务器实例管理本地 worker 的计时器(在同一主机上的进程中运行)。
- class torch.distributed.elastic.timer.LocalTimerClient(mp_queue)[source][source]¶
`LocalTimerServer` 的客户端。此客户端旨在用于运行
LocalTimerServer
的同一主机上,并使用 pid 唯一标识 worker。这在具有多个 GPU 设备的主机上为每个 GPU 启动一个子进程(训练器)的情况下特别有用。
以下是基于命名管道实现的另一对计时器服务器和客户端。
- class torch.distributed.elastic.timer.FileTimerServer(file_path, run_id, max_interval=10, daemon=True, log_event=None)[source][source]¶
与
FileTimerClient
配合使用的服务器。客户端预计与运行此服务器的进程在同一主机上运行。作业中的每个主机都应在本地启动自己的计时器服务器,每个服务器实例管理本地 worker 的计时器(在同一主机上的进程中运行)。
- class torch.distributed.elastic.timer.FileTimerClient(file_path, signal=Signals.SIGKILL)[source][source]¶
`FileTimerServer` 的客户端。此客户端旨在用于运行
FileTimerServer
的同一主机上,并使用 pid 唯一标识 worker。此客户端使用命名管道向FileTimerServer
发送计时器请求。此客户端是生产者,而FileTimerServer
是消费者。多个客户端可以与同一个FileTimerServer
配合使用。- 参数
file_path (str) – str,FIFO 特殊文件的路径。
FileTimerServer
必须通过调用 os.mkfifo() 创建它。signal – signal,用于终止进程的信号。使用负数或零信号不会终止进程。
编写自定义计时器服务器/客户端¶
要编写自己的计时器服务器和客户端,请分别扩展服务器的 torch.distributed.elastic.timer.TimerServer
和客户端的 torch.distributed.elastic.timer.TimerClient
。TimerRequest
对象用于在服务器和客户端之间传递消息。
- class torch.distributed.elastic.timer.TimerRequest(worker_id, scope_id, expiration_time)[source][source]¶
表示倒计时计时器获取和释放的数据对象,用于
TimerClient
和TimerServer
之间。负数的expiration_time
应解释为“释放”请求。注意
worker_id
的类型是实现特定的。它是TimerServer
和TimerClient
实现用于唯一标识 worker 的任何内容。
- class torch.distributed.elastic.timer.TimerServer(request_queue, max_interval, daemon=True)[source][source]¶
监视活动计时器并及时使其到期的实体。此服务器负责收割(reaping)具有到期计时器的 worker。