快捷键

多进程 ¶

库,用于启动和管理由函数或二进制文件指定的 n 个工作子进程。

对于函数,它使用 torch.multiprocessing (因此是 python multiprocessing )来创建/派生工作进程。对于二进制文件,它使用 python subprocessing.Popen 来创建工作进程。

使用方法 1:以函数形式启动两个训练器

from torch.distributed.elastic.multiprocessing import Std, start_processes


def trainer(a, b, c):
    pass  # train


# runs two trainers
# LOCAL_RANK=0 trainer(1,2,3)
# LOCAL_RANK=1 trainer(4,5,6)
ctx = start_processes(
    name="trainer",
    entrypoint=trainer,
    args={0: (1, 2, 3), 1: (4, 5, 6)},
    envs={0: {"LOCAL_RANK": 0}, 1: {"LOCAL_RANK": 1}},
    log_dir="/tmp/foobar",
    redirects=Std.ALL,  # write all worker stdout/stderr to a log file
    tee={0: Std.ERR},  # tee only local rank 0's stderr to console
)

# waits for all copies of trainer to finish
ctx.wait()

使用方法 2:以二进制形式启动 2 个回声工作进程

# same as invoking
# echo hello
# echo world > stdout.log
ctx = start_processes(
        name="echo"
        entrypoint="echo",
        log_dir="/tmp/foobar",
        args={0: "hello", 1: "world"},
        redirects={1: Std.OUT},
       )

torch.multiprocessing 类似,函数 start_processes() 的返回值是进程上下文( api.PContext )。如果启动了函数,则返回 api.MultiprocessContext ;如果启动了二进制,则返回 api.SubprocessContext 。这两个都是父类 api.PContext 的具体实现。

启动多个工作进程

torch.distributed.elastic.multiprocessing.start_processes(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None, start_method='spawn')[source][source]

使用提供的选项启动 nentrypoint 进程副本。

entrypoint 可以是 Callable (函数)或 str (二进制)。副本数量由 argsenvs 参数的条目数量决定,这些参数需要具有相同的键集。

argsenv 参数是将传递给由副本索引(本地排名)映射的入口点的参数和环境变量。所有本地排名都必须计入。也就是说,键集应该是 {0,1,...,(nprocs-1)}

注意

entrypoint 是二进制( str )时, args 只能为字符串。如果给出其他类型,则将其转换为字符串表示(例如 str(arg1) )。此外,只有当主函数被 torch.distributed.elastic.multiprocessing.errors.record 注解时,二进制失败才会写入 error.json 错误文件。对于函数调用,这是默认行为,无需手动使用 @record 注解。

redirectstee 是位掩码,用于指定将哪些 std 流重定向到 log_dir 的日志文件中。有效的掩码值在 Std 中定义。要仅重定向/tee 特定局部 rank,请传递一个包含键为局部 rank 的映射,以指定重定向行为。任何缺失的局部 rank 将默认为 Std.NONE

tee 类似于 Unix 中的“tee”命令,它将重定向并打印到控制台。为了避免工作进程的 stdout/stderr 打印到控制台,请使用 redirects 参数。

对于每个进程, log_dir 将包含:

  1. 如果进程失败,将生成一个包含错误信息的文件

  2. 如果 redirect & STDOUT == STDOUT

  3. 如果 redirect & STDERR == STDERR

注意

预期 log_dir 存在、为空且为目录

示例:

log_dir = "/tmp/test"

# ok; two copies of foo: foo("bar0"), foo("bar1")
start_processes(
   name="trainer",
   entrypoint=foo,
   args:{0:("bar0",), 1:("bar1",),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
)

# invalid; envs missing for local rank 1
start_processes(
   name="trainer",
   entrypoint=foo,
   args:{0:("bar0",), 1:("bar1",),
   envs:{0:{}},
   log_dir=log_dir
)

# ok; two copies of /usr/bin/touch: touch file1, touch file2
start_processes(
   name="trainer",
   entrypoint="/usr/bin/touch",
   args:{0:("file1",), 1:("file2",),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
 )

# caution; arguments casted to string, runs:
# echo "1" "2" "3" and echo "[1, 2, 3]"
start_processes(
   name="trainer",
   entrypoint="/usr/bin/echo",
   args:{0:(1,2,3), 1:([1,2,3],),
   envs:{0:{}, 1:{}},
   log_dir=log_dir
 )
参数:
  • 名称(str)- 用于描述进程的、人类可读的简短名称(在 tee’ing 标准输出/标准错误输出时用作标题)

  • 入口点(Union[Callable, str])- 要么是 Callable (函数),要么是 cmd (二进制文件)

  • 参数(dict[int, tuple])- 每个副本的参数

  • 环境变量(dict[int, dict[str, str]])- 每个副本的环境变量

  • 日志目录 – 用于写入日志文件的目录

  • start_method (str) – 多进程启动方法(spawn, fork, forkserver),对于二进制文件忽略

  • 重定向 – 哪些标准流需要重定向到日志文件

  • tee – 哪些标准流需要重定向并打印到控制台

  • 本地排名过滤器 – 打印到控制台哪些排名的日志

返回类型:

P 上下文

进程上下文 ¶

class torch.distributed.elastic.multiprocessing.api.PContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[source][source]

标准化通过不同机制启动的一组进程的操作的基类。

名称 PContext 是故意的,以区分 torch.multiprocessing.ProcessContext

警告

stdouts 和 stderrs 应该始终是 tee_stdouts 和 tee_stderrs(分别)的超集,这是因为 tee 是通过重定向+ tail -f 实现的。

class torch.distributed.elastic.multiprocessing.api.MultiprocessContext(name, entrypoint, args, envs, start_method, logs_specs, log_line_prefixes=None)[source][source]

以函数形式调用的工作进程。

class torch.distributed.elastic.multiprocessing.api.SubprocessContext(name, entrypoint, args, envs, logs_specs, log_line_prefixes=None)[source][source]

以二进制形式调用的工作进程。

class torch.distributed.elastic.multiprocessing.api.RunProcsResult(return_values=<factory>, failures=<factory>, stdouts=<factory>, stderrs=<factory>)[source][source]

过程启动于 start_processes() 的运行结果。由 PContext 返回。

注意以下内容:

  1. 所有字段均由本地排名映射。

  2. return_values - 仅对函数有效(不是二进制文件)。

  3. stdouts - 输出到 stdout.log 的路径(如果没有重定向则为空字符串)

  4. stderrs - 输出到 stderr.log 的路径(如果没有重定向则为空字符串)

class torch.distributed.elastic.multiprocessing.api.DefaultLogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[source][source]

Default LogsSpecs 实现:

  • 如果不存在,将创建日志目录

  • 为每个尝试和排名生成嵌套文件夹。

实现 envs()[source][source] ¶

使用以下方案构建日志目标路径:

  • <日志目录>//attempt_//stdout.log

  • <日志目录>//attempt_//stderr.log

  • <日志目录>//attempt_//error.json

返回类型:

日志目标

class torch.distributed.elastic.multiprocessing.api.LogsDest(stdouts=<factory>, stderrs=<factory>, tee_stdouts=<factory>, tee_stderrs=<factory>, error_files=<factory>)[source][source]

对于每种日志类型,持有本地 rank ID 到文件路径的映射。

class torch.distributed.elastic.multiprocessing.api.LogsSpecs(log_dir=None, redirects=Std.NONE, tee=Std.NONE, local_ranks_filter=None)[source][source]

定义每个工作进程的日志处理和重定向。

参数:
  • log_dir(可选[str])- 存储日志的基本目录。

  • redirects(Union[Std, dict[int, torch.distributed.elastic.multiprocessing.api.Std])- 要重定向到文件的流。传递单个 Std 枚举以重定向所有工作进程,或按 local_rank 键的映射以选择性重定向。

  • tee(Union[Std, dict[int, torch.distributed.elastic.multiprocessing.api.Std])- 要复制到 stdout/stderr 的流。传递单个 Std 枚举以复制所有工作进程的流,或按 local_rank 键的映射以选择性复制。

抽象实现 reify(envs)[source][source]

根据环境变量,为每个本地进程构建日志文件的存储位置。

Envs 参数包含每个本地进程的环境变量字典,其中条目定义在: _start_workers()

返回类型:

日志存储位置


© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,主题由 Read the Docs 提供。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源并获得您的疑问解答

查看资源