快捷方式

多进程

库,用于启动和管理 n 个工作进程副本,这些副本由函数或二进制文件指定。

对于函数,它使用 torch.multiprocessing(因此使用 python multiprocessing)来生成/派生工作进程。对于二进制文件,它使用 python subprocessing.Popen 来创建工作进程。

用法 1:启动两个训练器作为函数

from torch.distributed.elastic.multiprocessing import Std, start_processes

def trainer(a, b, c):
    pass # train


# runs two trainers
# LOCAL_RANK=0 trainer(1,2,3)
# LOCAL_RANK=1 trainer(4,5,6)
ctx = start_processes(
        name="trainer",
        entrypoint=trainer,
        args={0: (1,2,3), 1: (4,5,6)},
        envs={0: {"LOCAL_RANK": 0}, 1: {"LOCAL_RANK": 1}},
        log_dir="/tmp/foobar",
        redirects=Std.ALL, # write all worker stdout/stderr to a log file
        tee={0: Std.ERR}, # tee only local rank 0's stderr to console
      )

# waits for all copies of trainer to finish
ctx.wait()

用法 2:启动 2 个 echo 工作器作为二进制文件

# same as invoking
# echo hello
# echo world > stdout.log
ctx = start_processes(
        name="echo"
        entrypoint="echo",
        log_dir="/tmp/foobar",
        args={0: "hello", 1: "world"},
        redirects={1: Std.OUT},
       )

torch.multiprocessing 一样,函数 start_processes() 的返回值是进程上下文 (api.PContext)。如果启动了函数,则返回 api.MultiprocessContext;如果启动了二进制文件,则返回 api.SubprocessContext。两者都是父类 api.PContext 的特定实现。

启动多个工作器

torch.distributed.elastic.multiprocessing.start_processes(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None, start_method='spawn')[source]

使用提供的选项启动 nentrypoint 进程副本。

entrypointCallable(函数)或 str(二进制文件)。副本数量由 argsenvs 参数的条目数量决定,它们需要具有相同的键集。

argsenv 参数是传递给入口点的参数和环境变量,由副本索引(本地排名)映射。所有本地排名都必须被考虑在内。也就是说,键集应该是 {0,1,...,(nprocs-1)}

注意

entrypoint 是二进制文件 (str) 时,args 只能是字符串。如果给出任何其他类型,则将其转换为字符串表示形式(例如 str(arg1))。此外,只有在主函数使用 torch.distributed.elastic.multiprocessing.errors.record 进行注释时,二进制文件故障才会写入 error.json 错误文件。对于函数启动,这默认情况下会完成,无需手动使用 @record 注释进行注释。

redirectstee 是位掩码,用于指定将哪些 std 流重定向到 log_dir 中的日志文件。有效掩码值在 Std 中定义。要仅重定向/tee 某些本地排名,请将 redirects 作为映射传递,其中键是本地排名以指定其重定向行为。任何缺失的本地排名将默认为 Std.NONE

tee 的行为类似于 unix 的“tee”命令,它可以重定向并打印到控制台。为了避免 worker 的 stdout/stderr 打印到控制台,请使用 redirects 参数。

对于每个进程,log_dir 将包含

  1. {local_rank}/error.json: 如果进程失败,则包含错误信息的的文件

  2. {local_rank}/stdout.json: 如果 redirect & STDOUT == STDOUT

  3. {local_rank}/stderr.json: 如果 redirect & STDERR == STDERR

注意

预期 log_dir 存在,为空并且是一个目录。

示例

log_dir = "/tmp/test"

# ok; two copies of foo: foo("bar0"), foo("bar1")
start_processes(
   name="trainer",
   entrypoint=foo,
   args:{0:("bar0",), 1:("bar1",),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
)

# invalid; envs missing for local rank 1
start_processes(
   name="trainer",
   entrypoint=foo,
   args:{0:("bar0",), 1:("bar1",),
   envs:{0:{}},
   log_dir=log_dir
)

# ok; two copies of /usr/bin/touch: touch file1, touch file2
start_processes(
   name="trainer",
   entrypoint="/usr/bin/touch",
   args:{0:("file1",), 1:("file2",),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
 )

# caution; arguments casted to string, runs:
# echo "1" "2" "3" and echo "[1, 2, 3]"
start_processes(
   name="trainer",
   entrypoint="/usr/bin/echo",
   args:{0:(1,2,3), 1:([1,2,3],),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
 )
参数
  • name (str) – 一个可读的简短名称,描述了进程是什么(用作 tee’ing stdout/stderr 输出时的标题)

  • entrypoint (Union[Callable, str]) – 一个 Callable(函数)或 cmd(二进制文件)

  • args (Dict[int, Tuple]) – 每个副本的参数

  • envs (Dict[int, Dict[str, str]]) – 每个副本的环境变量

  • log_dir – 用于写入日志文件的目录

  • start_method (str) – 多进程启动方法(spawn、fork、forkserver)对二进制文件忽略

  • redirects – 要重定向到日志文件的标准流

  • tee – 要重定向并打印到控制台的标准流

  • local_ranks_filter – 要打印到控制台的哪些 ranks 的日志

返回类型

PContext

进程上下文

class torch.distributed.elastic.multiprocessing.api.PContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[source]

基类,它规范化了对一组进程的操作,这些进程通过不同的机制启动。

名称 PContext 旨在与 torch.multiprocessing.ProcessContext 区分开。

警告

stdouts 和 stderrs 应该始终是 tee_stdouts 和 tee_stderrs(分别)的超集,这是因为 tee 是通过重定向 + tail -f <stdout/stderr.log> 实现的

class torch.distributed.elastic.multiprocessing.api.MultiprocessContext(name, entrypoint, args, envs, start_method, logs_specs, log_line_prefixes=None)[source]

PContext 持有作为函数调用的 worker 进程。

class torch.distributed.elastic.multiprocessing.api.SubprocessContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[source]

PContext 持有作为二进制文件调用的 worker 进程。

class torch.distributed.elastic.multiprocessing.api.RunProcsResult(return_values=<factory>, failures=<factory>, stdouts=<factory>, stderrs=<factory>)[source]

使用 start_processes() 启动的进程完成运行的结果。由 PContext 返回。

注意以下几点

  1. 所有字段都按本地排名映射

  2. return_values - 仅针对函数(而不是二进制文件)填充。

  3. stdouts - stdout.log 的路径(如果没有重定向,则为空字符串)

  4. stderrs - stderr.log 的路径(如果没有重定向,则为空字符串)

class torch.distributed.elastic.multiprocessing.api.DefaultLogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[source]

Default LogsSpecs 实现

  • log_dir 如果不存在,将被创建

  • 为每个尝试和排名生成嵌套文件夹。

reify(envs)[source]

使用以下方案构建日志目标路径

  • <log_dir>/<rdzv_run_id>/attempt_<attempt>/<rank>/stdout.log

  • <log_dir>/<rdzv_run_id>/attempt_<attempt>/<rank>/stderr.log

  • <log_dir>/<rdzv_run_id>/attempt_<attempt>/<rank>/error.json

返回类型

LogsDest

class torch.distributed.elastic.multiprocessing.api.LogsDest(stdouts=<factory>, stderrs=<factory>, tee_stdouts=<factory>, tee_stderrs=<factory>, error_files=<factory>)[source]

对于每种日志类型,都保存了本地排名 ID 到文件路径的映射。

class torch.distributed.elastic.multiprocessing.api.LogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[source]

为每个 worker 进程定义日志处理和重定向。

参数
  • log_dir (Optional[str]) – 日志将写入的基目录。

  • redirects (Union[Std, Dict[int, Std]]) – 要重定向到文件的流。传递单个 Std 枚举以对所有 worker 进行重定向,或者传递以本地排名为键的映射以选择性地进行重定向。

  • tee (Union[Std, Dict[int, Std]]) – 用于将流复制到标准输出/标准错误的流。传递单个 Std 枚举以复制所有工作进程的流,或传递一个以本地等级为键的映射以选择性地复制。

abstract reify(envs)[source]

根据环境变量,为每个本地等级构建日志文件的目标路径。

Envs 参数包含每个本地等级的环境变量字典,其中条目在: _start_workers() 中定义。

返回类型

LogsDest

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取针对初学者和高级开发人员的深入教程

查看教程

资源

查找开发资源并获得问题的解答

查看资源