torchx.specs¶
这包含 TorchX AppDef 和相关的组件定义。这些由组件用于定义应用程序,然后可以通过 TorchX 调度器或管道适配器启动这些应用程序。
AppDef¶
Role¶
- class torchx.specs.Role(name: str, image: str, min_replicas: ~typing.Optional[int] = None, base_image: ~typing.Optional[str] = None, entrypoint: str = '<MISSING>', args: ~typing.List[str] = <factory>, env: ~typing.Dict[str, str] = <factory>, num_replicas: int = 1, max_retries: int = 0, retry_policy: ~torchx.specs.api.RetryPolicy = RetryPolicy.APPLICATION, resource: ~torchx.specs.api.Resource = <factory>, port_map: ~typing.Dict[str, int] = <factory>, metadata: ~typing.Dict[str, ~typing.Any] = <factory>, mounts: ~typing.List[~typing.Union[~torchx.specs.api.BindMount, ~torchx.specs.api.VolumeMount, ~torchx.specs.api.DeviceMount]] = <factory>)[source]¶
在
AppDef
中执行特定任务的一组节点。示例分布式数据并行应用程序 - 由一个角色(训练器)组成。
具有参数服务器的应用程序 - 由多个角色(训练器、ps)组成。
注意
一个
image
是一个软件包,安装在调度器安排的容器上。调度器上的容器决定了图像的实际含义。图像可以像一个 tar-ball 一样简单,也可以映射到一个 docker 镜像。调度器通常知道如何根据图像名称(str)“拉取”图像,该名称可以是一个简单的名称(例如 docker 镜像)或一个 url,例如s3://path/my_image.tar
。使用
trainer = Role(name="trainer", image = "pytorch/torch:1", entrypoint = "my_trainer.py" args = ["--arg", "foo", ENV_VAR="FOOBAR"], num_replicas = 4, resource = Resource(cpu=1, gpu=1, memMB=500), port_map={"tcp_store":8080, "tensorboard": 8081}, metadata={"local_cwd.property", value})
- 参数:
name – 角色的名称
image – 安装在容器上的软件包。
entrypoint – (容器内的)调用角色的命令
args – 入口点命令的命令行参数
env – 环境变量映射
num_replicas – 要运行的容器副本数量
min_replicas – 作业启动的最小副本数。当设置后,作业大小可以在 min_replicas 和 num_replicas 之间自动调整,具体取决于集群资源和策略。如果调度器不支持自动缩放,则忽略此字段,作业大小将为 num_replicas。实验性:对于 HOT_SPARE 重新启动策略,此字段用于指示作业运行所需的仲裁。
max_retries – 在放弃之前尝试的最大次数
retry_policy – 副本失败时的重试行为
resource – 角色的资源需求。角色应由调度器在
num_replicas
容器上进行调度,每个容器应至少具有resource
保证。port_map – 角色的端口映射。键是端口的唯一标识符,例如“tensorboard”: 9090
metadata – 与角色关联的自由格式信息,例如特定于调度器的数据。键应遵循以下模式:
$scheduler.$key
mounts – 机器上的挂载列表
- class torchx.specs.RetryPolicy(value)[source]¶
定义
AppDef
中Roles
的重试策略。该策略定义了角色副本遇到故障时的行为不成功(非零)退出代码
硬件/主机崩溃
抢占
驱逐
注意
并非所有重试策略都受所有调度器支持。但是,所有调度器都必须支持
RetryPolicy.APPLICATION
。有关他们支持的重试策略以及行为注意事项(如果有)的更多信息,请参阅调度器的文档。- REPLICA:替换副本实例。存活的副本保持不变。
与
dist.ddp
组件一起使用,以便 torchelastic 协调重启和成员资格更改。否则,应用程序需要处理失败的副本离开和替换副本加入。
APPLICATION:重启整个应用程序。
- HOT_SPARE:重启角色的副本,只要配额(min_replicas)
使用额外的主机作为备用不会被违反。它实际上不支持弹性,只是使用 num_replicas 和 min_replicas 之间的差值作为备用(实验性)。
资源¶
- class torchx.specs.Resource(cpu: int, gpu: int, memMB: int, capabilities: ~typing.Dict[str, ~typing.Any] = <factory>, devices: ~typing.Dict[str, int] = <factory>)[source]¶
表示
Role
的资源需求。- 参数:
cpu – 逻辑 CPU 内核数量。CPU 内核的定义取决于调度器。有关逻辑 CPU 内核如何映射到物理内核和线程,请参阅您的调度器文档。
gpu – GPU 数量
memMB – MB 内存
capabilities – 其他硬件规格(由调度器解释)
devices – 命名设备及其数量列表
注意:您应该优先使用 named_resources,而不是直接指定原始资源需求。
- torchx.specs.resource(cpu: Optional[int] = None, gpu: Optional[int] = None, memMB: Optional[int] = None, h: Optional[str] = None) Resource [source]¶
一个方便的方法,用于从原始资源规格(cpu、gpu、memMB)或注册的命名资源(
h
)创建Resource
对象。请注意,(cpu、gpu、memMB)与h
互斥,如果指定了h
则优先使用。如果指定了
h
,则它将用于从已注册的命名资源列表中查找资源规格。请参阅 注册命名资源。否则,将从原始资源规格创建
Resource
对象。示例
resource(cpu=1) # returns Resource(cpu=1) resource(named_resource="foobar") # returns registered named resource "foo" resource(cpu=1, named_resource="foobar") # returns registered named resource "foo" (cpu=1 ignored) resource() # returns default resource values resource(cpu=None, gpu=None, memMB=None) # throws
- torchx.specs.get_named_resources(res: str) Resource [source]¶
根据通过 entrypoints.txt 注册的字符串定义获取资源对象。
TorchX 实现
named_resource
注册机制,它包括以下步骤创建模块并定义您的资源检索函数
# my_module.resources from typing import Dict from torchx.specs import Resource def gpu_x_1() -> Dict[str, Resource]: return Resource(cpu=2, memMB=64 * 1024, gpu = 2)
在 entrypoints 部分注册资源检索
[torchx.named_resources] gpu_x_1 = my_module.resources:gpu_x_1
可以使用
gpu_x_1
作为此函数的字符串参数from torchx.specs import named_resources resource = named_resources["gpu_x_1"]
AWS 命名资源¶
torchx.specs.named_resources_aws 包含资源定义,这些定义代表从 https://aws.amazon.com/ec2/instance-types/ 获取的相应 AWS 实例类型。在安装 torchx 库后,这些资源通过 entrypoints 公开。映射存储在 setup.py 文件中。
命名资源目前没有指定 AWS 实例类型功能,而只是代表内存、CPU 和 GPU 数量方面的等效资源。
注意
这些资源定义可能会在将来发生变化。预计每个用户都会管理自己的资源。请遵循 https://pytorch.ac.cn/torchx/latest/specs.html#torchx.specs.get_named_resources 来设置命名资源。
使用
from torchx.specs import named_resources print(named_resources["aws_t3.medium"]) print(named_resources["aws_m5.2xlarge"]) print(named_resources["aws_p3.2xlarge"]) print(named_resources["aws_p3.8xlarge"])
宏¶
- class torchx.specs.macros[source]¶
定义可以在
Role.args
值的元素或Role.env
中使用的宏。这些宏将在运行时被替换为它们实际的值。警告
除了上面提到的字段外,宏使用的
Role
字段,不会被替换。可用的宏
img_root
- 拉取的容器镜像的根目录。app_id
- 由调度器分配的应用程序 ID。replica_id
- 角色副本的每个实例的唯一 ID,例如,一个具有 3 个副本的角色可以具有 0、1、2 作为副本 ID。请注意,当容器发生故障并被替换时,新容器将具有与被替换容器相同的
replica_id
。例如,如果节点 1 发生故障并被调度器替换,则替换节点也将具有replica_id=1
。
示例
# runs: hello_world.py --app_id ${app_id} trainer = Role( name="trainer", entrypoint="hello_world.py", args=["--app_id", macros.app_id], env={"IMAGE_ROOT_DIR": macros.img_root}) app = AppDef("train_app", roles=[trainer]) app_handle = session.run(app, scheduler="local_docker", cfg={})
运行配置¶
- class torchx.specs.runopts[source]¶
包含调度器运行配置键、默认值(如果有)和帮助信息字符串。这些选项由
Scheduler
提供,并在Session.run
中针对用户提供的运行配置进行验证。允许None
默认值。必需的选项不能具有非 None 默认值。重要
此类没有访问器,因为它旨在由
Scheduler.run_config_options
构造并返回,并作为“帮助”工具或异常信息的一部分打印出来。使用
opts = runopts() opts.add("run_as_user", type_=str, help="user to run the job as") opts.add("cluster_id", type_=int, help="cluster to submit the job", required=True) opts.add("priority", type_=float, default=0.5, help="job priority") opts.add("preemptible", type_=bool, default=False, help="is the job preemptible") # invalid opts.add("illegal", default=10, required=True) opts.add("bad_type", type=str, default=10) opts.check(cfg) print(opts)
- add(cfg_key: str, type_: Type[Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]], help: str, default: Optional[Union[str, int, float, bool, List[str], Dict[str, str]]] = None, required: bool = False) None [source]¶
使用给定的帮助信息字符串和
default
值(如果有)添加config
选项。如果未指定default
,则此选项为必需选项。
- cfg_from_str(cfg_str: str) Dict[str, Optional[Union[str, int, float, bool, List[str], Dict[str, str]]]] [source]¶
从字符串字面量解析调度器
cfg
并返回一个 cfg 映射,其中 cfg 值已转换为此 runopts 对象指定的适当类型。 忽略未知键,并且不会在结果映射中返回。注意
与方法
resolve
不同,此方法不会解析默认选项,也不会检查给定的cfg_str
中是否实际存在必需的选项。 此方法旨在在调用resolve()
之前调用,此时输入为字符串编码的运行 cfg。 也就是说,要完全解析 cfg,请调用opt.resolve(opt.cfg_from_str(cfg_literal))
。如果
cfg_str
为空字符串,则返回一个空的cfg
。 否则,至少需要一个由"="
(等于)分隔的键值对。可以使用
","
(逗号)或";"
(分号)来分隔多个键值对。CfgVal
允许List
基本类型,可以将其作为","
或";"
(分号)分隔的传递。 由于使用相同的分隔符来分隔 cfg 键值对,因此此方法将最后一个(尾随)","
或";"
解释为键值对之间的分隔符。 请参见下面的示例。示例
opts = runopts() opts.add("FOO", type_=List[str], default=["a"], help="an optional list option") opts.add("BAR", type_=str, required=True, help="a required str option") # required and default options not checked # method returns strictly parsed cfg from the cfg literal string opts.cfg_from_str("") == {} # however, unknown options are ignored # since the value type is unknown hence cannot cast to the correct type opts.cfg_from_str("UNKNOWN=VALUE") == {} opts.cfg_from_str("FOO=v1") == {"FOO": "v1"} opts.cfg_from_str("FOO=v1,v2") == {"FOO": ["v1", "v2"]} opts.cfg_from_str("FOO=v1;v2") == {"FOO": ["v1", "v2"]} opts.cfg_from_str("FOO=v1,v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"} opts.cfg_from_str("FOO=v1;v2,BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"} opts.cfg_from_str("FOO=v1;v2;BAR=v3") == {"FOO": ["v1", "v2"], "BAR": "v3"}
运行状态¶
- class torchx.specs.AppStatus(state: ~torchx.specs.api.AppState, num_restarts: int = 0, msg: str = '', structured_error_msg: str = '<NONE>', ui_url: ~typing.Optional[str] = None, roles: ~typing.List[~torchx.specs.api.RoleStatus] = <factory>)[source]¶
AppDef
的运行时状态。调度程序可以返回任意文本消息(msg 字段)。如果发生任何错误,调度程序可以使用 json 响应填充structured_error_msg
。replicas
代表作业中副本的状态。如果作业以多次重试运行,则该参数将包含最近一次重试的状态。注意:如果之前的重试失败,但最近一次重试成功或正在进行,replicas
将不会包含发生的错误。
- class torchx.specs.AppState(value)[source]¶
应用程序的状态。应用程序从初始的
UNSUBMITTED
状态开始,经过SUBMITTED
、PENDING
、RUNNING
状态,最终到达终端状态:SUCCEEDED
、``FAILED``、CANCELLED
。如果调度程序支持抢占,应用程序在抢占时会从
RUNNING
状态变为PENDING
状态。如果用户停止应用程序,则应用程序状态会变为
STOPPED
状态,然后在调度程序实际取消作业时变为CANCELLED
状态。UNSUBMITTED - 应用程序尚未提交给调度程序
SUBMITTED - 应用程序已成功提交给调度程序
PENDING - 应用程序已提交给调度程序,正在等待分配
RUNNING - 应用程序正在运行
SUCCEEDED - 应用程序已成功完成
FAILED - 应用程序已不成功完成
CANCELLED - 应用程序在完成之前被取消
UNKNOWN - 应用程序状态未知
挂载¶
- torchx.specs.parse_mounts(opts: List[str]) List[Union[BindMount, VolumeMount, DeviceMount]] [source]¶
parse_mounts 将选项列表解析为类型化的挂载,其格式类似于 Docker 的绑定挂载。
同一个列表中可以指定多个挂载点。 每个挂载点中必须首先指定
type
。- 例如
type=bind,src=/host,dst=/container,readonly,[type=bind,src=…,dst=…]
- 支持的类型
BindMount: type=bind,src=<主机路径>,dst=<容器路径>[,readonly] VolumeMount: type=volume,src=<名称/ID>,dst=<容器路径>[,readonly] DeviceMount: type=device,src=/dev/<设备>[,dst=<容器路径>][,perm=rwm]
- class torchx.specs.BindMount(src_path: str, dst_path: str, read_only: bool = False)[source]¶
将主机路径绑定到工作环境的绑定挂载点定义。 请参阅调度程序文档以了解每个调度程序的绑定挂载点操作方式。
- 参数:
src_path - 主机上的路径
dst_path - 工作环境/容器中的路径
read_only - 挂载点是否应该是只读的
组件 Linter¶
- torchx.specs.file_linter.validate(path: str, component_function: str) List[LinterMessage] [source]¶
验证函数,以确保其符合组件标准。
validate
找到component_function
并根据以下规则验证它。该函数必须具有 google 风格文档
所有函数参数都必须标注类型
该函数必须返回
torchx.specs.api.AppDef
- 参数:
path - Python 源文件的路径。
component_function - 要验证的函数的名称。
- 返回值:
验证错误列表
- 返回值类型:
List[LinterMessage]
- torchx.specs.file_linter.get_fn_docstring(fn: Callable[[...], object]) Tuple[str, Dict[str, str]] [source]¶
从提供的函数中解析函数和参数描述。 文档字符串应采用 google 风格格式
如果函数没有文档字符串,函数描述将是函数的名称,如何改进帮助消息和参数描述的提示将是参数的名称。
文档字符串中不存在的参数将包含默认/必填信息
- 参数:
fn - 有或没有文档字符串的函数
- 返回值:
- 函数描述、参数描述,其中键是参数的名称,值是
如果描述
- class torchx.specs.file_linter.LinterMessage(name: str, description: str, line: int, char: int, severity: str = 'error')[source]¶
- class torchx.specs.file_linter.TorchFunctionVisitor(component_function_name: str)[source]¶
查找 component_function 并对其运行注册的验证器的访问器。 当前注册的验证器
- TorchxFunctionArgsValidator - 验证函数的参数。
- 标准
每个参数都应该用类型标注
- 支持以下类型
primitive_types: {int, str, float},
Optional[primitive_types],
Dict[primitive_types, primitive_types],
List[primitive_types],
Optional[Dict[primitive_types, primitive_types]],
Optional[List[primitive_types]]
- class torchx.specs.file_linter.TorchXArgumentHelpFormatter(prog, indent_increment=2, max_help_position=24, width=None)[source]¶
帮助消息格式化程序,它将默认值和必需项添加到参数帮助。
如果参数是必需的,则类会在帮助消息末尾添加 (required)。如果参数具有默认值,则类会在末尾添加 (default: $DEFAULT)。该格式化程序旨在仅用于 torchx 组件函数。这些函数没有同时具有必需参数和默认参数。
- class torchx.specs.file_linter.TorchxFunctionArgsValidator[source]¶
- validate(app_specs_func_def: FunctionDef) List[LinterMessage] [source]¶
用于验证提供的函数定义的方法。