快捷键

torch.autograd.profiler_util 的源代码

# mypy: 允许未类型化定义
导入 二分查找
导入 itertools
导入 数学
来自 集合 导入 默认字典, 命名元组
来自 操作符 导入 属性获取器
来自 打字 导入 任何, 可选
来自 typing_extensions 导入 已弃用

导入 火炬
来自 torch 自动微分 导入 设备类型


全部 = [
    "事件列表",
    "FormattedTimesMixin",
    "Interval",
    "Kernel",
    "功能事件",
    "FunctionEventAvg",
    "字符串表",
    "内存记录账户",
]


 事件列表(列表):
    "事件列表(用于美观打印)。"

    def __init__(self, *参数, **kwargs):
        使用设备 = kwargs.流行("使用设备", )
        用户内存 = kwargs.流行("配置内存", False)
        带浮点运算 = kwargs.流行(with_flops, False)
        超级().__init__(*参数, **kwargs)
        self.使用设备 = 使用设备
        self.配置内存 = 用户内存
        self.树构建完成 = 
        self._with_flops = 带浮点运算

    def 构建树(self):
        self._populate_cpu_children()
        self._remove_dup_nodes()
        self._设置回溯堆栈跟踪()
        self._tree_built = 真实

    def __str__(self):
        返回 self.表格()

    def 删除重复节点(self):
         True:
            要删除 = 设置()
            for 索引  范围(长度(self)):
                如果 (
                    self[索引].cpu 父节点   
                     self[索引].cpu_parent.名称 == self[索引].名称
                     长度(self[索引].cpu_parent.cpu 子节点) == 1
                ):
                    self[索引].cpu_parent.cpu_children = self[索引].cpu_children
                    self[索引].cpu_parent.kernels = self[索引].kernels  提升内核
                    for ch  self[索引].cpu 子进程:
                        ch.cpu_parent = self[索引].cpu_parent
                    to_delete.添加(索引)
            如果 长度(to_delete) == 0:
                断开
            新事件 = [事件 for 指数, 事件  列举(self) 如果 索引   删除]
            self.清晰()
            self.扩展(新事件)

    def _填充 cpu 子项(self):
        将子事件填充到每个底层 FunctionEvent 对象中。

一个事件是另一个事件的子事件,如果[s1, e1)包含在[s2, e2)内。其中,
s1 和 e1 表示子事件区间的开始和结束。
s2 和 e2 表示父事件区间的开始和结束。

例如:在事件列表[[0, 10], [1, 3], [3, 4]]中,会生成[0, 10]。
是两个其他区间的父区间。

如果由于任何原因两个区间仅部分相交,
则此函数不会记录它们之间的父子关系。
        """
        一些事件可以是异步的(即开始和结束在不同的线程上),
        # 由于通常无法将子范围分配给
        # 异步范围,我们在计算嵌套范围和统计时不会使用它们
        同步事件 = [
            事件
            for 事件  self
            如果  事件.是否异步  事件.设备类型 == 设备类型.CPU
        ]
        事件 = 排序(
            同步事件,
            =属性获取器("线程"),
        )
        # 按线程和 node_id 分组,以便事件发生时
        来自不同节点但具有相同 thread_id 的线程不会被错误地
        分组在一起。
        线程 = itertools.分组(
            活动, =Lambda 函数 事件: (事件.线程, 事件.node_id)
        )

        对于每个线程,我们保留一个当前嵌套父级的栈。
        我们保持一个不变量,即每个区间都是所有其他在栈中位置较低的区间的子集。
        区间。
        #
        首先我们按区间的起始时间对它们进行排序。然后我们遍历它们。
        每次我们看到一个新的区间,我们就从父级中移除几个
        将顶层恢复到不变状态。然后是父子关系
        如果已记录且栈不为空。
        最后我们将新的间隔添加到列表中
        #
        算法的时间复杂度为 O(N * log(N)),其中 N 为区间数量
        区间
        for _线程 ID, 线程事件  线程:
            线程事件_ = 排序(
                线程事件,
                =Lambda 函数 事件: [事件.时间范围.开始, -事件.时间范围.结束]
            )
            当前事件: 列表[函数事件] = 输入文本为空,请提供需要翻译的文本
            for 事件  线程事件_:
                 长度(当前事件) > 0:
                    父级 = 当前事件[-1]
                    如果 (
                        事件.时间范围.开始  父级.时间范围.末端
                        或者 事件.时间范围.末端 > 父级.时间范围.末端
                    ):
                        这不能是一个父级
                        当前事件.流行()
                    否则:
                        父级.添加 CPU 子项(事件)
                        断言 (
                            事件.CPU 父级  
                        ), f已经存在一个 CPU 父级事件{事件.}"
                        事件.设置 CPU 父节点(父节点)
                        断开

                当前事件.添加(事件)

    def _设置回溯堆栈跟踪(self):
        def bw 父级(事件):
            如果 事件  :
                返回 
            如果...否则 事件.范围 == 1:  # 反向函数
                返回 事件
            否则:
                返回 bw_parent(事件.cpu_parent)

        fwd_stacks = {}
        for 事件  self:
            如果 bw_父(事件)    事件.   :
                t = (事件.序列号, 事件.线程)
                如果 t   前端栈:
                    前端栈[t] = 事件.

        for 事件  self:
            p = 父节点(事件)
            如果 p   :
                断言 p.前线程   
                t = (p.序列号, p.前线程)
                如果 t  前栈:
                    事件. = 前栈[t]
                否则:
                    事件. = 输入文本为空,请提供需要翻译的文本

    @property
    def self_cpu_time_total(self):
        返回 总和(事件.自身 CPU 总时间 for 事件  self)

    def 表格(
        self,
        排序=,
        行限制=100,
        最大源列宽度=75,
        最大名称列宽度=55,
        最大形状列宽=80,
        标题=,
        仅顶级事件=False,
    ):
        打印一个格式优美的 EventList 表格。

参数:
sort_by (str, 可选): 用于排序条目的属性。默认情况下
它们按照注册的顺序打印。
有效的键包括:`cpu_time`、`cuda_time`、`xpu_time`、
``cpu_time_total``、``cuda_time_total``、``xpu_time_total``,
``cpu_memory_usage``、``cuda_memory_usage``、``xpu_memory_usage``,
``self_cpu_memory_usage``、``self_cuda_memory_usage``,
``self_xpu_memory_usage``、``count``.
仅输出翻译:
仅顶级事件(bool, 可选):布尔标志,用于确定
显示事件的选取。如果为真,分析器将仅
显示顶层事件,如 Python 的顶层调用
`lstm`、Python `add` 或其他函数,嵌套事件如低级
排除了用于分析结果可读性的 cpu/cuda/xpu 操作事件。

返回值:
包含表格的字符串。
        """
        返回 _构建表格(
            self,
            排序=排序,
            行限制=行限制,
            最大源列宽度=最大源列宽度,
            最大名称列宽度=最大名称列宽度,
            最大形状列宽=最大形状列宽,
            标题=标题,
            用户内存=self._profile_memory,
            带有 FLOPS=self._with_flops,
            仅顶级事件=仅顶级事件,
        )

    def 导出 Chrome 跟踪(self, 路径):
        导出 EventList 为 Chrome 跟踪工具文件。

检查点可以在稍后通过`chrome://tracing` URL 加载和检查。

参数:
路径(str):写入跟踪的路径。
        """
        导入 操作系统

        设备名称 = "cuda" 如果  self.使用设备 否则 self.使用设备
        替换为 打开(路径, w) as f:
            下一个 ID = 0
            使用文件 I/O 而不是使用 json.dump,因为 JSON 序列化非常慢且
            这项技术已被证明能提供 4 倍的速度提升。
            f.("[")
            for 事件  self:
                如果 事件.跟踪名称  :
                    continue
                f.(
                    {{"name": "{}", ' "
                    '"ph": "X", '
                    '"ts": '{}, '
                    '"dur":'{}','
                    '"tid":'{}','
                    '"pid": "CPU 功能", '
                    '"args": {{}}}}, '.格式(
                        事件.跟踪名称,
                        事件.时间范围.开始,
                        事件.时间范围.elapsed_us(),
                        事件.线程
                        如果  事件.is_remote
                        否则 f节点_id{事件.node_id}, 线程_id{事件.线程}"'",
                    )
                )
                for _  事件.内核:
                    's' 和 'f' 从 CPU 启动到 GPU 内核绘制流程箭头
                    从 CPU 启动到 GPU 内核
                    f.(
                        f'{{"名称": ""{事件.跟踪名称}", ' "
                        '"ph": "s", '
                        f'"ts": '{事件.时间范围.开始}, '
                        f'"tid":'{事件.线程}','
                        '"pid": "CPU functions",'
                        f'"id":'{下一个_id},‘
                        f"cat": "cpu_to_"{设备名称}", ' "
                        '"参数":'{}'}, ' '
                    )
                    # 注意:使用 torch.profiler 获取设备内核追踪
                    next_id += 1
            如果 长度(self) > 0:
                移除尾部空格和逗号
                f.搜索(f.告诉() - 2, os.SEEK_SET)
                f.截断()
            f.(])

    def 支持导出堆栈指标(self):
        返回 [
            "self_cpu_time_total",
            self_cuda_time_total,
            "self_xpu_time_total",
            "self_privateuse1_time_total",
        ]

    def 导出堆栈(self, 路径: str, 指标: str):
        如果 metric   self.supported_export_stacks_metrics():
            抛出 值错误(
                "指标应该是以下之一:"
                + str(self.支持的导出堆栈指标())
            )
        翻译表 = str.maketrans(";"制表符 换行符", "下划线")
        替换为 打开(路径, w) as f:
            for 事件  self:
                如果 事件.  长度(事件.) > 0:
                    指标值 = getattr(
                        事件,
                        指标.替换(cuda, "设备")
                        .替换(XPU, "设备")
                        .替换(privateuse1, "设备"),
                    )
                    如果 int(指标值) > 0:
                        栈字符串 = 请提供需要翻译的文本
                        for 条目  反转(事件.):
                            栈字符串 += 条目.翻译(翻译表)
                            栈字符串 += 分号
                        stack_str = stack_str[-1] + 输入文本为空,请提供需要翻译的文本 + str(int(指标值))
                        f.(栈字符串 + "输入文本翻译为简体中文为:\n")

    def key_averages(
        self,
        按输入形状分组=False,
        按栈数量分组=0,
        按重载名称分组=False,
    ):
        平均所有函数事件在其键上。

参数:
group_by_input_shapes:按输入形状分组条目:
(事件名称,输入形状)而不是仅按事件名称。
这是一个有用的功能,可以查看哪些输入形状对运行时产生影响
最和可能有助于特定尺寸的优化
选择量化最佳候选者(即拟合屋顶线)

按前 n 个堆栈跟踪条目分组

按重载名称分组:通过重载名称区分运算符,例如 aten::add.Tensor
aten::add.out 将分别聚合

返回值:
包含 FunctionEventAvg 对象的事件列表。
        """
        断言 self._tree_built
        统计: 字典[元组[str, ...] 函数事件平均] = 默认字典(函数事件平均)

        def 获取键(
            事件, 按输入形状分组, 按堆栈编号分组, 按重载名称分组
        ) -> 元组[str, ...]
             = [
                str(事件.),
                str(事件.node_id),
                str(事件.设备类型),
                str(事件.是遗留的),
                str(事件.是否为用户标注),
            ]
            如果 按重载名称分组:
                .添加(事件.重载名称)
            如果 按输入形状分组:
                .添加(str(事件.输入形状))
            如果 按堆栈分组_n > 0:
                 += 事件.[按堆栈分组_n]
            返回 元组()

        for 事件  self:
            统计[
                获取键(
                    事件, 按输入形状分组, 按堆栈编号分组, 按重载名称分组
                )
            ].添加(事件)

        平均列表 = 事件列表(
            统计.(),
            use_device=self.使用设备,
            用户内存=self.配置内存,
            带有 FLOPS=self.带有 FLOPS,
        )
        for 事件  平均列表:
            事件. = 事件.[按栈分组]
            如果  按输入形状分组:
                事件.输入形状 = 请提供需要翻译的文本
            如果  按重载名称分组:
                事件.重载名称 = 请提供需要翻译的文本
        返回 avg_list

    def 总平均值(self):
        """平均所有事件。"""

返回值:
一个 FunctionEventAvg 对象。
        """
        total_stat = 功能事件平均()
        for 事件  self:
            总计统计 += 事件
            总计统计. = 
        总计统计. = 总计
        返回 总计统计


def _格式化时间(时间_us):
    定义如何在 FunctionEvent 中格式化时间。
    每秒微秒数 = 1000.0 * 1000.0
    每毫秒微秒数 = 1000.0
    如果 时间_us  每秒时间_us:
        返回 f"{时间_us / 每秒时间_us:.3f}s"
    如果 时间微秒  毫秒单位:
        返回 f"{时间_us / US_毫秒:.3f}毫秒"
    返回 f"{时间_us:.3f}us"


def _format_time_share(time_us, 总时间_us):
    定义如何在 FunctionEvent 中格式化时间。
    如果 总耗时_us == 0:
        断言 耗时_us == 0, f"预期 time_us 等于 0,但得到"{time_us}"
        返回 "NaN"
    返回 f"{时间_us * 100.0 / 总时间_us:.2f}%


def _format_memory(字节数):
    "返回一个格式化的内存大小字符串。"
    KB = 1024
    兆字节 = 1024 * 千字节
    吉字节 = 1024 * 兆字节
    如果 绝对值(字节数)  英国:
        返回 f"{字节数 * 1.0 / 英国:%.2f} Gb"
    如果...否则 绝对值(字节数)  MB:
        返回 f"{字节数 * 1.0 / MB:.2f} Mb"
    如果...否则 绝对值(字节数)  KB:
        返回 f"{字节数 * 1.0 / KB:%.2f} Kb"
    否则:
        返回 str(字节数) + " b"


def 属性格式化器(名称):
    返回 属性(Lambda 函数 self: 格式化时间(getattr(self, 名称)))


 FormattedTimesMixin:
    """函数事件和函数事件平均值的辅助工具。

子类应定义`*_time_total`和`count`属性。
    """

    CPU 时间字符串 = _attr_formatter(CPU 时间)
    设备时间字符串 = _属性格式化器("设备时间")
    cpu 总时间字符串 = 属性格式化器(cpu 总耗时)
    设备总耗时字符串 = 属性格式化器(设备总时间)
    自身 CPU 总时间字符串 = 属性格式化器("self_cpu_time_total")
    自定义设备时间总计字符串 = 属性格式化器(自定义设备时间总计)

    @property
    def CPU 时间(self):
        返回 0.0 如果 self.计算 == 0 否则 1.0 * self.cpu_time_total / self.计算  # 类型:忽略[已定义]

    @property
    def device_time(self):
        返回 0.0 如果 self.计算 == 0 否则 1.0 * self.设备时间总计 / self.计算  # 类型:忽略[已定义]

    @property
    @deprecated(
        `cuda_time` 已弃用,请使用 `device_time` 代替。,
        分类=未来警告,
    )
    def cuda_time(self):  将被弃用
        返回 self.设备时间


[文档]类 Interval: def __init__(self, start, end): self.start = start self.end = end
[文档] def elapsed_us(self): r""" 返回区间的长度 ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 返回 self.end - self.start
内核
= 命名元组(内核, [名称, "设备", "时长"]) 函数事件(FormattedTimesMixin): "关于单个函数的配置信息。" def __init__( self, id, 名称, 线程, 开始时间(美国), 美国结束, 重载名称=, 前线程=, 输入形状=, =, 范围=0, use_device=, CPU 内存使用率=0, 设备内存使用=0, 是否异步=False, 远程=False, 序列号=-1, node_id=-1, 设备类型=设备类型.CPU, 设备索引=0, 设备资源 ID=, 是遗留的=False, 洛普斯=, 跟踪名称=, 混凝土输入=, 输入参数=, 是否为用户标注=False, ): self.id: 整型 = 标识符 self.node_id: 整型 = 节点 ID self.名称: 字符串 = 名称 self.重载名称: 字符串 = 重载名称 self.跟踪名称: 字符串 = 跟踪名称 self.时间范围: 间隔 = 间隔(开始时间(美国), 美国结束) self.线程: 整型 = 线程 self.前线程: 可选[int] = 前方线程 self.内核: 列表[内核] = 输入文本为空,请提供需要翻译的文本 self.数量: 整型 = 1 self.CPU 子节点: 列表[函数事件] = 输入文本为空,请提供需要翻译的文本 self.cpu_parent: 可选[函数事件] = self.输入形状: 元组[int, ...] = 输入形状 self.混凝土输入: 列表[任何] = 具体输入 self.输入参数: 字典[str, 任何] = 关键词输入 self.: 列表 = self.范围: 整型 = 范围 self.use_device: 可选[str] = 使用设备 self.CPU 内存使用率: 整型 = CPU 内存使用率 self.设备内存使用: 整型 = 设备内存使用率 self.是否异步: 布尔值 = 是否异步 self.远程: 布尔值 = 远程 self.序列号: 整型 = 序列号 self.设备类型: 设备类型 = 设备类型 self.设备索引: 整型 = 设备索引 self.设备资源 ID: 整型 = ( 线程 如果 设备资源 ID 否则 设备资源 ID ) self.是遗留的: 布尔值 = 是否为旧版本 self.洛普斯: 可选[int] = 拷贝操作数(FLOPS) self.是否为用户标注: 可选[bool] = 是否为用户标注 self.自身 CPU 百分比 = -1 self.总 CPU 百分比 = -1 self.总设备百分比 = -1 def 添加内核(self, 名称, 设备, 持续时间): 断言 self.设备类型 == 设备类型.CPU self.内核.添加(内核(名称, 设备, 持续时间)) def 添加 CPU 子进程(self, 儿童): 添加一个类型为 FunctionEvent 的 CPU 子进程。 应该只向事件添加直接子进程, 正确地报告了自我 CPU 时间。 """ 断言 self.设备类型 == 设备类型.CPU 断言 isinstance(儿童, 函数事件) 断言 儿童.设备类型 == 设备类型.CPU self.cpu 子节点.添加(儿童) def 设置 cpu 父节点(self, 父节点): 设置类型为 FunctionEvent 的 CPU 直接父节点。 一个分析函数事件应该只有一个 CPU 父级,这样子级的范围区间才能完全包含在父级的范围内。我们使用这种连接来判断事件是否来自顶层操作。 我们使用这个连接来判断事件是否来自顶层操作。 我们使用这个连接来判断事件是否来自顶层操作。 """ 断言 self.设备类型 == 设备类型.CPU 断言 isinstance(父亲, 函数事件) 断言 父亲.设备类型 == 设备类型.CPU self.cpu 父 = 父亲 异步事件没有子项,在计算“self”时不使用 其他事件的指标,只有总 CPU 时间 @property def self_cpu_memory_usage(self): 如果 self.是否异步 或者 self.设备类型 != 设备类型.CPU: 返回 0 返回 self.CPU 内存使用率 - 总和( 儿童.CPU 内存使用率 for 儿童 self.cpu_children ) @property def 自定义设备内存使用(self): 如果 self.是否异步 或者 self.设备类型 != 设备类型.CPU: 返回 0 返回 self.设备内存使用率 - 总和( 儿童.设备内存使用率 for 儿童 self.子进程 CPU ) @property @deprecated( "`self_cuda_memory_usage` 已弃用。请使用 `self_device_memory_usage` 代替。", 分类=未来警告, ) def self_cuda_memory_usage(self): # 将要弃用 返回 self.self_device_memory_usage @property def cpu_time_total(self): 如果 self.设备类型 == 设备类型.CPU: 返回 self.时间范围.elapsed_us() 否则: 返回 0 @property def self_cpu_time_total(self): 如果 self.是否异步 或者 self.设备类型 != 设备类型.CPU: 返回 0 返回 self.cpu_time_total - 总和( 儿童.cpu_time_total for 儿童 self.cpu_children ) @property def device_time_total(self): 如果 self.是否异步 或者 self.use_device: 返回 0 如果 self.设备类型 == 设备类型.CPU: 如果 self.是遗留的: 考虑子操作中的内核 返回 总和(kinfo.持续时间 for kinfo self.内核) + 总和( 沉浸式翻译.设备时间总计 for ch self.cpu_children ) 否则: 每个遗留的 CPU 事件都有一个(假的)内核 返回 总和(kinfo.持续时间 for kinfo self.内核) 否则: 断言 self.设备类型 [ 设备类型.CUDA, 设备类型.私有用途 1, 设备类型.翻译技术国际会议, ] 返回 self.时间范围.已过时间(微秒)() @property @deprecated( "`cuda_time_total` 已弃用。请使用 `device_time_total` 代替。", 分类=未来警告, ) def cuda 总时间(self): 将被弃用 返回 self.设备时间总计 @property def 自身设备总时间(self): 如果 self.是否异步 或者 self.use_device: 返回 0 如果 self.设备类型 == 设备类型.CPU: 返回 self.设备时间总计 - 总和( 儿童.设备时间总计 for 儿童 self.cpu 子进程 ) 否则: 断言 self.设备类型 [ 设备类型.CUDA, 设备类型.私有用途 1, 设备类型.翻译技术国际会议, ] 返回 self.设备时间总计 @property @deprecated( "`self_cuda_time_total` 已弃用。请使用 `self_device_time_total` 代替。", 分类=未来警告, ) def self_cuda_time_total(self): # 将要弃用 返回 self.self_device_time_total @property def (self): 返回 self.名称 def __repr__(self): 设备名称 = self.使用设备 设备时间 = self.设备时间字符串 设备内存使用率 = self.设备内存使用率 返回 ( f"<函数事件 id="{self.id}名称={self.名称}超载名称={self.重载名称} " fdevice_type={self.设备类型} node_id={self.node_id} cpu_time={self.cpu_time_str} " fstart_us={self.时间范围.开始} end_us={self.时间范围.结束} " f"cpu_children={str[儿童.标识符 for 儿童 self.cpu_children])} {设备名称}时间={device_time} " f名称={self.名称}线程={self.线程}输入形状={str(self.输入形状)} " fcpu_memory_usage={self.CPU 内存使用率} {设备名称}_memory_usage={设备内存使用} " f"is_async={self.是否异步} is_remote={self.远程}序列号={self.序列号}是否为旧版本={self.是遗留的}> " ) 功能事件平均(格式化时间混合类): 用于计算多个 FunctionEvent 对象统计数据的平均值。 def __init__(self) -> : self.: 可选[str] = self.数量: 整型 = 0 self.node_id: 整型 = 0 self.是否异步: 布尔值 = self.远程: 布尔值 = self.use_device: 可选[str] = self.总 CPU 时间: 整型 = 0 self.总设备时间: 整型 = 0 self.self_cpu_time_total: 整型 = 0 self.自定义设备时间总计: 整型 = 0 self.输入形状: 可选[列表[列表[int]]] = self.重载名称: 可选[str] = self.: 可选[列表] = self.范围: 可选[int] = self.CPU 内存使用率: 整型 = 0 self.设备内存使用: 整型 = 0 self.自定义 CPU 内存使用率: 整型 = 0 self.自定义设备内存使用率: 整型 = 0 self.CPU 子进程: 可选[列表[函数事件]] = self.cpu_parent: 可选[函数事件] = self.设备类型: 设备类型 = 设备类型.CPU self.是遗留的: 布尔值 = self.洛普斯: 整型 = 0 def 添加(self, other): 如果 self. : # 第一个被记录为 FunctionEventAvg 部分的功能,传播 # 字段。 self. = other. self.node_id = other.节点 ID self.是否异步 = other.是否异步 self.是否远程 = other.远程 self.cpu 父节点 = other.cpu 父节点 self.cpu 子节点 = other.CPU 子节点 self.重载名称 = other.重载名称 self.输入形状 = other.输入形状 self. = other. self.范围 = other.范围 self.设备类型 = other.设备类型 self.是否为旧版本 = other.是否为旧版本 self.使用设备 = other.使用设备 self.用户标注 = other.用户标注 断言 isinstance(other, (函数事件, 功能事件平均)) 断言 other. == self. self.cpu_time_total += other.cpu_time_total self.设备时间总计 += other.设备时间总计 self.自身 CPU 总时间 += other.自身 CPU 总时间 self.自定义设备时间总计 += other.自定义设备时间总计 self.CPU 内存使用率 += other.CPU 内存使用率 self.设备内存使用率 += other.设备内存使用率 self.自身 CPU 内存使用率 += other.自身 CPU 内存使用率 self.自身设备内存使用率 += other.自身设备内存使用率 self.计算 += other.计算 如果 self.混乱 : self.混乱 = other.混乱 如果...否则 other.混乱 : self.flops += other.flops 返回 self def __iadd__(self, other): 返回 self.添加(other) def __repr__(self): 设备名称 = "cuda" 如果 self.使用设备 否则 self.使用设备 self_device_time = self.自定义设备时间总计字符串 device_time = self.设备时间字符串 设备内存 = self.设备内存使用率 返回 ( f"<函数事件平均 key="{self.}自身 CPU 时间={self.自身 CPU 时间总字符串} cpu_time={self.cpu_time_str} " fself_{设备名称}_time={自定义设备时间} {设备名称}_时间={device_time}输入形状={str(self.输入形状)} " f"CPU 内存使用量="{self.CPU 内存使用率} {设备名称}_内存使用量={设备内存}> )
[文档]class 字符串表(defaultdict): def __missing__(self, key): # manage cases like 't' (demangled to 'unsigned short') separately, # for now simply check the length to avoid unexpected results for # the short sequences self[key] = torch._C._demangle(key) if len(key) > 1 else key return self[key]
[文档]类 MemRecordsAcc: 用于访问区间内 mem_records 的加速结构。 def __init__(self, mem_records): self._mem_records = mem_records self._start_nses: 列表[int] = [] self._indices: 列表[int] = [] if len(mem_records) > 0: tmp = sorted([(r[0].start_ns(), i) for i, r in enumerate(mem_records)]) self._start_nses, self._indices = zip(*tmp) # 忽略赋值类型
[文档] def in_interval(self, start_us, end_us): r""" 返回给定时间间隔内的所有记录 为了保持向后兼容性,将函数中的 us 转换为 ns """ start_idx = bisect.bisect_left(self._start_nses, start_us * 1000) end_idx = bisect.bisect_right(self._start_nses, end_us * 1000) for i in range(start_idx, end_idx): yield self._mem_records[self._indices[i]]
def 过滤栈条目
(条目): 过滤后的条目 = [ (autograd/__init__, "_make_grads"), ("autograd/__init__", "向后"), ("torch/tensor", "向后"), ("_internal/common_utils", "prof_callable"), ("_internal/common_utils", "prof_func_call"), ("_internal/common_utils", prof_meth_call), ] 返回 所有( (f[0] 条目 f[1] 条目) for f 过滤条目) 内存事件名称 = "[内存]" 内存不足事件名称 = "[内存不足]" def _filter_name(名称): # 忽略以下工具操作 过滤后的名称 = [ 内存事件名称, 仅用于顶级内存事件 内存不足事件名称, "分析器::_record_function_enter", "分析器::_record_function_enter_new", "profiler::_记录函数退出", "aten::是否为叶子节点", "aten::输出 nr", "aten::_版本", ] 返回 名称 过滤后的名称 # 解析名称,可选地重新编写提供的事件名称, # with_wildcard - 是否用通配符替换某些编号的事件名称 # 用通配符名称将它们聚合到分析器表中 输出 def 重写名称(名称, 带通配符=False): 字符串表 = 字符串表() 名称 = 字符串表[名称] 如果 带通配符: 如果 名称.以...开头("ProfilerStep#"): 名称 = "ProfilerStep*" 返回 名称 def _build_table( 活动, 排序=, 标题=, 行限制=100, 最大源列宽度=75, 最大名称列宽度=55, 最大形状列宽=80, 带有 FLOPS=False, 用户内存=False, 仅顶级事件=False, ): """打印事件摘要(可以是 FunctionEvent 或 FunctionEventAvg 的列表)。""" 如果 长度(活动) == 0: 返回 请提供需要翻译的文本 设备时间存在 = 任何(事件.自定义设备时间总计 > 0 for 事件 活动) 设备内存存在 = 任何(事件.自定义设备内存使用情况 > 0 for 事件 活动) 使用设备 = 活动[0].使用设备 在私有用途 1 设备上运行,带有分析器但未启用 ProfilerActivity.PrivateUse1 也可以捕获 privateuse1 内存使用情况。 这里只需检查 has_privateuse1_time,如果不使用设备。 如果 使用设备 has_device_time: 抛出 运行时错误("use_device 为空,但存在设备性能数据。") 存在输入形状 = 任何( (事件.输入形状 长度(事件.输入形状) > 0) for 事件 事件 ) 存在超载名称 = 任何( (事件.重载名称 长度(事件.重载名称) > 0) for 事件 事件 ) 如果 按顺序 : 事件 = 事件列表( 排序( 活动, =Lambda 函数 事件: getattr( 事件, 排序.替换(cuda, "设备") .替换(XPU, "设备") .替换(privateuse1, "设备"), ), reverse=True, ), use_device=use_device, 用户内存=用户内存, 带有 FLOPS=带有 FLOPS, ) 名称列宽度 = 最大值(长度(事件.) for 事件 活动) + 4 如果 最大名称列宽度 : 名称列宽度 = 最小值(名称列宽度, 最大名称列宽度) 形状列宽度 = 最大值(长度(str(事件.输入形状)) for 事件 活动) + 4 如果 最大形状列宽度 : 形状列宽度 = 最小值(形状列宽度, 最大形状列宽) 默认列宽度 = 12 flops 列宽度 = 默认列宽度 源列宽 = = [ 事件. for 事件 事件 如果 事件. 长度(事件.) > 0 ] 是否堆叠 = 长度() > 0 如果 has_stack: src_column_width = ( 最大值(最大值(长度(条目) for 条目 ) for ) + 4 ) 如果 max_src_column_width : src_column_width = 最小值(源列宽, 最大源列宽度) 头部 = [名称] 如果 有重载名称: 头部信息.添加(重载名称) 头部 += [ "自用 CPU%", "自用 CPU", "CPU 总使用率%", "CPU 总使用", "CPU 平均时间", ] 设备名称 = use_device.() 如果 使用设备 否则 "None" 如果 有设备时间: 头部信息.扩展( [ f"自我"{设备名称}", f"自我"{设备名称}百分号, f"{设备名称}总计, f"{设备名称}时间平均, ] ) 如果 用户内存: 头部信息.扩展( [ "CPU 内存", "自身 CPU 内存", ] ) 如果 使用设备 有设备内存: 头部信息.扩展( [ f"{设备名称}记忆", f自我{设备名称}记忆", ] ) 头部信息.添加(呼叫次数) 只有当任何事件具有有效的(>= 0)节点 ID 时才附加节点 ID 附加节点 ID = 任何(事件.节点 ID != -1 for 事件 活动) 如果 附加节点 ID: 头部信息.添加(节点 ID) # 需要使用列表因为 nonlocal 仅在 Py3 中可用... 空间大小 = 2 行格式列表 = [输入文本翻译为简体中文为:""] 分隔符列表 = [输入文本翻译为简体中文为:""] 行长度列表 = [-间距大小] def 添加列(填充, 目录=">"): 行格式列表[0] += ( "{: " + 目录 + str(填充) + "}" + (输入文本为空,请提供需要翻译的文本 * 间距大小) ) 表头分隔符列表[0] += "—" * 填充 + (输入文本为空,请提供需要翻译的文本 * 间距大小) 行长度列表[0] += 填充 + 间距大小 def 自动缩放浮点运算次数(洛普斯): FLOPs 头 = [ FLOPs, "千亿次浮点运算", "百万亿次浮点运算", "十亿次浮点运算", "万亿亿次浮点运算", "PFLOPs", ] 断言 flops > 0 log_flops = 最大值(0, 最小值(数学.log10(洛普斯) / 3, float(长度(flop_headers) - 1))) 断言 log_flops 0 log_flops < 长度(flop_headers) 返回 (pow(10, (数学.向下取整(log_flops) * -3.0)), 滚动标题[int(记录浮点运算)]]) 添加列(列名宽度) 如果 带有重载名称: 添加列(列名宽度) for _ 头部信息[1 + has_overload_names ] add_column(DEFAULT_COLUMN_WIDTH) 如果 输入形状: 头部信息.添加(输入形状) 添加列(形状列宽度) 如果 has_stack: 头部信息.添加("源位置") 添加列(源列宽度, 目录文本="<") 如果 带有 FLOPS: # 自动扩展浮点运算符标题 原始浮点运算 = [事件.浮点运算 for 事件 事件 如果 事件.flops > 0] 如果 长度(raw_flops) != 0: (flops_scale, flops_header) = 自动调整 FLOPS(最小值(原始 FLOPS)) 头部信息.添加(f总计{FLOPS 标题}") 添加列(洛普斯列宽) 否则: 带浮点运算 = 找不到任何有效的浮点操作 行格式 = 行格式列表[0] 表头分隔符 = 列表分隔符[0] 行长度 = 行长度列表[0] 添加列 = # 类型:忽略[赋值] 因为非 local 是 Py3 才有的... 结果 = 输入文本为空,请提供需要翻译的文本 def 添加(s): 结果.添加(s) 结果.添加("输入文本翻译为简体中文为:\n") 是的,结尾也要有换行符 累计 CPU 时间总和 = 0 累计设备时间总和 = 0 for 事件 活动: 总 CPU 时间 += 事件.自身 CPU 总时间 如果 事件.设备类型 == 设备类型.CPU 事件.是遗留的: 在旧版分析器中,内核信息存储在 CPU 事件中 总设备时间 += 事件.自设备时间总和 如果...否则 ( 事件.设备类型 [ 设备类型.CUDA, 设备类型.私有用途 1, 设备类型.翻译技术国际会议, ] 事件.用户标注 ): 在运动分析器中,存在具有正确设备类型(例如 CUDA)的事件 累加设备总时间 += 事件.设备总时间 实际打印 如果 标题 : 添加("=" * 行长度) 添加(标题) 如果 仅顶级事件: 添加("=" * 行长度) 添加("本报告仅显示顶级操作统计信息") 添加(表头分隔符) 添加(行格式.格式(*头部信息)) 添加(表头分隔符) def 裁剪路径(路径, 源列宽度): 如果 长度(路径) > 源列宽度: 偏移 = 长度(路径) - 源列宽度 路径 = 路径[偏移量] 如果 长度(路径) > 3: 路径 = "..." + 路径[3] 返回 路径 事件限制 = 0 for 事件 活动: 如果 事件限制 == 行限制: 断开 如果 仅顶级事件 事件.cpu 父级 : continue 否则: 事件限制 += 1 名称 = 事件. 如果 最大名称列宽度 长度(名称) 最大名称列宽度 - 3: 名称 = 名称[ (最大名称列宽度 - 3)] + "..." 事件.自身 CPU 百分比 = _format_time_share( 事件.self_cpu_time_total, 自身 CPU 总时间之和 ) 事件.总 CPU 百分比 = ( _format_time_share(事件.cpu_time_total, sum_self_cpu_time_total) 如果 事件.是否异步 否则 0 ) row_values = [名称] 如果 has_overload_names: 重载名称 = 事件.重载名称 如果 ( 最大列名宽度 长度(重载名称) 最大列名宽度 - 3 ): 重载名称 = 重载名称[ (最大列名宽度 - 3)] + "..." 行值 += [重载名称] 行值 += [ # 自身 CPU 总百分比,异步事件为 0。 事件.self_cpu_percent, 事件.self_cpu_time_total_str, # 自身 CPU 总 # CPU 总百分比,0 表示异步事件。 事件.总 CPU 百分比, 事件.cpu_time_total_str, # CPU 总览 事件.cpu_time_str, # CPU 平均时间 ] 如果 has_device_time: 事件.总设备百分比 = _format_time_share( 事件.自设备总时间, 自设备总时间之和 ) 行值.扩展( [ 事件.自定义设备时间总计字符串, # 设备时间总计百分比 事件.总设备百分比, 事件.设备时间总计字符串, 事件.设备时间字符串, # 设备时间平均值 ] ) 如果 用户内存: 行值.扩展( [ # CPU 内存总计 _format_memory(事件.CPU 内存使用率), # 自定义 CPU 内存总量 _format_memory(事件.自定义 CPU 内存使用率), ] ) 如果 使用设备 是否有设备内存: 行值.扩展( [ 设备内存总量 _format_memory(事件.设备内存使用), 自定义设备内存总量 _format_memory(事件.自定义设备内存使用情况), ] ) 行值.添加( 事件.数量, 调用次数 ) 如果 添加节点 ID: 行值.添加(事件.node_id) 如果 是否有输入形状: 行值.添加(str(事件.输入形状`):`形状列宽]) 如果 带有 FLOPS: 如果 事件.flops <= 0: 行值.添加(“—”) 否则: 行值.添加(f"{事件.浮点运算次数 * 混合缩放:8.3f}") # type: ignore[possibly-undefined] 如果 有栈: 源字段 = 请提供需要翻译的文本 如果 长度(事件.) > 0: 源字段 = 剪裁路径(事件.[0] 源列宽) 行值.添加(源字段) 添加(行格式.格式(*行值)) 如果 是否有堆栈: 空标题 = [输入文本翻译为简体中文为:""] * (长度(头部信息) - 1) for 条目 事件.[1] 添加( 行格式.格式( *(空标题 + [裁剪路径(条目, 源列宽)]]) ) ) 空标题.添加(输入文本翻译为简体中文为:"") 添加(行格式.格式(*空标题)) 添加(分隔符) 添加(f"总 CPU 时间:"{格式化时间(自身 CPU 时间总和)}") 如果 设备时间存在: 添加( f自我{use_device.() 如果 使用设备 否则 } " f总时间:{格式化时间(累加设备时间总计)}" ) 返回 输入文本翻译为简体中文为:"".连接(结果)

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源