快捷键

torch.distributed.elastic.metrics.api 的源代码

#!/usr/bin/env python3
# mypy: 允许未类型化定义

版权所有(c)Facebook,Inc.及其关联公司
版权所有
#
此源代码遵循在源树根目录中的 LICENSE 文件中找到的 BSD 风格许可协议。
有关许可证文件,请参阅源树根目录。

导入 abc
导入 时间
来自 集合 导入 命名元组
来自 functools 导入 包装
来自 打字 导入 可选
来自 typing_extensions 导入 已弃用


__all__ = [
    "MetricsConfig",
    "MetricHandler",
    "控制台指标处理器",
    "空指标处理器",
    "指标流",
    "配置",
    "getStream",
    "prof",
    "配置文件",
    "put_metric",
    "publish_metric",
    "获取经过时间(毫秒)",
    "指标数据",
]

指标数据 = 命名元组("指标数据", [时间戳, 组名, 名称, "值"])


 指标配置:
    __slots__ = [参数]

    def 初始化(自身, 参数: 可选[字典[字符串, 字符串]] = ):
        自身.params = params
        如果 自身.params  :
            自身.params = {}


[文档]class 指标处理器(abc.ABC): @abc.abstractmethod def emit(self, metric_data: MetricData): pass
[文档]class ConsoleMetricHandler(MetricHandler): def emit(self, metric_data: MetricData): print( f"[{metric_data.timestamp}][{metric_data.group_name}]: {metric_data.name}={metric_data.value}" )
[文档]class NullMetricHandler(MetricHandler): def emit(self, metric_data: MetricData): pass
MetricStream: def 初始化
(自身, 群组名称: 字符串, 处理器: 指标处理器): 自身.group_name = group_name 自身.处理器 = 处理器 def 添加值(自身, 指标名称: 字符串, 指标值: int): 自身.处理器.发射( 指标数据(时间.时间(), 自身.群组名称, 指标名称, 指标值) ) _指标映射: 字典[字符串, 指标处理器] = {} _默认指标处理器: 指标处理器 = Null 指标处理器() # pyre-fixme[9]: group has type `str`; used as `None`.
[文档]def configure(handler: MetricHandler, group: Optional[str] = None): if group is None: 全局 _default_metrics_handler # pyre-fixme[9]: _default_metrics_handler has type `NullMetricHandler`; used # as `MetricHandler`. _default_metrics_handler = 处理器 else: `_metrics_map[group] = handler`
def 获取流
(群组: 字符串): 如果 `_metrics_map`: 处理器 = `_metrics_map`[群组] 否则: 处理器 = 默认度量处理器 返回 MetricStream(群组, 处理器) def _get_metric_name(函数): qualname = 函数.__qualname__ 分割 = qualname.分割(“点”) 如果 长度(分割) == 1: 模块 = 函数.__module__ 如果 模块: 返回 模块.分割(“点”)-1] + "." + 分割[0] 否则: 返回 分割[0] 否则: 返回 qualname
[文档]def prof(fn=None, group: str = "torchelastic"): r""" @profile 装饰器为被装饰的函数发布 duration.ms、count、success、failure 指标。 默认情况下,指标名称为函数的限定名称(`class_name.def_name`)。 如果函数不属于任何类,则使用叶模块名称代替。 使用说明 :: @metrics.prof def x(): 通过 @metrics.prof(group="agent") def y(): 通过 """ """ def wrap(f): @wraps(f) def wrapper(*args, **kwargs): key = _get_metric_name(f) try: start = time.time() result = f(*args, **kwargs) put_metric(f"{key}.success", 1, group) except Exception: put_metric(f"{key}.failure", 1, group) raise finally: put_metric(f"{key}.duration.ms", get_elapsed_time_ms(start), group) # type: ignore[possibly-undefined] return result return wrapper if fn: return wrap(fn) else: return wrap
@deprecated("已弃用,请使用 `@prof` 代替"
, 分类=未来警告) def 个人资料(群组=): "" `@profile` 装饰器为任何给定函数添加了延迟和成功/失败指标。 使用 :: @metrics.profile("my_metric_group") def some_function(参数列表): ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` def 包裹(函数): @wraps(函数) def 包装器(*参数, **kwargs): try: 开始时间 = 时间.时间() 结果 = 函数(*参数, **kwargs) 发布指标(群组, f"{函数.__name__}.成功, 1) 除了 异常: 发布指标(群组, f"{函数.__name__}.失败, 1) 提升 最后: 发布指标( 群组, f"{函数.__name__}.持续时间.ms, 获取经过的时间(毫秒)(开始时间), # type: ignore[possibly-undefined] ) 返回 结果 返回 包装器 返回 包装
[文档]定义 put_metric(metric_name: str, metric_value: int, metric_group: str = \"torchelastic\") " """ 发布一个指标数据点。 使用情况 :: put_metric("metric_name", 1) put_metric("metric_name", 1, "metric_group_name") """ getStream(metric_group).add_value(metric_name, metric_value)
@deprecated( "已弃用,请使用 `put_metric(metric_group)(metric_name, metric_value)` 代替", 分类
=未来警告, ) def 发布指标(指标组: 字符串, 指标名称: 字符串, 指标值: int): 指标流 = 获取流(指标组) 指标流.添加值(指标名称, 指标值) def 获取经过的毫秒数(开始时间(秒): float): "从给定开始时间返回已过时间(毫秒)。" 结束时间 = 时间.时间() 返回 int((结束时间 - 开始时间(秒)) * 1000)

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源