本地¶
这包含 TorchX 本地调度程序,可用于通过子进程在本地运行 TorchX 组件。
- class torchx.schedulers.local_scheduler.LocalScheduler(session_name: str, image_provider_class: Callable[[LocalOpts], ImageProvider], cache_size: int = 100, extra_paths: Optional[List[str]] = None)[source]¶
Bases:
Scheduler
[LocalOpts
]在本地主机上调度。容器被建模为进程,容器的某些属性(这些属性要么不相关,要么无法在本地主机运行时强制执行)将被忽略。被忽略的属性包括
资源需求
资源限制执行
重试策略
重试次数(不支持重试)
部署偏好
调度程序支持在收到 SIGTERM 或 SIGINT 后清理孤儿进程。调度程序将终止生成的进程。
这通过调度程序 local_cwd 公开。
local_cwd 相对于当前工作目录运行提供的应用程序,并忽略图像字段,以实现更快的迭代和测试目的。
注意
孤儿清理仅在 LocalScheduler 从主线程实例化时有效。
配置选项
usage: [log_dir=LOG_DIR],[prepend_cwd=PREPEND_CWD],[auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES] optional arguments: log_dir=LOG_DIR (str, None) dir to write stdout/stderr log files of replicas prepend_cwd=PREPEND_CWD (bool, False) if set, prepends CWD to replica's PATH env var making any binaries in CWD take precedence over those in PATH auto_set_cuda_visible_devices=AUTO_SET_CUDA_VISIBLE_DEVICES (bool, False) sets the `CUDA_AVAILABLE_DEVICES` for roles that request GPU resources. Each role replica will be assigned one GPU. Does nothing if the device count is less than replicas.
兼容性
注意
由于调度程序的差异,在本地运行的任务在使用其他调度程序时可能无法工作,因为网络或软件依赖项不同。
功能
调度程序支持
获取日志
✔️
分布式任务
LocalScheduler 支持多个副本,但所有副本都将在本地主机上执行。
取消任务
✔️
描述任务
✔️
工作空间/修补
部分支持。LocalScheduler 从本地目录运行应用程序,但不支持编程工作空间。
挂载
❌
弹性
❌
- auto_set_CUDA_VISIBLE_DEVICES(role_params: Dict[str, List[ReplicaParam]], app: AppDef, cfg: LocalOpts) None [source]¶
如果运行选项
auto_set_cuda_visible_devices = True
,则根据每个角色的资源规范中指定的 GPU 数量,将每个副本(节点)的 env var 设置为CUDA_VISIBLE_DEVICES
env var,覆盖角色env
字段中任何现有的CUDA_VISIBLE_DEVICES
。要手动设置CUDA_VISIBLE_DEVICES
,请在调度程序 runcfg 中使用auto_set_cuda_visible_devices = False
运行。注意
如果主机的设备数量少于请求的 GPU 总数,则不会设置
CUDA_VISIBLE_DEVICES
(即使auto_set_cuda_visible_devices=True
)。注意
此方法要么在所有 gpu 角色上设置
CUDA_VISIBLE_DEVICES
,要么不设置。示例(所有示例都假设在具有 8 个 GPU 的主机上运行)
Role(num_replicas=2, resource=Resource(gpus=2))
副本 0 的
CUDA_VISIBLE_DEVICES=0,1
副本 1 的
CUDA_VISIBLE_DEVICES=2,3
Role(num_replicas=3, resource=Resource(gpus=4))
错误 - `` 3 * 4 = 12 >= 8``
[Role(num_replicas=1, resource=Resource(gpus=2)), Role(num_replicas=3, resource=Resource(gpus=1))]
角色 0,副本 0 的
CUDA_VISIBLE_DEVICES=0,1
角色 1,副本 0 的
CUDA_VISIBLE_DEVICES=2
角色 1,副本 1 的
CUDA_VISIBLE_DEVICES=3
角色 1,副本 2 的
CUDA_VISIBLE_DEVICES=4
- close() None [source]¶
仅适用于具有本地状态的调度器!关闭调度器,释放任何已分配的资源。关闭后,调度器对象将被视为不再有效,对该对象调用的任何方法都将导致未定义的行为。
此方法不应引发异常,并且允许在同一个对象上调用多次。
注意
仅为具有本地状态的调度器实现覆盖(
torchx/schedulers/local_scheduler.py
)。简单地包装远程调度器客户端的调度器不需要实现此方法。
- describe(app_id: str) Optional[DescribeAppResponse] [source]¶
描述指定的应用程序。
- 返回:
AppDef 描述或
None
如果应用程序不存在。
- list() List[ListAppResponse] [source]¶
对于在调度器上启动的应用程序,此 API 返回一个 ListAppResponse 对象列表,每个对象都包含应用程序 ID 及其状态。注意:此 API 处于原型阶段,可能会发生变化。
- log_iter(app_id: str, role_name: str, k: int = 0, regex: Optional[str] = None, since: Optional[datetime] = None, until: Optional[datetime] = None, should_tail: bool = False, streams: Optional[Stream] = None) Iterable[str] [source]¶
返回
k``th replica of the ``role
的日志行的迭代器。当所有符合条件的日志行都被读取后,迭代器结束。如果调度器支持基于时间的游标来获取自定义时间范围内的日志行,则
since
、until
字段将被遵守,否则将被忽略。不指定since
和until
等同于获取所有可用的日志行。如果until
为空,则迭代器表现得像tail -f
,在作业到达终端状态之前一直跟踪日志输出。构成日志的精确定义是特定于调度器的。某些调度器可能将 stderr 或 stdout 视为日志,而其他调度器可能从日志文件读取日志。
行为和假设
如果对不存在的应用程序调用,则会产生未定义的行为。调用者应在调用此方法之前使用
exists(app_id)
检查应用程序是否存在。没有状态,使用相同的参数调用此方法两次将返回一个新的迭代器。之前的迭代进度会丢失。
并不总是支持日志跟踪。并非所有调度器都支持实时日志迭代(例如,在应用程序运行时跟踪日志)。请参阅特定调度器的文档以了解迭代器的行为。
- 3.1 如果调度器支持日志跟踪,则应由
should_tail
参数控制。
不保证日志保留。当调用此方法时,底层调度器可能已经清除了此应用程序的日志记录。如果是这种情况,此方法将引发任意异常。
如果
should_tail
为 True,则该方法仅在访问的日志行完全耗尽且应用程序到达最终状态时才引发StopIteration
异常。例如,如果应用程序卡住并且没有产生任何日志行,则迭代器将阻塞,直到应用程序最终被杀死(通过超时或手动),此时它将引发StopIteration
。如果
should_tail
为 False,则该方法在没有更多日志时引发StopIteration
。并非所有调度器都需要支持。
某些调度器可能通过支持
__getitem__
来支持行游标(例如,iter[50]
将跳转到第 50 行日志)。- 保留空格,每个新行都应包含
\n
。到 支持交互式进度条,返回的行不需要包含
\n
,但应在不换行的情况下打印,以正确处理\r
回车符。
- 保留空格,每个新行都应包含
- 参数:
streams – 要选择的 IO 输出流。以下之一:combined、stdout、stderr。如果调度器不支持选定的流,它将抛出 ValueError。
- 返回:
指定角色副本的日志行的
Iterator
- 引发:
NotImplementedError – 如果调度器不支持日志迭代
- schedule(dryrun_info: AppDryRunInfo[PopenRequest]) str [source]¶
与
submit
相同,只是它接受一个AppDryRunInfo
。建议实现者实现此方法,而不是直接实现submit
,因为submit
可以通过dryrun_info = self.submit_dryrun(app, cfg) return schedule(dryrun_info)
镜像提供者¶
- class torchx.schedulers.local_scheduler.ImageProvider[source]¶
管理在本地主机上下载和设置镜像。这仅适用于
LocalhostScheduler
,因为通常真正的调度器会代表用户执行此操作。- fetch_role(role: Role) str [source]¶
与
fetch(image)
相同,它拉取角色的镜像并返回镜像根目录的路径,不同之处在于它允许此提供者更新角色。当需要在角色上设置额外的环境变量以符合镜像提供者在本地主机上获取和管理镜像的方式时,这很有用。默认情况下,此方法只是委托给fetch(role.image)
。如果需要,请覆盖。
- get_replica_param(img_root: str, role: Role, stdout: Optional[str] = None, stderr: Optional[str] = None, combined: Optional[str] = None) ReplicaParam [source]¶
给定角色副本的规范,返回
ReplicaParam
容器,它持有最终传递给subprocess.Popen
的参数,以实际调用和运行每个角色的副本。预期img_root
是self.fetch(role.image)
的返回值。由于角色的镜像只需获取一次(而不是每个副本),因此预期调用者对每个角色调用fetch
方法一次,并对每个role.num_replicas
调用此方法。
- class torchx.schedulers.local_scheduler.CWDImageProvider(cfg: LocalOpts)[source]¶
类似于 LocalDirectoryImageProvider,但它忽略镜像名称并使用当前工作目录作为镜像路径。
示例
fetch(Image(name="/tmp/foobar"))
返回 os.getcwd()fetch(Image(name="foobar:latest"))
返回 os.getcwd()
- class torchx.schedulers.local_scheduler.LocalDirectoryImageProvider(cfg: LocalOpts)[source]¶
将镜像名称解释为本地主机上的目录路径。不“获取”(例如下载)任何内容。与
LocalScheduler
结合使用以运行本地二进制文件。镜像名称必须是绝对路径,并且必须存在。
示例
fetch(Image(name="/tmp/foobar"))
返回/tmp/foobar
fetch(Image(name="foobar"))
抛出ValueError
fetch(Image(name="/tmp/dir/that/does/not_exist"))
抛出ValueError
- fetch(image: str) str [source]¶
- 引发:
ValueError – 如果镜像名称不是绝对路径,并且它不存在或不是目录
参考¶
- torchx.schedulers.local_scheduler.create_scheduler(session_name: str, cache_size: int = 100, extra_paths: ~typing.Optional[~typing.List[str]] = None, image_provider_class: ~typing.Callable[[~torchx.schedulers.local_scheduler.LocalOpts], ~torchx.schedulers.local_scheduler.ImageProvider] = <class 'torchx.schedulers.local_scheduler.CWDImageProvider'>, **kwargs: ~typing.Any) LocalScheduler [source]¶
- class torchx.schedulers.local_scheduler.LogIterator(app_id: str, log_file: str, scheduler: Scheduler, should_tail: bool = True)[source]¶
- class torchx.schedulers.local_scheduler.PopenRequest(app_id: str, log_dir: str, role_params: Dict[str, List[ReplicaParam]], role_log_dirs: Dict[str, List[str]])[source]¶
保存创建每个应用程序角色副本的子进程的参数。