torch.distributed.elastic.events 的源代码
#!/usr/bin/env/python3
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
""
模块包含与标准 Python 日志集成的事件处理机制。
使用示例:
::
从 torch.distributed.elastic 导入 events
event = events.Event(
name="测试事件",来源=events.EventSource.WORKER,元数据={...}
)
events.get_logging_handler(目标="控制台").info(事件)
""
导入
检查
导入
记录日志
导入
操作系统
导入
套接字
导入
跟踪回溯
来自
打字
导入
可选
来自 torch.distributed.elastic.events.handlers
导入
获取日志处理程序
来自 .api
导入 ( # noqa: F401
活动,
事件元数据值,
事件源,
节点状态,
Rdzv 事件,
)
_事件记录器:
字典[
字符串,
记录.
记录器] = {}
定义
_获取或创建记录器(
目的地:
字符串 = "null") ->
记录.
记录器:
""
基于目标类型构建 Python 日志记录器或扩展(如果提供)。
可用目标可在 ``handlers.py`` 文件中找到。
构建的日志记录器不会将消息传播到上级日志记录器,
例如根日志记录器。这确保单个事件只被处理一次。
参数:
事件处理程序的字符串表示
在 `handlers` 模块中找到的可用的处理程序
"文档"
全局 _events_loggers
如果 destination
非
在
事件记录器:
事件记录器_ =
记录.
获取日志记录器(f
torchelastic-事件-{
目的地}")
事件记录器_.
设置级别(
操作系统.
环境.
获取(
"LOG 级别",
"信息"))
# 不要将消息传播到根日志记录器
_事件记录器.
传播 =
假
日志处理程序 =
获取日志处理程序(
目的地)
_事件记录器.
添加处理器(
日志处理程序)
# 将记录器添加到全局字典中
_事件记录器[
目的地] =
_事件记录器
返回
_事件记录器[
目的地]
[文档]def 记录(事件: Event, 目标: str = "null") -> None:
_get_or_create_logger(目标).info(event 序列化().info())
定义
记录_rdzv_event(
事件:
Rdzv 事件) ->
无:
_get_or_create_logger(动态会话).
信息(
事件.
序列化())
[文档]
定义
构建并记录会话事件(
运行 ID:
字符串,
消息:
字符串,
节点状态:
节点状态,
名称:
字符串 =
输入文本翻译为简体中文为:"",
主机名:
字符串 =
输入文本翻译为简体中文为:"",
进程 ID:
可选[
整数] =
无,
主端点:
字符串 =
输入文本翻译为简体中文为:"",
本地 ID:
可选[
整数] =
无,
排名:
可选[
整数] =
无,
) -> 无:
""
初始化 rendezvous 事件对象并记录其操作。
参数:
run_id (str): rendezvous 的运行 ID。
message (str): 描述事件的消息。
node_state (NodeState): 节点的状态(INIT,RUNNING,SUCCEEDED,FAILED)。
名称(str):活动名称。(例如:当前正在执行的操作)。
主机名(str):节点的主机名。
pid(可选[int]):节点的进程 ID。
master_endpoint(str):已知 rendezvous 存储的主端点。
local_id(可选[int]):在 dynamic_rendezvous.py 中定义的节点 local_id,如果已定义
rank(可选[int]):如果已知,节点的 rank
返回:
无
示例:
>>> # 查看 DynamicRendezvousHandler 类
>>> def _record(
... self,
... message: 字符串,
... node_state: 节点状态 = NodeState.RUNNING,
... rank: 可选[int] = None,
... ) -> None:
... 构建并记录 rdzv 事件(
... name=f"{self.__class__.__name__}.{get_method_name()}",
... run_id=self._settings.run_id,
... message=消息,
... node_state=节点状态,
... hostname=self._this_node.addr,
... pid=self._this_node.pid,
... local_id=self._this_node.local_id,
... rank=rank,
... )
"文档"
# 我们不希望在不需要的情况下进行额外的计算。
如果 isinstance(
获取日志处理程序(
动态会话),
记录.
Null 处理器):
返回
设置参数。
如果
非
主机名:
主机名 =
套接字.
获取完全限定域名()
如果
非
进程 ID:
进程 ID =
操作系统.
获取进程 ID()
# 确定调用此函数的文件名。
调用栈 =
检查.
栈()
文件名 =
"无文件"
如果 len(
调用栈) > 1:
栈深度_1 =
调用栈[1]
文件名 =
操作系统.
路径.basename(
栈深度_1.
文件名)
如果
非
名称:
名称 =
栈深度_1.
函数
删除调用堆栈变量。如果保留,可能会与 Python 的
垃圾回收器,因为我们正在保留堆栈帧信息
模块。
删除
调用栈
设置错误跟踪,如果这是一个异常
如果
节点状态 ==
节点状态.
失败:
错误追踪 =
跟踪回溯.format_exc()
else:
错误追踪 =
请提供需要翻译的文本
初始化事件对象
事件 =
Rdzv 事件(
名称=f"{
文件名}:{
名称}",
run_id=run_id,
消息=
消息,
hostname=hostname,
进程 ID=
进程 ID,
节点状态=
节点状态,
主端点=
主端点,
排名=
排名,
本地 ID=
本地 ID,
错误跟踪=
错误跟踪,
)
最后,记录事件。
record_rdzv_event(事件)