快捷键

torchx.runner

Runner 允许您将组件作为独立作业在支持的 调度器 上运行。Runner 接受一个 specs.AppDef 对象,该对象是使用一组用户提供的参数评估组件函数的结果,以及调度器名称和调度器参数(也称为 runcfgrunopts),并将组件作为作业提交(见下图)。

_images/runner_diagram.png

Runner 函数

torchx.runner.get_runner(name: Optional[str] = None, component_defaults: Optional[Dict[str, Dict[str, str]]] = None, **scheduler_params: Any) Runner[source]

构建和获取 Runner 对象的便捷方法。用法

with get_runner() as runner:
  app_handle = runner.run(component(args), scheduler="kubernetes", runcfg)
  print(runner.status(app_handle))

或者,

runner = get_runner()
try:
   app_handle = runner.run(component(args), scheduler="kubernetes", runcfg)
   print(runner.status(app_handle))
finally:
   runner.close()
参数:
  • name – 人类可读的名称,将包含在所有启动的作业中。

  • scheduler_params – 将传递给所有可用调度器构造函数的额外参数。

Runner 类

class torchx.runner.Runner(name: str, scheduler_factories: Dict[str, SchedulerFactory], component_defaults: Optional[Dict[str, Dict[str, str]]] = None, scheduler_params: Optional[Dict[str, object]] = None)[source]

TorchX 单个组件运行器。具有用户对 AppDefs 进行操作的方法。如果应用程序是在本地启动的,Runner 将缓存有关已启动应用程序的信息,否则它将由特定调度器实现负责。

cancel(app_handle: str) None[source]

停止应用程序,有效地指示调度器取消作业。如果应用程序不存在,则不执行任何操作。

注意

此方法在取消请求提交到调度器后立即返回。应用程序将处于 RUNNING 状态,直到调度器实际终止作业。如果调度器成功中断作业并终止它,最终状态将是 CANCELLED,否则将是 FAILED

close() None[source]

关闭此运行器并释放/清理任何已分配的资源。递归调用所有调度器的 close() 方法。一旦在此运行器上调用此方法,运行器对象将被视为无效,并且对运行器对象以及与此运行器关联的调度器调用的任何方法都将具有未定义的行为。在同一个运行器对象上多次调用此方法是可以的。

describe(app_handle: str) Optional[AppDef][source]

根据应用程序句柄重建应用程序(尽可能)。请注意,重建的应用程序可能不是完整的应用程序,因为它通过运行 API 提交。应用程序可以重建的程度取决于调度器。

返回:

AppDef 或 None,如果应用程序不再存在或调度器不支持描述应用程序句柄

dryrun(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) AppDryRunInfo[source]

在给定调度器上使用提供的运行配置对应用程序进行试运行。不实际提交应用程序,而是返回本应提交的内容。返回的 AppDryRunInfo 格式良好,可以直接打印或记录。

用法

dryrun_info = session.dryrun(app, scheduler="local", cfg)
print(dryrun_info)
dryrun_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) AppDryRunInfo[source]

run_component() 的试运行版本。不会实际运行组件,而只是返回“将”运行的内容。

list(scheduler: str) List[ListAppResponse][source]

对于在调度器上启动的应用程序,此 API 返回一个 ListAppResponse 对象列表,每个对象包含应用程序 ID、应用程序句柄及其状态。注意:此 API 处于原型阶段,可能会发生更改。

log_lines(app_handle: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) Iterable[str][source]

返回指定作业容器的日志行迭代器。

注意

  1. k 是节点(主机)ID,而不是 rank

  2. sinceuntil 不一定总是被遵守(取决于调度器)。

警告

返回的迭代器的语义和保证高度依赖于调度器。请参阅 torchx.specs.api.Scheduler.log_iter 以了解此日志迭代器的高级语义。由于这个原因,强烈不建议使用此方法生成输出以传递给下游函数/依赖项。此方法不保证返回 100% 的日志行。如果调度器已经完全或部分清除应用程序的日志记录,则此方法完全可能返回无日志行或部分日志行。

返回行将包括空格字符,例如 \n\r。输出行时,您应该确保避免添加额外的换行符。

用法

app_handle = session.run(app, scheduler="local", cfg=Dict[str, ConfigValue]())

print("== trainer node 0 logs ==")
for line in session.log_lines(app_handle, "trainer", k=0):
   # for prints newlines will already be present in the line
   print(line, end="")

   # when writing to a file nothing extra is necessary
   f.write(line)

不推荐的反模式

# DO NOT DO THIS!
# parses accuracy metric from log and reports it for this experiment run
accuracy = -1
for line in session.log_lines(app_handle, "trainer", k=0):
   if matches_regex(line, "final model_accuracy:[0-9]*"):
       accuracy = parse_accuracy(line)
       break
report(experiment_name, accuracy)
参数:
  • app_handle – 应用程序句柄

  • role_name – 应用程序中的角色(例如训练器)

  • k – 要获取日志的第 k 个角色副本

  • regex – 可选的正则表达式过滤器,如果留空则返回所有行

  • since – 基于日期时间的起始游标。如果留空,则从第一条日志行(作业开始)开始。

  • until – 基于日期时间的结束游标。如果留空,则继续跟踪日志输出,直到作业完成并且所有日志行都被使用。

返回:

指定应用程序的第 k 个角色副本的迭代器。

引发:

UnknownAppException – 如果调度器中不存在该应用程序

run(app: AppDef, scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) str[source]

在指定模式下运行给定的应用程序。

注意

Runner 的子类应该实现 schedule 方法,而不是直接覆盖此方法。

返回:

一个应用程序句柄,用于对应用程序调用其他操作 API。

run_component(component: str, component_args: List[str], scheduler: str, cfg: Optional[Mapping[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]]] = None, workspace: Optional[str] = None, parent_run_id: Optional[str] = None) str[source]

运行一个组件。

component 的解析顺序如下(从高到低):
  • 用户注册的组件。用户可以通过以下方式注册组件:

    https://packaging.pythonlang.cn/specifications/entry-points/。该方法在组 torchx.components 中查找入口点。

  • 相对于 torchx.components 的内置组件。组件路径应

    为相对于 torchx.components 的模块名称,并以以下格式指定函数名称:$module.$function

  • 格式为 $FILE_PATH:FUNCTION_NAME 的基于文件的组件。支持相对路径和

    绝对路径。

用法

# resolved to torchx.components.distributed.ddp()
runner.run_component("distributed.ddp", ...)

# resolved to my_component() function in ~/home/components.py
runner.run_component("~/home/components.py:my_component", ...)
返回:

用于在应用程序上调用其他操作 API 的应用程序句柄。

引发:
  • ComponentValidationException – 如果组件无效。

  • ComponentNotFoundException – 如果无法解析 component_path

schedule(dryrun_info: AppDryRunInfo) str[source]

实际上根据给定的 dryrun 信息运行应用程序。当需要覆盖调度程序请求中无法从某个对象 API 配置的参数时,此方法非常有用。

警告

谨慎使用此方法,因为滥用此方法来覆盖原始调度程序请求中的多个参数可能会导致您的 TorchX 使用在长期内不符合规范。此方法旨在让用户在短期内无需等待 TorchX 在其 API 中公开调度程序功能即可尝试某些特定于调度程序的功能。

注意

建议 Session 的子类实现此方法,而不是直接实现 run 方法。

用法

dryrun_info = session.dryrun(app, scheduler="default", cfg)

# overwrite parameter "foo" to "bar"
dryrun_info.request.foo = "bar"

app_handle = session.submit(dryrun_info)
scheduler_backends() List[str][source]

返回所有支持的调度程序后端的列表。

scheduler_run_opts(scheduler: str) runopts[source]

返回支持的调度程序后端的 runopts

用法

local_runopts = session.scheduler_run_opts("local_cwd")
print("local scheduler run options: {local_runopts}")
返回:

指定调度程序类型的 runopts

status(app_handle: str) Optional[AppStatus][source]
返回:

应用程序的状态,如果应用程序不再存在(例如,过去已停止并从调度程序的后端中删除),则为 None

stop(app_handle: str) None[source]

请参见方法 cancel

警告

此方法将在未来弃用。它已被 cancel 替换,后者提供相同的功能。更改是为了与 CLI 和调度程序 API 保持一致。

wait(app_handle: str, wait_interval: float = 10) Optional[AppStatus][source]

块状等待(无限期地)应用程序完成。可能的实现

while(True):
    app_status = status(app)
    if app_status.is_terminal():
        return
    sleep(10)
参数:
  • app_handle – 要等待完成的应用程序句柄

  • wait_interval – 轮询状态之前的最小等待间隔

返回:

应用程序的终端状态,或者如果应用程序不再存在,则为 None

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取面向初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源