快捷键

torch.distributed.elastic.multiprocessing.api 源代码

#!/usr/bin/env python3
# mypy: 允许未类型化定义

版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。

导入 abc
导入 记录日志
导入 操作系统
导入 正则表达式
导入 shutil
导入 信号
导入 subprocess
导入 系统
导入 tempfile
导入 线程
导入 时间
来自 abc 导入 ABC, 抽象方法
来自 contextlib 导入 空上下文
来自 dataclasses 导入 数据类, 字段
来自 枚举 导入 IntFlag
来自 多进程 导入 同步
来自 类型 导入 帧类型
来自 打字 导入 任意, 可调用, 可选, 联合

导入 torch.multiprocessing 作为 mp
来自 torch.distributed.elastic.multiprocessing.errors 导入 进程失败, 记录
来自 torch.distributed.elastic.multiprocessing.redirects 导入 (
    重定向标准错误,
    重定向标准输出,
)
来自 torch.distributed.elastic.multiprocessing.subprocess_handler 导入 (
    获取子进程处理器,
    子进程处理器,
)
来自 torch.distributed.elastic.multiprocessing.尾部日志 导入 尾部日志


是否为 Windows = 系统.平台 == win32
IS_MACOS = 系统.平台 == "darwin"


日志记录器 = 记录日志.获取日志记录器(__name__)

__all__ = [
    "默认日志规范",
    "信号异常",
    "标准",
    to_map,
    "运行进程结果",
    "P 上下文",
    获取标准厘米,
    "多进程上下文",
    "子进程上下文",
    "日志目标",
    "日志规范",
]


 信号异常(异常):
    ""
在 torchelastic 代理进程的终止处理程序内部引发异常
如果收到了进程的死亡信号。
```python
# 假设输入文本为:
input_text = '"""'

# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
    # 这里应该调用真实的翻译 API 进行翻译
    # 由于示例中不使用真实的 API,以下为模拟翻译结果
    return text

# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```

    def 初始化(自身, 信息: 字符串, sigval: 信号.信号) -> :
        超级().初始化(信息)
        自身.sigval = sigval


def 终止进程处理器(信号值: int, frame: 可选[框架类型]) -> :
    """终止处理器,当主进程接收到终止信号(SIGTERM, SIGINT)时,将引发异常。

当进程收到终止信号(SIGTERM, SIGINT)时,
被调用。它引发 `SignalException` 异常,该异常应由用户代码处理。
Python 在终止处理程序完成后不会终止进程,
因此,异常不应被无声地忽略,否则进程将永远不会终止。

```python
# 假设输入文本为:
input_text = '"""'

# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
    # 这里应该调用真实的翻译 API 进行翻译
    # 由于示例中不使用真实的 API,以下为模拟翻译结果
    return text

# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
    sigval = 信号.信号(signum)
    提升 信号异常(f"进程"{操作系统.获取进程 ID()}收到信号:{sigval}", sigval=sigval)


def _get_kill_signal() -> 信号.信号:
    获取终止信号。Unix 系统使用 SIGKILL,Windows 系统使用 CTRL_C_EVENT。
    如果 IS_WINDOWS:
        返回 信号.CTRL_C_EVENT  # type: ignore[attr-defined] # noqa: F821
    否则:
        返回 信号.SIGKILL


def 获取默认信号() -> 信号.信号:
    获取默认终止信号。Unix 系统为 SIGTERM,Windows 系统为 CTRL_C_EVENT。
    如果 IS_WINDOWS:
        返回 信号.CTRL_C_EVENT  # type: ignore[attr-defined] # noqa: F821
    否则:
        返回 信号.SIGTERM


def _validate_full_rank(d: 字典[int, 任意] 进程数: int, 什么: 字符串):
    实际键 = 集合(d.())
    预期键 = 集合(范围(进程数))

    如果 实际键 != 预期键:
        提升 运行时错误(
            f"{什么},本地排名映射不匹配,
            f预期:{预期键},实际:{实际键}"
        )


_MAPPING_REGEX = r"^(\d:[0123],)*(\d:[0123])$"
_值正则表达式 = r"^[0123]$"


 Std(整数标志):
     = 0
    输出 = 1
    错误 = 2
    所有 = 输出 | 错误

    @classmethod
    def from_str(, 虚拟机: 字符串) -> 联盟["标准", 字典[int, "标准"]]
        ""
示例:
        ::

         from_str("0") -> Std.NONE
         from_str("1") -> Std.OUT
         from_str("0:3,1:0,2:1,3:2") -> {0: Std.ALL, 1: Std.NONE, 2: Std.OUT, 3: Std.ERR}

任何其他输入都会引发异常
```python
# 假设输入文本为:
input_text = '"""'

# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
    # 这里应该调用真实的翻译 API 进行翻译
    # 由于示例中不使用真实的 API,以下为模拟翻译结果
    return text

# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```

        def to_std(v: 字符串) -> Std:  # type: ignore[return]
            s = Std(int(v))
            如果 s  Std:
                返回 s
            # 返回 None -> 由于我们进行了正则表达式检查输入,因此永远不会到达这里

        如果 正则表达式.匹配(值正则表达式, 虚拟机):  # vm 是一个数字(例如 0)
            返回 标准化(虚拟机)
        elif 正则表达式.匹配(_映射正则表达式, 虚拟机):  # vm 是一个映射(例如 0:1,1:2)
            d: 字典[int, Std] = {}
             m  vm.分割(","):
                i, v = m.分割(“:”)
                d[int(i] = 转换为标准格式(v)
            返回 d
        否则:
            提升 ValueError(
                f"{vm}不匹配:<{_值正则表达式}> 或 <{_映射正则表达式}>
            )


def 转换为映射(
    val_or_map: 联盟[Std, 字典[int, Std]], 本地世界大小: 整型
) -> 字典[int, Std]:
    ""
某些 API 将重定向设置作为单个值(例如,应用于所有
本地排名)或作为显式提供的映射。此方法是一种便利
将值或映射转换为映射的方法。

示例:
    ::

to_map(Std.OUT, local_world_size=2)  # 返回: {0: Std.OUT, 1: Std.OUT}
to_map({1: Std.OUT}, local_world_size=2)  # 返回: {0: Std.NONE, 1: Std.OUT}
     to_map(
{0: 标准输出, 1: 标准输出}, 本地世界大小=2
)  # 返回: {0: 标准输出, 1: 标准输出}
```python
# 假设输入文本为:
input_text = '"""'

# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
    # 这里应该调用真实的翻译 API 进行翻译
    # 由于示例中不使用真实的 API,以下为模拟翻译结果
    return text

# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
    如果 isinstance(val_or_map, Std):
        返回 字典.fromkeys(范围(本地世界大小), 值或映射)
    否则:
        映射 = {}
         i  范围(本地世界大小):
            地图[i] = 值或映射.获取(i, Std.)
        返回 地图


[文档]@数据类 类 LogsDest: """ 每种日志类型都包含本地排名 ID 到文件路径的映射。 "" stdouts: dict[int, str] = field(default_factory=dict) stderrs: dict[int, str] = field(default_factory=dict) tee_stdouts: dict[int, str] = field(default_factory=dict) tee_stderrs: dict[int, str] = field(default_factory=dict) error_files: dict[int, str] = field(default_factory=dict)
[文档]类 LogsSpecs(ABC): """ 定义每个工作进程的日志处理和重定向。 Args: 日志目录: 基础目录,日志将写入其中。 redirects: 流向要重定向到文件的流。传递单个 ``Std`` 枚举以重定向所有工作进程,或一个以键为 通过 local_rank 有选择性地重定向。 tee: 将流复制到 stdout/stderr。 将单个“Std”枚举传递给所有工作进程以复制流, 或按 local_rank 键进行选择性地复制。 """ def __init__(self, self, log_dir: 可选[str] = None, redirects: 联合[Std, dict[int, Std]] = Std.NONE, tee: 联合[Std, dict[int, Std]] = Std.NONE, local_ranks_filter: 可选[set[int]] = None, None self._root_log_dir = log_dir self._redirects = redirects self._tee = tee self._local_ranks_filter = local_ranks_filter
[文档] @abstractmethod def reify( self, envs: dict[int, dict[str, str]], ) -> LogsDest: """ 基于环境变量,为每个本地进程构建日志文件的目标位置。 Envs 参数包含每个本地 rank 的环境变量字典,其中条目定义在: func:`~torchelastic.distributed.elastic.agent.server.local_elastic_agent.LocalElasticAgent._start_workers`. """
@property @abstractmethod def root_log_dir(self) -> str: pass
[文档] 默认日志规范(日志规范): "" 默认日志规范实现: - `log_dir` 将被创建,如果不存在的话 - 为每个尝试和排名生成嵌套文件夹。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` def 初始化( 自身, 日志目录: 可选[字符串] = , 重定向: 联盟[Std, 字典[int, Std]] = Std., tee: 联盟[Std, 字典[int, Std]] = Std., 本地排名过滤器: 可选[集合[int]] = , ) -> : 如果 # 日志目录 != 操作系统.devnull: 如果 not 日志目录: # 日志目录 = 临时文件.mkdtemp(前缀=torchelastic_) elif not 操作系统.路径.存在(日志目录): 操作系统.创建多级目录(日志目录, exist_ok=True) 否则: 如果 操作系统.路径.判断是否为文件(日志目录): 提升 目录不存在错误(flog_dir:{日志目录}是一个文件) 超级().初始化(日志目录, 重定向, tee, 本地排名过滤器) 仅初始化一次 自身._运行日志目录 = None @property def 根日志目录(自身) -> 字符串: 返回 字符串(自身._根日志目录) def _创建日志目录(自身, 日志目录: 可选[字符串] rdzv 运行 ID: 字符串): 基础日志目录 = # 日志目录 临时文件.mkdtemp(前缀=torchelastic_) 操作系统.创建多级目录(基础日志目录, exist_ok=True) 目录 = 临时文件.mkdtemp(前缀=f"{rdzv 运行 ID}_, 目录=基础日志目录) 记录器.信息(日志目录设置为:%s", 目录) 返回 目录
[文档] def 实化( 自身, 环境变量: 字典[int, 字典[字符串, 字符串]], ) -> 日志存储位置: "" 使用以下方案构建日志目标路径: - `<log_dir>/<rdzv_run_id>/attempt_<attempt>/<rank>/stdout.log` - `<log_dir>/<rdzv_run_id>/attempt_<attempt>/<rank>/stderr.log` - `<log_dir>/<rdzv_run_id>/attempt_<attempt>/<rank>/error.json` ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 进程数 = 长度(环境变量) global_env = {} 仅用于查询不依赖于排名的属性 如果 进程数 > 0: global_env = 环境变量[0] 否则: 记录器.警告( "在定义日志目标时提供了空的 envs 映射。" ) 在单元测试中,键总是定义的,但值可能缺失 运行 ID = 全局环境.获取(TORCHELASTIC 运行 ID, 测试运行 ID) 重启次数 = 全局环境.获取("TORCHELASTIC_RESTART_COUNT", "0") 尝试日志目录: 字符串 = 请提供需要翻译的文本 如果 自身.根日志目录 != 操作系统.设备空: 如果 not 自身.运行日志目录: 自身.运行日志目录 = 自身.创建日志目录(自身.根日志目录, run_id) 尝试日志目录 = 操作系统.路径.加入( 自身.运行日志目录, f尝试_{重启次数}" ) # type: ignore[call-overload] shutil.rmtree(尝试日志目录, 忽略错误=True) 操作系统.创建多级目录(尝试日志目录) 如果 自身.根日志目录 == 操作系统./dev/null: 尝试日志目录 = 操作系统.devnull # 为每个本地 rank 在 logs_dir 中创建子目录 # logs_dir # |- 0 # |- 错误.json # |- 标准输出.log # |- 标准错误.log # |- ... (nprocs-1) 重定向 = 转换为映射(自身._重定向, 进程数) 简体中文 = 转换为映射(自身._tee, 进程数) # 首先将标准输出和标准错误重定向到文件中 # 然后使用 tail -f 命令查看 stdout.log/stderr.log 文件,因此将 tee 设置添加到重定向中 local_rank, tee 标准 ts.项目(): 重定向标准 = 重定向[本地排名] 重定向[本地排名] = 重定向标准 | 输出标准 SYS_STREAM = 请提供需要翻译的文本 # 特殊情况,表示输出到控制台 标准输出 = 字典.fromkeys(范围(进程数), SYS_STREAM) 标准错误 = 字典.fromkeys(范围(进程数), 系统流) 合并标准输出: 字典[int, 字符串] = {} 合并标准错误: 字典[int, 字符串] = {} 错误文件 = {} 本地排名 范围(进程数): 如果 尝试日志目录 == 操作系统./dev/null: tee 标准输出[本地排名] = 操作系统.设备空 输出标准错误[本地排名] = 操作系统.空设备 错误文件[本地排名] = 操作系统.空设备 环境变量[本地排名] ["TORCHELASTIC_ERROR_FILE"] = 请提供需要翻译的文本 否则: clogdir = 操作系统.路径.加入(attempt_log_dir, 字符串(local_rank)) 操作系统.建立目录(clogdir) rd = redirs[local_rank] 如果 (rd & Std.OUT) == Std.OUT: stdouts[本地排名] = 操作系统.路径.加入(日志目录, "标准输出.log") 如果 (读入 & Std.错误) == Std.错误: 标准错误[本地排名] = 操作系统.路径.加入(clogdir, "stderr.log") t = ts[local_rank] 如果 t & Std.OUT == Std.输出: 标准输出[本地排名] = 输出[本地排名] 如果 t & Std.错误 == Std.错误: tee 标准错误输出[本地排名] = 标准错误[本地排名] 如果 ( 自身._本地排名过滤器 本地排名 not 自身._本地排名过滤器 ): # 如果流被 tee,则只写入文件,但不要跟踪 如果 本地排名 tee_stdouts: tee_stdouts.弹出(local_rank, ) 如果 local_rank tee_stderrs: tee_stderrs.弹出(local_rank, ) # 如果没有重定向流,则不要打印 如果 标准输出[本地排名] == 系统流: 标准输出[本地排名] = 操作系统.设备空 如果 标准错误[本地排名] == 系统流: 标准错误[本地排名] = 操作系统.设备空 错误文件 = 操作系统.路径.加入(日志目录, "错误.json") 错误文件[本地排名] = 错误文件 记录器.信息( 设置工作%s回复文件到:%s", 本地 rank, 错误文件 ) 环境变量[本地排名] ["TORCHELASTIC_ERROR_FILE"] = 错误文件 返回 日志存储位置(标准输出, 标准错误, 合并标准输出, 合并标准错误, 错误文件)
def __repr__(自身) -> 字符串: 返回 ( f"默认日志规范(root_log_dir=){自身._root_log_dir}, 重定向={自身._redirects}," ftee={自身._tee}, local_ranks_filter={自身._local_ranks_filter})" ) def __等于__(自身, 其他: 对象) -> 布尔: 如果 not isinstance(其他, DefaultLogsSpecs): 返回 返回 ( 自身.根日志目录 == 其他.根日志目录 自身.重定向 == 其他.重定向 自身._tee == 其他._tee 自身._local_ranks_filter == 其他._local_ranks_filter )
[文档]@dataclass class RunProcsResult: """ 使用 `start_processes()` 启动的进程完成运行的“结果。由 `PContext` 返回。 注意以下内容: 1. 所有字段均按本地排名映射 2. ``return_values`` - 仅在函数中填充(不是二进制文件)。 3. ``stdouts`` - stdout.log 的路径(如果没有重定向则为空字符串) stderrs - stderr.log 的路径(如果没有重定向则为空字符串) "" return_values: dict[int, Any] = field(default_factory=dict) failures: dict[int, ProcessFailure] = field(default_factory=dict) stdouts: dict[int, str] = field(default_factory=dict) stderrs: dict[int, str] = field(default_factory=dict) def is_failed(self) -> bool: return len(self.failures) > 0
[文档] P 上下文(abc.ABC): "" 基类,用于标准化通过不同机制启动的一组进程的操作。 PContext 这个名字是故意用来与 torch.multiprocessing.ProcessContext 区分的。 ..警告:标准输出和标准错误始终应该是它们的超集 tee_stdouts 和 tee_stderrs(分别)这是 b/c tee 实现为一个重定向 + tail -f ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` def 初始化( 自身, 名称: 字符串, 入口: 联盟[可调用, 字符串] 参数: 字典[int, 元组] 环境变量: 字典[int, 字典[字符串, 字符串]], 日志规范: 日志规范类, 日志行前缀: 可选[字典[int, 字符串]] = , ): 自身.名称 = 名称 # 验证所有映射具有相同数量的键 # 所有本地 rank 都已计入 进程数 = 长度(参数) # TODO log_line_prefixes 也可以扩展 日志目标 = 日志规范.实化(环境变量) _validate_full_rank(日志目标.标准输出, 进程数, 标准输出) _validate_full_rank(日志目标.标准错误, 进程数, 标准错误) 自身.入口 = 入口 自身.args = args 自身.环境 = 环境 自身.标准输出 = 日志目标.标准输出 自身.标准错误 = 日志目标.标准错误 自身.错误文件 = 日志目标.错误文件 自身.进程数 = 进程数 自身._标准输出尾随 = 尾随日志( 名称, 日志目标.标准输出, 系统.标准输出, 日志行前缀 ) 自身._stderr 尾 = 尾日志( 名称, 日志目标.输出标准错误, 系统.标准错误输出, 日志行前缀 ) def 开始(自身) -> : 使用构造函数中定义的参数启动进程。 如果 线程.当前线程() 线程.主线程(): 信号.信号(信号.SIGTERM, _终止进程处理程序) 信号.信号(信号.SIGINT, 终止进程处理程序) 如果 not IS_WINDOWS: 信号.信号(信号.SIGHUP, 终止进程处理程序) 信号.信号(信号.SIGQUIT, _终止进程处理程序) 否则: 记录器.警告( "由于 torchelastic 在子线程上运行,无法注册信号处理程序。" "如果终止 torchrun,这可能导致孤儿工作进程。" ) 自身._启动() 自身._stdout_tail.开始() 自身._stderr_tail.开始() @abc.抽象方法 def _start(自身) -> : 使用特定上下文中定义的策略启动进程。 提升 未实现异常 @abc.抽象方法 def _poll(自身) -> 可选[运行进程结果]: "" 查询此上下文中运行的进程的运行状态。 此方法遵循“全有或全无”策略,如果所有进程完成,则返回 一个 ``RunProcessResults`` 对象。 成功或任何进程失败。如果所有进程仍在运行,则返回 `None`。 所有进程仍在运行。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 提升 未实现异常 def 等待(自身, 超时: 浮点数 = -1, 期间: 浮点数 = 1) -> 可选[运行进程结果]: "" 等待指定的 `timeout` 秒,每 `period` 秒轮询一次。 确保进程完成。如果进程仍在运行,则返回 `None`。 在超时到期时。负的超时值被解释为“无限期等待”。 超时值为零时,仅查询进程的状态(例如,相当于轮询)。 等待超时。超时值为零时,仅查询进程的状态(例如,相当于轮询)。 .. 注意:: 多进程库注册了 SIGTERM 和 SIGINT 信号处理器,这些处理器会引发 信号异常,当接收到信号时。取决于代码的消费者 正确处理异常。重要的是不要吞没异常。 进程不会终止。典型工作流程的示例可以是: .. 代码块 :: python pc = 启动进程(...) 尝试: pc.wait(1) ...执行其他工作 except SignalException 异常 as e: pc.shutdown(e.sigval, 超时=30) 如果发生 SIGTERM 或 SIGINT,上述代码将尝试通过传播 收到的信号来关闭子进程。如果子进程在超时时间内没有终止, 进程将发送 SIGKILL 信号。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 如果 超时 == 0: 返回 自身._投票() 如果 超时 < 0: 超时 = 系统.最大尺寸 过期 = 时间.时间() + 超时 while 时间.时间() < 过期: pr = 自身._投票() 如果 普及: 返回 普及 时间.睡眠(期间) 返回 None @abc.抽象方法 def 进程 ID(自身) -> 字典[int, int]: 返回按各自本地 rank 映射的进程 pid。 提升 未实现异常 @abc.抽象方法 def 关闭(自身, 退出信号: 信号.信号, 超时: 整型 = 30) -> : r"" 终止由该上下文管理的所有进程,并清理任何 元数据资源(例如重定向、错误文件等)。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 提升 未实现异常 def 关闭( 自身, 死亡信号: 可选[信号.信号] = , 超时: 整型 = 30 ) -> : r"" 终止由该上下文管理的所有进程,并清理任何元数据资源(例如重定向、错误文件等)。 元数据资源(例如重定向、错误文件等)。 参数: 死亡信号:用于终止进程的信号。 超时:等待进程结束的时间,如果进程在此时间后仍然存活,将通过 SIGKILL 信号终止。 超时:等待进程结束的时间,如果进程在此时间后仍然存活,将通过 SIGKILL 信号终止。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 如果 not 死亡信号: 死亡信号 = _获取默认信号() 自身._关闭(死亡信号=死亡信号, 超时=超时) 如果 自身.标准输出尾部: 自身.标准输出尾部.停止() 如果 自身.标准错误尾部: 自身._stderr_tail.停止()
def get_std_cm(std_rd: 字符串, redirect_fn): 如果 是否为 Windows IS_MACOS not std_rd: 返回 无上下文() 否则: 返回 redirect_fn(std_rd) def _包装( local_rank: int, 函数: 可调用, 参数: 字典[int, 元组] 环境变量: 字典[int, 字典[字符串, 字符串]], 标准输出重定向: 字典[int, 字符串] # 重定向文件用于标准输出(如果为空则重定向到控制台) 标准错误重定向: 字典[int, 字符串] # 重定向文件用于标准错误(如果为空则重定向到控制台) 返回值: 字典[int, mp.简单队列] 队列完成读取事件: 同步.活动, ) -> : # 提前获取每个 rank 的参数,以便快速失败,如果找不到映射 args_ = 参数[本地排名] 环境_ = 环境变量[本地排名] 返回值_ = 返回值[本地排名] 标准输出读取 = 标准输出重定向[本地排名] 标准错误输出 = 标准错误重定向[本地排名] 标准输出 = 获取标准输出(标准输出读取, 重定向标准输出) stderr_cm = 获取标准错误信息(stderr_rd, 重定向标准错误) k, v env_.项目(): 操作系统.环境[k] = v stdout_cm, stderr_cm: 返回 = 记录(函数)(*args_) ret_val_.放置(返回) 队列完成读取事件.等待()
[文档] 多进程上下文(P 上下文): ``PContext`` 持有作为函数调用的工作进程。 def 初始化( 自身, 名称: 字符串, 入口: 可调用, 参数: 字典[int, 元组] 环境变量: 字典[int, 字典[字符串, 字符串]], 开始方法: 字符串, 日志规范: 日志规范类, 日志行前缀: 可选[字典[int, 字符串]] = , ): 超级().初始化( 名称, 入口, 参数, 环境变量, 日志规范, 日志行前缀, ) 自身.启动方法 = 启动方法 # 每个 ret_val 队列总是包含一个元素。 自身._ret_vals = { 本地排名: mp.获取上下文(自身.开始方法).简单队列() 本地排名 范围(自身.进程数) } 请参阅 `join()` 函数中的注释以了解这是怎么回事 自身.返回值: 字典[int, 任意] = {} 自身._pc: 可选[mp.进程上下文] = None # 注意:仅在所有进程完成时才应调用 set 方法。 # 如果任何进程在调用 event.wait() 时死亡,调用 set() 方法将导致死锁。 自身._worker_finished_event = mp.获取上下文(自身.开始方法).活动() def _start(自身): 如果 自身._pc: 提升 ValueError( "进程上下文已初始化。" "很可能是启动方法被调用了两次。" ) 自身._pc = mp.启动进程( 函数=_包装, 参数=( 自身.入口, 自身.参数, 自身.环境变量, 自身.标准输出, 自身.标准错误, 自身._ret_vals, 自身._worker_finished_event, ), 进程数=自身.进程数, 加入=错误, 守护进程=错误, 开始方法=自身.开始方法, ) def _is_done(自身) -> 布尔: 返回 长度(自身._return_values) == 自身.进程数 def _poll(自身) -> 可选[运行进程结果]: 断言 自身._pc not None mypy 类型检查断言 try: torch.mp.ProcessContext 如果某些/所有工作进程失败则抛出异常 工作进程失败 超时小于 0 检查工作状态并立即返回 # Join 将永远不会返回成功,因为我们使用 synchronize.Event 等待所有进程完成。 # 为所有进程完成。 自身._pc.加入(-1) # 重要:我们使用 multiprocessing.Queue 来传递工作进程的返回值。 返回父进程,工作进程将在终止前等待 直到喂料线程将所有缓冲项喂入底层管道 因此,为了避免大型返回值导致的死锁,我们在每次 join 调用时尝试 opportunistically 调用 queue.get 因此,为了避免大型返回值导致的死锁,我们在每次 join 调用时尝试 opportunistically 调用 queue.get # 请参阅:https://docs.python.org/2/library/multiprocessing.html#all-platforms 本地排名 范围(0, 自身.进程数): 返回队列 = 自身._ret_vals[本地排名] 如果 not 返回队列.空的(): # 临时保存返回值到成员变量中 自身._返回值[本地排名] = 返回队列.获取() 如果 自身._is_done(): # 我们应该始终在所有进程完成时拥有所有返回值 自身._worker_finished_event.集合() # 在这一点上,工作者已经完成了用户函数的运行 但是子进程可能仍然没有退出。等待它们。 pc.join() 会永久阻塞,直到 "a" 进程退出。循环直到所有进程退出。 while not 自身._pc.加入(): 记录器.调试( "入口函数完成,等待所有子进程退出..." ) _validate_full_rank( 自身.返回值, 自身.进程数, "返回值队列" ) 自身.关闭() 返回 运行进程结果( 返回值=自身.返回值, 标准输出=自身.标准输出, 标准错误=自身.标准错误, ) 否则: 返回 None 除了 (mp.处理引发的异常, mp.进程退出异常) 作为 e: 本地排名失败 = e.错误索引 MultiprocessContext 的入口点始终是一个可调用对象 函数名 = 自身.入口.__qualname__ # 类型:忽略[联合属性] 处理失败 = 自身._pc.流程[本地排名失败] 错误文件路径 = 自身.错误文件[本地排名失败] 记录器.异常( 失败(退出码:%s)" "local_rank: "%s (pid: %s)" " of fn: "%s (start_method: %s)", 失败的进程.退出码, 失败的本地排名, e.进程 ID, 函数名, 自身.开始方法, ) 自身.关闭() 返回 运行进程结果( 失败={ 失败的本地排名: 进程失败( 本地排名=失败的本地排名, 进程 ID=e.进程 ID, 退出码=失败进程.退出码, 错误文件=错误文件路径, ) }, 标准输出=自身.标准输出, 标准错误=自身.标准错误, ) def 进程 ID(自身) -> 字典[int, int]: 断言 自身._pc not None # 断言用于 mypy 类型检查 返回 字典(列举(自身._pc.进程 ID())) def _close(自身, 死亡信号: 信号.信号, 超时: 整型 = 30) -> : 如果 not 自身._pc: 返回 处理 自身._pc.流程: 如果 进程.是否存活(): 记录器.警告( "关闭进程"%s通过信号%s", 进程.进程 ID, 死亡信号.名称 ) try: 操作系统.终止(进程.进程 ID, 死亡信号) 除了 ProcessLookupError: 如果进程因为某些原因退出, 将会引发`ProcessLookupError`异常,可以安全地忽略它。 通过 末端 = 时间.单调的() + 超时 proc 自身._pc.流程: 等待时间 = 末端 - 时间.单调的() 如果 等待时间 0: 断开 进程.加入(等待时间) 处理 自身._pc.流程: 如果 进程.是否存活(): 记录器.警告( "无法关闭进程"%s通过%s,强制退出通过%s", 进程.进程 ID, 死亡信号, 获取终止信号(), ) try: 操作系统.终止(进程.进程 ID, 获取终止信号()) 除了 进程查找错误: 如果进程由于某些原因退出, `ProcessLookupError` 将引发,可以安全地忽略它。 通过 进程.加入()
[文档] 子进程上下文(P 上下文): ``PContext`` 持有作为二进制文件调用的工作进程。 def 初始化( 自身, 名称: 字符串, 入口: 字符串, 参数: 字典[int, 元组] 环境变量: 字典[int, 字典[字符串, 字符串]], 日志规范: 日志规范类, 日志行前缀: 可选[字典[int, 字符串]] = , ): 超级().初始化( 名称, 入口, 参数, 环境变量, 日志规范, 日志行前缀, ) 状态向量;_vdone[local_rank] -> 是否 local_rank 已完成 自身.正在运行的本地 rank: 集合[int] = 集合(范围(自身.进程数)) 自身.失败: 字典[int, 进程失败] = {} 自身.子进程处理器: 字典[int, 子进程处理器] = {} def _start(自身): 如果 自身.子进程处理器: 提升 ValueError( 子进程处理器已经初始化。很可能是启动方法被调用两次。 ) 自身.子进程处理器 = { 本地排名: 获取子进程处理器( 入口=自身.入口, # 类型:忽略[arg-type] # entrypoint 总是字符串类型 参数=自身.参数[本地排名] 环境=自身.环境变量[本地排名] 标准输出=自身.标准输出[本地排名] 标准错误输出=自身.标准错误输出[本地排名] 本地排名 ID=本地排名, ) 本地排名 范围(自身.进程数) } def _投票(自身) -> 可选[运行进程结果]: 本地排名完成 = 集合() 本地排名 自身._正在运行本地排名: 处理器 = 自身.子进程处理器[本地排名] exitcode = 处理器.进程.轮询() 如果 exitcode not : 完成本地排名.添加(本地排名) 如果 exitcode != 0: 失败或已发出信号 自身.失败[本地排名] = 进程失败( 本地排名=本地排名, 进程 ID=处理器.进程.进程 ID, 退出码=退出码, 错误文件=自身.错误文件[本地排名] ) # else: --> 成功;无操作 自身.运行本地排名.差异更新(完成本地排名) 如果所有进程都已完成或任何进程已失败 如果 not 自身.运行本地排名 自身.失败: 自身.关闭() 终止所有运行进程 结果 = 运行进程结果( 失败=自身.失败, 标准输出=自身.标准输出, 标准错误=自身.标准错误, ) 如果 结果.是否失败(): 首次失败 = 最小(结果.失败.(), =lambda f: f.时间戳) 记录器.错误( "失败(退出码:"%s) 本地排名:%s (进程 ID:%s) 的二进制:%s", 首次失败.退出码, 首次失败.本地排名, 首次失败.进程 ID, 自身.入口, ) 否则: 填充返回值以使用占位符。这确保了与 MultiprocessingHandler 的一致性 结果.返回值 = 字典.fromkeys(范围(自身.进程数)) 返回 结果 否则: 没有失败且进程仍在运行 返回 None def 进程 ID(自身) -> 字典[int, int]: 返回 { 本地排名: sh.进程.进程 ID 本地排名, sh 自身.子进程处理器.项目() } def 关闭(自身, 死亡信号: 信号.信号, 超时: 整型 = 30) -> : 如果 not 自身.子进程处理器: 返回 处理器 自身.子进程处理器.(): 如果 处理器.处理.投票() : 记录器.警告( 发送处理%s关闭信号%s", 处理器.进程.进程 ID, 死亡信号.名称, ) 处理器.关闭(死亡信号=死亡信号) 末端 = 时间.单调的() + 超时 处理器 自身.子进程处理器.(): 等待时间 = 末端 - 时间.单调的() 如果 等待时间 0: 断开 try: 处理器.进程.等待(等待时间) 除了 子进程.超时已过期: 忽略超时异常,因为 子进程将通过 SIGKILL 信号强制终止 通过 处理器 自身.子进程处理器.(): 如果 处理器.处理.投票() : 记录器.警告( "无法关闭进程"%s通过%s,强制退出通过%s", 处理器.处理.进程 ID, 死亡信号, _获取终止信号(), ) 处理器.关闭(死亡信号=获取终止信号()) 处理器.进程.等待()

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源