快捷方式

本地

这包含 TorchX 本地调度程序,可用于通过子进程在本地运行 TorchX 组件。

class torchx.schedulers.local_scheduler.LocalScheduler(session_name: str, image_provider_class: Callable[[LocalOpts], ImageProvider], cache_size: int = 100, extra_paths: Optional[List[str]] = None)[source]

Bases: Scheduler[LocalOpts]

在本地主机上调度。容器被建模为进程,容器的某些属性(这些属性要么不相关,要么无法在本地主机运行时强制执行)将被忽略。被忽略的属性包括

  1. 资源需求

  2. 资源限制执行

  3. 重试策略

  4. 重试次数(不支持重试)

  5. 部署偏好

调度程序支持在收到 SIGTERM 或 SIGINT 后清理孤儿进程。调度程序将终止生成的进程。

这通过调度程序 local_cwd 公开。

  • local_cwd 相对于当前工作目录运行提供的应用程序,并忽略图像字段,以实现更快的迭代和测试目的。

注意

孤儿清理仅在 LocalScheduler 从主线程实例化时有效。

配置选项

    usage:
        [log_dir=LOG_DIR],[prepend_cwd=PREPEND_CWD],[auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES]

    optional arguments:
        log_dir=LOG_DIR (str, None)
            dir to write stdout/stderr log files of replicas
        prepend_cwd=PREPEND_CWD (bool, False)
            if set, prepends CWD to replica's PATH env var making any binaries in CWD take precedence over those in PATH
        auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES (bool, False)
            sets the `CUDA_AVAILABLE_DEVICES` for roles that request GPU resources. Each role replica will be assigned one GPU. Does nothing if the device count is less than replicas.

兼容性

注意

由于调度程序的差异,在本地运行的任务在使用其他调度程序时可能无法工作,因为网络或软件依赖项不同。

功能

调度程序支持

获取日志

✔️

分布式任务

LocalScheduler 支持多个副本,但所有副本都将在本地主机上执行。

取消任务

✔️

描述任务

✔️

工作空间/修补

部分支持。LocalScheduler 从本地目录运行应用程序,但不支持编程工作空间。

挂载

弹性

auto_set_CUDA_VISIBLE_DEVICES(role_params: Dict[str, List[ReplicaParam]], app: AppDef, cfg: LocalOpts) None[source]

如果运行选项 auto_set_cuda_visible_devices = True,则根据每个角色的资源规范中指定的 GPU 数量,将每个副本(节点)的 env var 设置为 CUDA_VISIBLE_DEVICES env var,覆盖角色 env 字段中任何现有的 CUDA_VISIBLE_DEVICES。要手动设置 CUDA_VISIBLE_DEVICES,请在调度程序 runcfg 中使用 auto_set_cuda_visible_devices = False 运行。

注意

如果主机的设备数量少于请求的 GPU 总数,则不会设置 CUDA_VISIBLE_DEVICES(即使 auto_set_cuda_visible_devices=True)。

注意

此方法要么在所有 gpu 角色上设置 CUDA_VISIBLE_DEVICES,要么不设置。

示例(所有示例都假设在具有 8 个 GPU 的主机上运行)

  1. Role(num_replicas=2, resource=Resource(gpus=2))
    1. 副本 0 的 CUDA_VISIBLE_DEVICES=0,1

    2. 副本 1 的 CUDA_VISIBLE_DEVICES=2,3

  2. Role(num_replicas=3, resource=Resource(gpus=4))
    1. 错误 - `` 3 * 4 = 12 >= 8``

  3. [Role(num_replicas=1, resource=Resource(gpus=2)), Role(num_replicas=3, resource=Resource(gpus=1))]
    1. 角色 0,副本 0 的 CUDA_VISIBLE_DEVICES=0,1

    2. 角色 1,副本 0 的 CUDA_VISIBLE_DEVICES=2

    3. 角色 1,副本 1 的 CUDA_VISIBLE_DEVICES=3

    4. 角色 1,副本 2 的 CUDA_VISIBLE_DEVICES=4

close() None[source]

仅适用于具有本地状态的调度器!关闭调度器,释放任何已分配的资源。关闭后,调度器对象将被视为不再有效,对该对象调用的任何方法都将导致未定义的行为。

此方法不应引发异常,并且允许在同一个对象上调用多次。

注意

仅为具有本地状态的调度器实现覆盖(torchx/schedulers/local_scheduler.py)。简单地包装远程调度器客户端的调度器不需要实现此方法。

describe(app_id: str) Optional[DescribeAppResponse][source]

描述指定的应用程序。

返回:

AppDef 描述或 None 如果应用程序不存在。

list() List[ListAppResponse][source]

对于在调度器上启动的应用程序,此 API 返回一个 ListAppResponse 对象列表,每个对象都包含应用程序 ID 及其状态。注意:此 API 处于原型阶段,可能会发生变化。

log_iter(app_id: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) Iterable[str][source]

返回 k``th replica of the ``role 的日志行的迭代器。当所有符合条件的日志行都被读取后,迭代器结束。

如果调度器支持基于时间的游标来获取自定义时间范围内的日志行,则 sinceuntil 字段将被遵守,否则将被忽略。不指定 sinceuntil 等同于获取所有可用的日志行。如果 until 为空,则迭代器表现得像 tail -f,在作业到达终端状态之前一直跟踪日志输出。

构成日志的精确定义是特定于调度器的。某些调度器可能将 stderr 或 stdout 视为日志,而其他调度器可能从日志文件读取日志。

行为和假设

  1. 如果对不存在的应用程序调用,则会产生未定义的行为。调用者应在调用此方法之前使用 exists(app_id) 检查应用程序是否存在。

  2. 没有状态,使用相同的参数调用此方法两次将返回一个新的迭代器。之前的迭代进度会丢失。

  3. 并不总是支持日志跟踪。并非所有调度器都支持实时日志迭代(例如,在应用程序运行时跟踪日志)。请参阅特定调度器的文档以了解迭代器的行为。

3.1 如果调度器支持日志跟踪,则应由

should_tail 参数控制。

  1. 不保证日志保留。当调用此方法时,底层调度器可能已经清除了此应用程序的日志记录。如果是这种情况,此方法将引发任意异常。

  2. 如果 should_tail 为 True,则该方法仅在访问的日志行完全耗尽且应用程序到达最终状态时才引发 StopIteration 异常。例如,如果应用程序卡住并且没有产生任何日志行,则迭代器将阻塞,直到应用程序最终被杀死(通过超时或手动),此时它将引发 StopIteration

    如果 should_tail 为 False,则该方法在没有更多日志时引发 StopIteration

  3. 并非所有调度器都需要支持。

  4. 某些调度器可能通过支持 __getitem__ 来支持行游标(例如,iter[50] 将跳转到第 50 行日志)。

  5. 保留空格,每个新行都应包含 \n。到

    支持交互式进度条,返回的行不需要包含 \n,但应在不换行的情况下打印,以正确处理 \r 回车符。

参数:

streams – 要选择的 IO 输出流。以下之一:combined、stdout、stderr。如果调度器不支持选定的流,它将抛出 ValueError。

返回:

指定角色副本的日志行的 Iterator

引发:

NotImplementedError – 如果调度器不支持日志迭代

schedule(dryrun_info: AppDryRunInfo[PopenRequest]) str[source]

submit 相同,只是它接受一个 AppDryRunInfo。建议实现者实现此方法,而不是直接实现 submit,因为 submit 可以通过

dryrun_info = self.submit_dryrun(app, cfg)
return schedule(dryrun_info)

镜像提供者

class torchx.schedulers.local_scheduler.ImageProvider[source]

管理在本地主机上下载和设置镜像。这仅适用于 LocalhostScheduler,因为通常真正的调度器会代表用户执行此操作。

abstract fetch(image: str) str[source]

拉取给定镜像并返回拉取的镜像在本地主机的路径,如果没有操作则返回空字符串。

fetch_role(role: Role) str[source]

fetch(image) 相同,它拉取角色的镜像并返回镜像根目录的路径,不同之处在于它允许此提供者更新角色。当需要在角色上设置额外的环境变量以符合镜像提供者在本地主机上获取和管理镜像的方式时,这很有用。默认情况下,此方法只是委托给 fetch(role.image)。如果需要,请覆盖。

get_cwd(image: str) Optional[str][source]

返回挂载的镜像目录的绝对路径。用作启动子进程的工作目录。

get_entrypoint(img_root: str, role: Role) str[source]

返回入口点的地址。

get_replica_param(img_root: str, role: Role, stdout: Optional[str] = None, stderr: Optional[str] = None, combined: Optional[str] = None) ReplicaParam[source]

给定角色副本的规范,返回 ReplicaParam 容器,它持有最终传递给 subprocess.Popen 的参数,以实际调用和运行每个角色的副本。预期 img_rootself.fetch(role.image) 的返回值。由于角色的镜像只需获取一次(而不是每个副本),因此预期调用者对每个角色调用 fetch 方法一次,并对每个 role.num_replicas 调用此方法。

class torchx.schedulers.local_scheduler.CWDImageProvider(cfg: LocalOpts)[source]

类似于 LocalDirectoryImageProvider,但它忽略镜像名称并使用当前工作目录作为镜像路径。

示例

  1. fetch(Image(name="/tmp/foobar")) 返回 os.getcwd()

  2. fetch(Image(name="foobar:latest")) 返回 os.getcwd()

fetch(image: str) str[source]

拉取给定镜像并返回拉取的镜像在本地主机的路径,如果没有操作则返回空字符串。

get_cwd(image: str) Optional[str][source]

返回挂载的镜像目录的绝对路径。用作启动子进程的工作目录。

get_entrypoint(img_root: str, role: Role) str[source]

返回入口点的地址。

class torchx.schedulers.local_scheduler.LocalDirectoryImageProvider(cfg: LocalOpts)[source]

将镜像名称解释为本地主机上的目录路径。不“获取”(例如下载)任何内容。与 LocalScheduler 结合使用以运行本地二进制文件。

镜像名称必须是绝对路径,并且必须存在。

示例

  1. fetch(Image(name="/tmp/foobar")) 返回 /tmp/foobar

  2. fetch(Image(name="foobar")) 抛出 ValueError

  3. fetch(Image(name="/tmp/dir/that/does/not_exist")) 抛出 ValueError

fetch(image: str) str[source]
引发:

ValueError – 如果镜像名称不是绝对路径,并且它不存在或不是目录

get_cwd(image: str) Optional[str][source]

返回绝对工作目录。用作子进程的工作目录。

get_entrypoint(img_root: str, role: Role) str[source]

返回角色入口点。当本地调度器使用 image_type=dir 执行时,子进程工作目录将设置为 img_root。如果 role.entrypoint 是相对路径,它将解析为 img_root/role.entrypoint,如果 role.entrypoint 是绝对路径,它将按提供的方式执行。

参考

torchx.schedulers.local_scheduler.create_scheduler(session_name: str, cache_size: int = 100, extra_paths: ~typing.Optional[~typing.List[str]] = None, image_provider_class: ~typing.Callable[[~torchx.schedulers.local_scheduler.LocalOpts], ~torchx.schedulers.local_scheduler.ImageProvider] = <class 'torchx.schedulers.local_scheduler.CWDImageProvider'>, **kwargs: ~typing.Any) LocalScheduler[source]
class torchx.schedulers.local_scheduler.LogIterator(app_id: str, log_file: str, scheduler: Scheduler, should_tail: bool = True)[source]
class torchx.schedulers.local_scheduler.PopenRequest(app_id: str, log_dir: str, role_params: Dict[str, List[ReplicaParam]], role_log_dirs: Dict[str, List[str]])[source]

保存创建每个应用程序角色副本的子进程的参数。

class torchx.schedulers.local_scheduler.ReplicaParam(args: List[str], env: Dict[str, str], stdout: Optional[str] = None, stderr: Optional[str] = None, combined: Optional[str] = None, cwd: Optional[str] = None)[source]

保存 LocalScheduler._popen() 的参数,每个角色副本使用。

class torchx.schedulers.local_scheduler.SignalException(msg: str, sigval: Signals)[source]

当 torchx 本地调度程序进程收到终止信号时,运行时会引发异常。

文档

访问 PyTorch 的全面开发人员文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源