torch.distributed.elastic.multiprocessing.errors 源代码
#!/usr/bin/env python3
# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
""
每个分布式 PyTorch 作业中的主机都运行一个 TorchElastic 代理
和多个工作者(作为 TorchElastic 代理的子进程)。
由于工人是用户提供的(您的 PyTorch 脚本/作业),TorchElastic
通过代理将错误在训练器中传播,并向上传递
调度器,最终通知最终用户作业的状态
并应用任何重试策略。
TorchElastic 将错误分为 3 类:
+----------------+----------------+--------------------------------------------------------------+
| 分类 | 子分类 | 描述 |
+================+================+==============================================================+
用户错误 输入错误 向 TorchElastic API 的无效输入(例如,最小节点数大于最大节点数)
+----------------+--------------------------------------------------------------+
| 工作节点故障 | 工作子进程上的任何故障
+----------------+----------------+--------------------------------------------------------------+
平台错误 | n/a | 由代理引起的故障
+----------------+----------------+--------------------------------------------------------------+
基础错误 | 无可用信息 | 代理和工作者域外的故障 |
| | (例如,主机故障) |
+----------------+----------------+--------------------------------------------------------------+
所有非“工作者故障”的错误要么是规范地从代理进程引发,要么是隐式或显式地使代理进程崩溃。因此,
所以,所有错误要么是规范地从代理进程引发,要么是隐式或显式地使代理进程崩溃。因此,
标准语言(Python)提供的异常处理策略适用。
工人故障是特殊的,因为异常/故障起源于不同的
从代理进程处理,因此错误需要跨进程传播
(例如,代理不能简单地“try-catch”在工作进程上抛出的异常)。
TorchElastic 代理使用 :func:`torch.distributed.elastic.multiprocessing.start_processes` 来启动工作进程,该进程具有简单的基于文件的进程间错误传播机制。
任何使用 :func:`record` 装饰的函数或二进制入口点。
内置。
任何使用 :func:`record` 装饰的函数或二进制入口点。
将未捕获的异常(带有跟踪信息)写入由环境变量 `TORCHELASTIC_ERROR_FILE` 指定的文件。
父进程(例如代理)为每个启动的子进程设置此环境变量,然后汇总所有子进程的错误文件,并传播具有**最小**时间戳的文件(例如**第一个**错误)。
父进程(例如代理)为每个启动的子进程设置此环境变量,然后汇总所有子进程的错误文件,并传播具有**最小**时间戳的文件(例如**第一个**错误)。
父进程(例如代理)为每个启动的子进程设置此环境变量,然后汇总所有子进程的错误文件,并传播具有**最小**时间戳的文件(例如**第一个**错误)。
""
导入 json
导入
操作系统
导入
信号
导入
套接字
导入
时间
来自 dataclasses
导入
数据类,
字段
来自 datetime
导入 datetime
来自 functools
导入
包装
来自
字符串
导入
模板
来自
打字
导入
任意,
可调用,
可选,
类型变量
来自
torch.distributed.elastic.utils.日志
导入
获取日志记录器
来自
.错误处理器
导入
错误处理器 # noqa: F401
来自
.处理器
导入
获取错误处理器 # noqa: F401
__all__ = [
"进程失败",
子失败错误,
记录,
错误处理器,
获取错误处理器,
]
日志记录器 =
获取日志记录器(__name__)
JSON = 字典
_空错误数据 = {
"消息":
无}
不可用 =
不适用
T = 类型变量(
T)
[文档]@dataclass
类
进程失败:
""
表示失败过程的结果。当工作进程失败时,它可能会将失败的根本原因记录到文件中。
尝试从提供的 `error_file` 中读取失败时间戳,
如果 `error_file` 不存在,则时间戳为当前
时间戳(自纪元以来的秒数)。
`message` 字段是对失败情况的简要说明。如果
错误文件存在时,消息从错误文件中获取。
否则根据失败签名生成一个。
.. 注意:: 假设 ``error_file`` 是由
``torch.distributed.elastic.multiprocessing.errors.error_handler.ErrorHandler`` 编写的。
否则行为未定义。
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
local_rank: 整型
进程 ID:
整型
退出码:
整型
错误文件:
字符串
错误文件数据: JSON =
字段(
初始化=
错误)
消息:
字符串 =
字段(
初始化=
错误)
时间戳:
整型 =
字段(
初始化=
错误)
def __post_init__(自身):
自身.
错误文件数据 =
_空错误数据
如果
操作系统.
路径.
判断是否为文件(
自身.
错误文件):
try:
与
打开(
自身.
错误文件)
作为 fp:
自身.
错误文件数据 = json.
加载(fp)
记录器.
调试(
用户进程失败,错误数据为:%s",
json.压缩包(
自身.
错误文件数据,
缩进=2),
)
自身.
消息,
自身.
时间戳 =
自身.
获取错误数据(
自身.
错误文件数据
)
除了
异常:
记录器.
异常(
"解析回复文件失败:"%s",
自身.
错误文件)
提升
否则:
自身.
_设置无回复文件()
# 如果尚未存在,则创建一个信息性消息
如果 not
自身.
消息:
# 信号通常不会生成错误文件消息
如果
自身.exitcode < 0:
自身.
消息 = (
f信号{-
自身.
退出码} ({
自身.
信号名称()})"
f由 PID 接收{
自身.
进程 ID}"
)
否则:
自身.
消息 =
要启用跟踪回溯,请参阅:https://maskerprc.github.io/docs/stable/elastic/errors.html
def _获取错误数据(
自身,
错误文件数据:
字典[
字符串,
任意]) ->
元组[
字符串, int
]:
消息 =
错误文件数据[
"消息"]
如果 isinstance(
消息,
字符串):
时间戳 = int(
错误文件数据.
获取(
时间戳, 0))
否则:
时间戳 = int(
消息[
"额外信息"
]
["时间戳"])
返回 (
消息,
时间戳)
def _设置不回复文件(
自身):
自身.
错误文件 =
_不可用
自身.
文件数据错误 =
_空错误数据
自身.
消息 =
请提供需要翻译的文本
自身.
时间戳 = int(
时间.
时间())
def 信号名称(
自身) ->
字符串:
如果
自身.exitcode < 0:
我们不想在尝试查找信号名称时杀死父进程。
如果信号没有映射到已知名称,则使用不可用。
try:
返回
信号.
信号(-
自身.
退出码).
名称
除了
异常:
返回 _NOT_AVAILABLE
否则:
返回 _NOT_AVAILABLE
def timestamp_isoformat(自身):
返回 ISO 格式的时间戳(YYYY-MM-DD_HH:MM:SS)。
返回
日期时间.fromtimestamp(
自身.
时间戳).isoformat(
分隔符=
“_”)
全球排名 =
整型
_失败格式模板 =
```[${idx}
]:
时间 : $
{时间}
host : ${主机名}
rank : ${排名}
(本地排名: $
{本地排名})
退出码 : $
{退出码}
(进程 ID: $
{进程 ID})
错误文件: $
{错误文件}
跟踪信息: $
{消息}
""
# 预先在前后添加额外的空行是故意的
消息格式模板 =
""
${边框}
${标题}
${章节}
失败:
${其他失败}
${章节}
根本原因(首次观察到的失败):
${根故障}
${边界}
""
[文档]
类 ChildFailedError(
异常):
""
可以从使用注解的函数中抛出的特殊异常类型
使用 `@record` 装饰器使子进程(根异常)原样向上传递堆栈
(例如,不包裹在父进程的跟踪信息中)。
在父进程是一个简单的看护进程的情况下很有用
而子(工作)进程实际上在进行有意义的计算时。
在这种情况下,错误通常发生在子进程中
没有进行任何非平凡操作,子错误应被传播
调度器以进行准确的根本原因诊断。
.. note:: 传播依赖于错误文件而不是异常处理来
支持函数和二进制启动。
示例:
::
# 主机(容器)上的进程树
0: 调度器初始化进程:
|- 1: torchelastic_agent:
|- 2: 训练师_0 (正常)
|- 3: 训练师_1 (失败) -> error.json
|- ...
|- n+2: 训练师_n (正常)
|- n+3: 其他进程
|- ...
在上面的例子中,训练器 1 的失败(写入 error.json)是
根本原因,应报告给调度器的初始化进程。
torchelastic 代理抛出了`ChildFailedError("trainer", {1: "trainer_1/error.json"})`异常
在检测到训练器 1 失败后,该失败将传播到调度器的初始化进程
训练器 1 的错误文件内容
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 初始化(
自身,
名称:
字符串,
失败:
字典[
全球排名,
进程失败
)]
自身.
名称 =
名称
自身.
失败 =
失败
断言 (
自身.
失败
) # 不应在没有失败的情况下创建 ChildFaileError
超级().
初始化(
自身.
格式化消息())
def 获取第一个失败(
自身) ->
元组[
全球排名,
进程失败
]:
排名 =
最小(
自身.
失败.
键(),
键=lambda r:
自身.
失败[r].
时间戳)
返回
排名,
自身.
失败[
排名]
def 格式消息(
自身,
边界分隔符=
等于,
部分分隔符=
破折号):
标题 = f"{
自身.
名称}
失败
根级,
_根级故障 =
自身.
获取首次故障()
根级故障格式化:
字符串 =
请提供需要翻译的文本
其他失败格式:
列表[
字符串] = []
宽度 =
长度(
标题)
为
索引, (
排名,
失败)
在
列举(
自身.
失败次数.
项目()):
格式化, w =
自身.
格式错误(
索引,
排名,
失败)
宽度 =
最大值(
宽度, w)
如果
排名 ==
根排名:
根部故障格式 = fmt
否则:
其他故障格式.
追加(
格式化)
宽度上限
宽度 =
最小(
宽度, 60)
返回
模板(_MSG_FORMAT_TEMPLATE).
替换(
边框=
边界分隔符 *
宽度,
标题=
标题,
部分=
区部分隔符 *
宽度,
根故障=
根故障格式化,
其他失败="
输入文本翻译为简体中文为:\n".
加入(
其他失败格式化
或 [
"无其他失败"
)]
)
def 格式化失败(
自身,
索引: int,
排名: int,
失败:
处理失败
) -> 元组[
字符串, int
]:
# 失败.message 要么是一个 str(当失败不生成跟踪堆栈时 - 例如信号)
# 或者是一个字典(json)的形式
# {"message": 错误信息, "extraInfo": {"py_callstack": 跟踪栈, timestamp: 时间戳}}
# 显示逻辑如下:
# 1. 如果 failure.message 不是一个字典(它是一个字符串)则直接显示
# 2. 否则尝试获取跟踪栈(py_callstack)
如果没有 traceback,则使用消息
如果没有消息,则显示
msg = 失败.
消息
如果 isinstance(
失败.
消息,
字典):
msg = (
失败.
消息.
获取(
"额外信息", {})
.获取(
"Python 调用栈",
失败.
消息.
获取(
"消息", "<N/A>"))
.替换("
输入文本翻译为简体中文为:\n", "
输入文本翻译为简体中文为:\n ")
# 正确缩进跟踪信息
)
fmt = 模板(
_失败格式模板).
替代(
索引=
索引,
时间=
失败.
ISO 格式时间戳(),
hostname=套接字.
获取完全限定域名(),
排名=
排名,
本地排名=
失败.
本地排名,
退出码=
失败.
退出码,
进程 ID=
失败.
进程 ID,
错误文件=
失败.
错误文件,
消息=
信息,
)
宽度 = 0
为
行
在
格式化.
分割("
输入文本翻译为简体中文为:\n"):
宽度 =
最大值(
宽度,
长度(
行))
返回
格式化,
宽度
[文档]def
记录(
函数:
可调用[..., T
]
错误处理器:
可选[
错误处理器] = None
) -> 可调用[..., T
]:
""
用于记录被装饰函数中发生的错误/异常的语法糖。
使用此装饰器等同于:使用提供的 `error_handler`。
使用此装饰器等同于:
::
error_handler = 获取错误处理器()
error_handler.initialize()
尝试:
foobar()
except 子进程失败错误 as e:
_, failure = e.get_first_failure()
error_handler.dump_error_file(failure.error_file, failure.exitcode)
raise
except Exception as e:
error_handler.record_exception(e)
raise
.. 重要:: 在进程顶层方法中每次使用此装饰器,
通常这是主方法。
示例
::
@record
def 主函数():
通过
if __name__ == "__main__":
主函数()
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
如果 not
错误处理器:
错误处理器 =
获取错误处理器()
def 包裹(f):
@wraps(f)
def 包装器(*
参数, **kwargs):
断言
错误处理器
是 not None
# mypy 类型检查器的断言
错误处理器.
初始化()
try:
返回 f(*
参数, **kwargs)
除了
系统退出
作为 se:
# 系统退出代码为 0 时,基于 run_path 的入口点永远不会退出。
# 这里处理它,通过返回一个值:
如果 se.
代码 == 0:
返回 None
否则:
提升
除了 ChildFailedError
作为 e:
排名,
失败 = e.
获取第一个失败()
如果
失败.
错误文件 !=
不可用:
错误处理器.
错误文件输出(
失败.
错误文件,
失败.
退出码)
否则:
记录器.
信息(
(
"本地排名"%s
无错误文件,失败。
"使用@record 装饰你的入口函数 fn 以获取跟踪信息。"
"查看:https://maskerprc.github.io/docs/stable/elastic/errors.html",
排名,
)
)
提升
除了
异常
作为 e:
错误处理器.
记录异常(e)
提升
返回
包装器
返回
包裹(
函数)