#!/usr/bin/env python3
# mypy: 允许未类型化定义
版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。
导入 abc
导入
时间
来自
集合
导入
命名元组
来自 functools
导入
包装
来自
打字
导入
可选
来自 typing_extensions
导入
已弃用
__all__ = [
"MetricsConfig",
"MetricHandler",
"控制台指标处理器",
"空指标处理器",
"指标流",
"配置",
"getStream",
"prof",
"配置文件",
"put_metric",
"publish_metric",
"获取经过时间(毫秒)",
"指标数据",
]
指标数据 =
命名元组(
"指标数据", [
时间戳,
组名,
名称,
"值"])
类
指标配置:
__slots__ = [参数]
def 初始化(
自身,
参数:
可选[
字典[
字符串,
字符串]] =
无):
自身.params = params
如果
自身.params
是
无:
自身.params = {}
[文档]class 指标处理器(abc.ABC):
@abc.abstractmethod
def emit(self, metric_data: MetricData):
pass
[文档]class ConsoleMetricHandler(MetricHandler):
def emit(self, metric_data: MetricData):
print(
f"[{metric_data.timestamp}][{metric_data.group_name}]: {metric_data.name}={metric_data.value}"
)
[文档]class NullMetricHandler(MetricHandler):
def emit(self, metric_data: MetricData):
pass
类 MetricStream:
def 初始化(
自身,
群组名称:
字符串,
处理器:
指标处理器):
自身.group_name = group_name
自身.
处理器 =
处理器
def 添加值(
自身,
指标名称:
字符串,
指标值: int):
自身.
处理器.
发射(
指标数据(
时间.
时间(),
自身.
群组名称,
指标名称,
指标值)
)
_指标映射:
字典[
字符串,
指标处理器] = {}
_默认指标处理器:
指标处理器 =
Null 指标处理器()
# pyre-fixme[9]: group has type `str`; used as `None`.
[文档]def configure(handler: MetricHandler, group: Optional[str] = None):
if group is None:
全局 _default_metrics_handler
# pyre-fixme[9]: _default_metrics_handler has type `NullMetricHandler`; used
# as `MetricHandler`.
_default_metrics_handler = 处理器
else:
`_metrics_map[group] = handler`
def 获取流(
群组:
字符串):
如果
组
在
`_metrics_map`:
处理器 =
`_metrics_map`[
群组]
否则:
处理器 =
默认度量处理器
返回 MetricStream(
群组,
处理器)
def _get_metric_name(函数):
qualname = 函数.__qualname__
分割 = qualname.
分割(
“点”)
如果
长度(
分割) == 1:
模块 =
函数.__module__
如果
模块:
返回
模块.
分割(
“点”
)-1] + "." +
分割[0]
否则:
返回
分割[0]
否则:
返回 qualname
[文档]def prof(fn=None, group: str = "torchelastic"):
r"""
@profile 装饰器为被装饰的函数发布 duration.ms、count、success、failure 指标。
默认情况下,指标名称为函数的限定名称(`class_name.def_name`)。
如果函数不属于任何类,则使用叶模块名称代替。
使用说明
::
@metrics.prof
def x():
通过
@metrics.prof(group="agent")
def y():
通过
"""
"""
def wrap(f):
@wraps(f)
def wrapper(*args, **kwargs):
key = _get_metric_name(f)
try:
start = time.time()
result = f(*args, **kwargs)
put_metric(f"{key}.success", 1, group)
except Exception:
put_metric(f"{key}.failure", 1, group)
raise
finally:
put_metric(f"{key}.duration.ms", get_elapsed_time_ms(start), group) # type: ignore[possibly-undefined]
return result
return wrapper
if fn:
return wrap(fn)
else:
return wrap
@deprecated("已弃用,请使用 `@prof` 代替",
分类=
未来警告)
def 个人资料(
群组=
无):
""
`@profile` 装饰器为任何给定函数添加了延迟和成功/失败指标。
使用
::
@metrics.profile("my_metric_group")
def some_function(参数列表):
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
def 包裹(
函数):
@wraps(函数)
def 包装器(*
参数, **kwargs):
try:
开始时间 =
时间.
时间()
结果 =
函数(*
参数, **kwargs)
发布指标(
群组, f"{
函数.__name__}
.成功, 1)
除了
异常:
发布指标(
群组, f"{
函数.__name__}
.失败, 1)
提升
最后:
发布指标(
群组,
f"{函数.__name__}
.持续时间.ms,
获取经过的时间(毫秒)(
开始时间), # type: ignore[possibly-undefined]
)
返回
结果
返回
包装器
返回
包装
[文档]定义 put_metric(metric_name: str, metric_value: int, metric_group: str = \"torchelastic\")
" """
发布一个指标数据点。
使用情况
::
put_metric("metric_name", 1)
put_metric("metric_name", 1, "metric_group_name")
"""
getStream(metric_group).add_value(metric_name, metric_value)
@deprecated(
"已弃用,请使用 `put_metric(metric_group)(metric_name, metric_value)` 代替",
分类=
未来警告,
)
def 发布指标(
指标组:
字符串,
指标名称:
字符串,
指标值: int):
指标流 =
获取流(
指标组)
指标流.
添加值(
指标名称,
指标值)
def 获取经过的毫秒数(
开始时间(秒): float):
"从给定开始时间返回已过时间(毫秒)。"
结束时间 =
时间.
时间()
返回 int((
结束时间 -
开始时间(秒)) * 1000)