torchx.schedulers¶
TorchX 调度器定义了现有调度器的插件。与 runner 一起使用时,它们将组件作为作业提交到相应的调度器后端。TorchX 支持一些 调度器 开箱即用。您可以通过实现 .. py:class::torchx.schedulers 并 注册 它在入口点来添加您自己的调度器。
所有调度器¶
调度器函数¶
- torchx.schedulers.get_scheduler_factories() Dict[str, SchedulerFactory] [源代码]¶
get_scheduler_factories 返回所有可用的调度器名称和实例化它们的方法。
字典中的第一个调度器用作默认调度器。
调度器类¶
- class torchx.schedulers.Scheduler(backend: str, session_name: str)[源代码]¶
抽象调度器功能的接口。实现者只需要实现那些使用
@abc.abstractmethod
注释的方法。- cancel(app_id: str) None [源代码]¶
取消/杀死应用程序。此方法在同一线程内是幂等的,并且可以在同一应用程序上安全地调用多次。但是,当从多个线程/进程对同一应用程序调用时,此方法的确切语义取决于底层调度器 API 的幂等性保证。
注意
此方法不会阻塞应用程序以达到取消状态。要确保应用程序达到最终状态,请使用
wait
API。
- close() None [源代码]¶
仅适用于具有本地状态的调度器!关闭调度器以释放任何已分配的资源。关闭后,调度器对象被认为不再有效,对该对象调用的任何方法都会导致未定义的行为。
此方法不应引发异常,并且允许在同一对象上调用多次。
注意
仅对具有本地状态的调度器实现重写(
torchx/schedulers/local_scheduler.py
)。简单地包装远程调度器客户端的调度器不需要实现此方法。
- abstract describe(app_id: str) Optional[DescribeAppResponse] [源代码]¶
描述指定的应用程序。
- 返回:
AppDef 描述或
None
(如果应用程序不存在)。
- abstract list() List[ListAppResponse] [源代码]¶
对于在调度器上启动的应用程序,此 API 返回一个 ListAppResponse 对象列表,每个对象都有应用程序 ID 及其状态。注意:此 API 处于原型阶段,可能会发生变化。
- log_iter(app_id: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) Iterable[str] [源代码]¶
返回一个迭代器,指向
``role``
的第k
个副本的日志行。当所有符合条件的日志行都被读取后,迭代器结束。如果调度器支持基于时间的光标来获取自定义时间范围内的日志行,则将遵循
since
和until
字段,否则将忽略它们。未指定since
和until
等同于获取所有可用的日志行。如果until
为空,则迭代器的行为类似于tail -f
,跟踪日志输出,直到作业达到最终状态。日志的确切定义因调度器而异。一些调度器可能将 stderr 或 stdout 视为日志,而另一些调度器可能从日志文件中读取日志。
行为和假设
如果对不存在的应用程序调用,则会产生未定义的行为。调用者应在调用此方法之前使用
exists(app_id)
检查应用程序是否存在。不是有状态的,使用相同的参数调用此方法两次将返回一个新的迭代器。先前的迭代进度将丢失。
并不总是支持日志跟踪。并非所有调度器都支持实时日志迭代(例如,在应用程序运行时跟踪日志)。有关迭代器行为的详细信息,请参阅特定调度器的文档。
- 3.1 如果调度器支持日志跟踪,则应由
should_tail
参数控制。
不保证日志保留。在调用此方法时,底层调度器可能已经清除了此应用程序的日志记录。如果是这样,此方法将引发任意异常。
如果
should_tail
为 True,则该方法仅在可访问的日志行已完全耗尽且应用程序已达到最终状态时才会引发StopIteration
异常。例如,如果应用程序卡住并且没有生成任何日志行,则迭代器将阻塞,直到应用程序最终被终止(通过超时或手动终止),此时它将引发StopIteration
。如果
should_tail
为 False,则该方法在没有更多日志时引发StopIteration
。并非所有调度器都需要支持。
一些调度器可能通过支持
__getitem__
来支持行光标(例如,iter[50]
查找第 50 行日志)。- 保留空格,每行都应包含
\n
。为了 支持交互式进度条,返回的行不需要包含
\n
,但应该在没有换行符的情况下打印,以便正确处理\r
回车符。
- 保留空格,每行都应包含
- 参数:
streams – 要选择的 IO 输出流。可以是:combined、stdout、stderr。如果调度器不支持所选流,则会引发 ValueError 异常。
- 返回:
指定角色副本的日志行的
迭代器
- 引发:
NotImplementedError – 如果调度器不支持日志迭代
- abstract schedule(dryrun_info: AppDryRunInfo) str [源代码]¶
与
submit
相同,只是它接受一个AppDryRunInfo
。鼓励实现者实现此方法,而不是直接实现submit
,因为可以通过dryrun_info = self.submit_dryrun(app, cfg) return schedule(dryrun_info)
- class torchx.schedulers.api.DescribeAppResponse(app_id: str = '<NOT_SET>', state: ~torchx.specs.api.AppState = AppState.UNSUBMITTED, num_restarts: int = -1, msg: str = '<NONE>', structured_error_msg: str = '<NONE>', ui_url: ~typing.Optional[str] = None, roles_statuses: ~typing.List[~torchx.specs.api.RoleStatus] = <factory>, roles: ~typing.List[~torchx.specs.api.Role] = <factory>)[源代码]¶
Scheduler.describe(app)
API 返回的响应对象。包含调度器已知的应用程序状态和描述。对于某些调度器实现,此响应对象包含重新创建AppDef
对象的必要和充分信息。对于这些类型的调度器,用户可以重新run()
创建的应用程序。否则,用户只能调用非创建方法(例如wait()
、status()
等)。由于此类是一个数据类,并且包含许多成员变量,因此我们保持其用法简单,提供了一个无参构造函数,并选择直接访问成员变量,而不是提供访问器。
如果调度器返回任意消息,则应填充
msg
字段。如果调度器返回结构化 json,则应填充structured_error_msg
字段。
- class torchx.schedulers.api.ListAppResponse(app_id: str, state: AppState, app_handle: str = '<NOT_SET>')[源代码]¶
scheduler.list()
和runner.list()
API 返回的响应对象。包含应用程序的 app_id、app_handle 和状态。App ID:标识在调度器上提交的应用程序的唯一标识符。App handle:使用 torchx 以 {scheduler_backend}://{session_name}/{app_id} 格式运行的应用程序的标识符,当 runner 在调度器上提交作业时由 runner 创建。ListAppResponse 中的句柄信息由runner.list()
填充。此句柄可用于使用 torchx CLI 或 torchx runner 实例进一步描述应用程序。由于此类是一个具有一些成员变量的数据类,因此我们保持其用法简单,并选择直接访问成员变量,而不是提供访问器。