快捷键

torch.autograd.profiler 源代码

# mypy: 允许未类型化定义
导入 uuid
来自 集合 导入 defaultdict
来自 collections.abc 导入 迭代器
来自 dataclasses 导入 数据类
来自 时间 导入 perf_counter_ns
来自 打字 导入 任何, 可选
来自 警告 导入 警告

导入 火炬
导入 torch.cuda
来自 torch._C 导入 _获取私有用途 1 后端名称
来自 torch._C._profiler 导入 _实验配置
来自 torch 自动微分 导入 (
    _disable_profiler,
    启用分析器,
    动态步进,
    准备分析器,
    _ProfilerResult,
    支持的活动,
    切换集合动态,
    设备类型,
    动作可用,
    分析器活动,
    分析器配置,
    分析器状态,
)
来自 torch.autograd.profiler_util 导入 (
    _filter_name,
    _filter_stack_entry,
    _rewrite_name,
    事件列表,
    函数事件,
    内存事件名称,
    内存记录账户,
    内存不足事件名称,
)
来自 torch.futures 导入 未来


全部 = [
    "配置文件",
    "记录函数",
    "发出 itt",
    "emit_nvtx",
    "load_nvprof",
    "EnforceUnique",
    "parse_nvprof_trace",
    "运动步数追踪器",
    "事件列表",
    "功能事件",
    "内存记录账户",
]

尝试:
    Python >= 3.2 可用
    来自 contextlib 导入 上下文装饰器 as _上下文装饰器
除了 导入错误:
    导入 functools

     _上下文装饰器:  # 类型:忽略[重新定义]
        def __进入__(self):
            抛出 未实现异常

        def __退出__(self, 异常类型, 异常值, 异常堆栈):
            抛出 未实现异常

        def __调用__(self, 函数):
            @functools.包装(函数)
            def 包装(*参数, **kwargs):
                替换为 self:
                    返回 函数(*参数, **kwargs)

            返回 包装


全局 Python 状态 - 是否已启用分析器
适用于快速 Python 检查以减少延迟
_is_profiler_enabled: 布尔值 = 


def _set_is_profiler_enabled(启用: bool):
    全局 _is_profiler_enabled
    _is_profiler_enabled = 启用


def _run_on_profiler_start():
    _set_is_profiler_enabled(True)


def 在分析器停止时运行():
    设置是否启用分析器(False)


@dataclass
 分析器统计信息:
    开发者用于捕获问题/回归的分析器计时和统计信息
    性能分析窗口持续时间(秒): 浮点数 = 0
    事件数量: 整型 = 0
    分析器准备调用持续时间(微秒): 整型 = 0
    分析器启用调用持续时间(微秒): 整型 = 0
    profiler_disable_call_duration_us: 整型 = 0
    parse_kineto_call_duration_us: 整型 = 0
    function_events_build_tree_call_duration_us: 整型 = 0


[文档] 个人资料: 管理自动微分分析器状态并保存结果摘要的上下文管理器。 在底层,它只是记录 C++中执行函数的事件, 并将这些事件暴露给 Python。您可以将任何代码封装在其中,它将 只报告 PyTorch 函数的运行时间。 注意:分析器是线程局部变量,并且会自动传播到异步任务中。 参数: enabled (bool, 可选): 将此设置为 False 将使此上下文管理器成为空操作。 use_cuda (bool, 可选): 启用 CUDA 事件的计时。 使用 cudaEvent API。 (将弃用) use_device (str, 可选): 启用设备事件的计时。 使用 CUDA 时,每个张量操作会增加大约 4us 的开销。 有效的设备选项为 'cuda'、'xpu'、'mtia' 和 'privateuseone'。 record_shapes(布尔值,可选):如果启用了形状记录,将收集有关输入维度的信息。这允许用户查看哪些 关于输入维度的信息将被收集。这允许用户查看哪些 内部已使用维度进行分组 使用 prof.key_averages(group_by_input_shape=True)。请注意, 形状记录可能会扭曲您的配置文件数据。建议 使用带和不带形状记录的独立运行来验证时间。 最可能的是,对于最底层事件,偏差将是可以忽略不计的(在这种情况下) 嵌套函数调用的总 自我 CPU 时间可能因形状而人为增加 集合。 with_flops (bool, 可选): 如果设置了 with_flops,则分析器将根据操作符的输入形状估算 FLOPs(浮点运算)值。 这允许估计硬件性能。目前,此选项仅适用于矩阵乘法和 2D 卷积操作符。 这允许估计硬件性能。目前,此选项仅适用于矩阵乘法和 2D 卷积操作符。 目前,此选项仅适用于矩阵乘法和 2D 卷积操作符。 profile_memory (bool, 可选): 跟踪张量内存分配/释放。 with_stack (bool, 可选): 记录操作符的源信息(文件和行号)。 with_modules (bool): 记录模块层次结构(包括函数名)。 对应操作符的调用栈。例如,如果模块 A 的前向调用 模块 B 的前向包含一个 aten::add 操作, 然后 aten::add 的模块层次结构是 A.B 注意,目前这种支持仅存在于 TorchScript 模型中 而不是急切模式模型中。 use_kineto (bool, optional): 实验性,启用 Kineto 分析器进行性能分析。 use_cpu (bool, optional): 分析 CPU 事件;设置为 ``False`` 需要 ``use_kineto=True``,可用于降低仅 GPU 分析的开销。 ``use_kineto=True`` 和可以用于降低仅 GPU 分析的开销。 experimental_config (_ExperimentalConfig) : 一组实验性选项 由 Kineto 等分析库使用。请注意,不保证向后兼容性。 acc_events (bool): 启用在多个分析周期内累积 FunctionEvents .. 警告: 启用内存分析或源归属将导致分析器增加额外的负担 overhead ..警告: 此上下文管理器不应递归调用,即不允许嵌套 实例 ..警告: 由于一些 CUDA 多进程的限制(multiprocessing-cuda-note_), 不能使用 ``use_device = 'cuda'`` 的配置来使用分析器进行基准测试 如果您想对数据加载进行基准测试,请使用 ``num_workers > 0`` 的 DataLoaders。 请使用 `use_device = None` 或 `num_workers = 0`。 示例: >>> # xdoctest: +SKIP >>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_AUTOGRAD_PROFILER) >>> x = torch.randn((1, 1), requires_grad=True) >>> with torch.autograd.profiler.profile() as prof: >>> for _ in range(100): # 任何正常的 Python 代码,真的! >>> y = x ** 2 >>> y.backward() >>> # 注意:为了简洁,一些列被删除 >>> 打印(prof.key_averages().table(sort_by="self_cpu_time_total")) ----------------------------------- --------------- --------------- --------------- 名称 自身 CPU 总时间 平均 CPU 时间 调用次数 ----------------------------------- --------------- --------------- --------------- mul 32.048ms 32.048ms 200 pow 27.041ms 27.041ms 200 PowBackward0 9.727ms 55.483ms 100 torch::autograd::AccumulateGrad 9.148ms 9.148ms 100 torch::autograd::GraphRoot 691.816us 691.816us 100 ----------------------------------- --------------- --------------- --------------- """ def __init__( self, 启用=True, *, use_cuda=False, # 已废弃 use_device=, record_shapes=False, 带有_flops=False, 配置内存=False, 带有_stack=False, 带有_modules=False, 使用 kineto=False, 使用 CPU=True, 实验性配置=, 事件统计=False, 自定义跟踪 ID 回调=, ): self.启用: 布尔值 = 启用 如果 self.启用: 返回 self.使用 CUDA = 使用 CUDA 如果 self.使用 CUDA: 警告( "属性 `use_cuda` 将很快被弃用," "请使用 ``use_device = 'cuda'`` 代替。", 未来警告, 栈级别=2, ) self.use_device: 可选[str] = "cuda" 否则: self.use_device = 使用设备 # TODO 考虑将 _function_events 改为具有大小限制的数据结构 self._function_events: 可选[事件列表] = self._旧功能事件: 可选[事件列表] = # 函数事件处理是延迟进行的 self._需要处理 = self.输入 = self.记录形状 = 记录形状 self.与 flops 一起 = 带浮点运算 self.记录形状 |= self.带浮点运算 self.内存分析 = 用户内存 self.带栈 = 带栈 self.带模块 = 带有模块 self.使用 CPU = 使用 CPU self.访问事件 = 访问事件 如果 实验配置 : 实验配置 = _实验配置() self.实验配置 = 实验配置 self.动态结果: 可选[_ProfilerResult] = self.性能分析开始时间(纳秒) = 0 self.性能分析结束时间(纳秒) = 0 self.统计 = 分析器统计() self.自定义跟踪 ID 回调 = 自定义跟踪 ID 回调 self.跟踪 ID = 请提供需要翻译的文本 如果 self.使用 CPU: 断言 ( 使用 Kineto ), 仅支持与 Kineto(use_kineto=True)一起使用设备-only 事件 如果 self.使用设备 : 有效的设备选项 = [cuda, XPU, 翻译, hpu] 如果 _get_privateuse1_backend_name() != 私有用途一: 有效的设备选项.添加(_get_privateuse1_backend_name()) 如果 self.使用设备 有效的设备选项: 警告(f"The {self.使用设备}不是一个有效的设备选项。) self.使用设备 = 如果 self.使用设备 == "cuda" PyTorch.cuda.是否可用(): 警告("CUDA 不可用,禁用 CUDA 分析") self.使用 CUDA = self.使用设备 = 如果 self.使用设备 == xpu PyTorch.xpu.是否可用(): 警告("XPU 不可用,禁用 XPU 分析") self.使用设备 = 如果 self.使用设备 == "hpu" ( 有属性(PyTorch, hpu) PyTorch.硬件加速处理器.是否可用() ): 警告("HPU 不可用,禁用 HPU 分析") self.使用设备 = self.动作集 = 设置() 如果 self.使用 CPU: self.动作库.添加(分析器活动.CPU) self.分析器类型 = 分析器状态.运动控制 如果 self.使用设备 == cuda: 如果 使用 KINETO 或者 分析器活动.CUDA 支持的活动(): 断言 self.使用 CPU, "Legacy CUDA 分析需要使用 use_cpu=True" self.分析器类型 = 分析器状态.KINETO_GPU_FALLBACK 否则: self.动作库.添加(分析器活动.CUDA) 如果...否则 self.使用设备 == XPU: 断言 ( 使用 kineto 分析器活动.XPU 支持的活动() ), "XPU 历史版本分析不支持。在 XPU 设备上需要使用 use_kineto=True。" self.动作.添加(分析器活动.XPU) 如果...否则 self.使用设备 == 翻译: 断言 ( 使用 kineto 分析器活动.MTIA 支持的活动() ), "不支持遗留 MTIA 配置文件。在 MTIA 设备上需要使用_kineto=True。" self.运动活动.添加(分析器活动.翻译技术国际会议) 如果...否则 self.使用设备 == hpu: 断言 ( 使用 kineto 分析器活动.HPU 支持的活动() ), "HPU 的 Legacy HPU 分析不受支持。需要在 HPU 设备上使用 use_kineto=True。" self.kineto 活动.添加(分析器活动.HPU) 如果...否则 self.使用设备 self.使用设备 != 私用一: 如果 ( 使用动量 或者 分析器活动.私有用途 1 支持的活动() ): 断言 ( self.使用 CPU ), "Legacy custombackend 性能分析需要使用 use_cpu=True" self.分析器类型 = 分析器状态.KINETO_PRIVATEUSE1_FALLBACK 否则: self.运动活动.添加(分析器活动.私用 1) 断言 ( 长度(self.动作) > 0 ), "未指定分析器的活动" def 默认跟踪 ID(self): 生成一个 UUID uuid 原始 = uuid.uuid4() 返回 f"{uuid 原始.int:032X}" def 创建跟踪 ID(self): 如果 self.自定义跟踪 ID 回调: 返回 self.自定义跟踪 ID 回调() 返回 self.默认跟踪 ID() def 配置(self, 创建跟踪 ID=False): # 仅在准备跟踪时生成新的跟踪 ID,而不是开始跟踪时 如果 创建跟踪 ID: 跟踪 ID = self.创建跟踪 ID() self.trace_id = trace_id 返回 分析器配置( self.profiler_kind, self.record_shapes, self.用户内存, self.带栈, self.带 FLOPs, self.带模块, self.实验配置, self.跟踪 ID, ) def __进入__(self): 如果 self.启用: 返回 如果 self.输入: 抛出 运行时错误("分析器上下文管理器不可重入") self._准备跟踪() self._开始跟踪() 返回 self def 准备追踪(self): self.进入 = 真实 t0 = 性能计数器纳秒() 准备分析器(self.配置(创建跟踪 ID=True), self.动作类型) t1 = 性能计数器纳秒() self._统计数据.分析器准备调用持续时间微秒 = int((t1 - t0) / 1000) def _开始跟踪(self): self.输入 = 真实 在分析器启动时运行() t0 = 性能计数器纳秒() 启用分析器(self.配置(创建跟踪 ID=False), self.动作记录) t1 = 性能计数器纳秒() self.统计.启用分析器调用持续时间微秒 = int((t1 - t0) / 1000) self.分析开始时间(纳秒) = t1 def __退出__(self, 异常类型, 异常值, 异常堆栈): 如果 self.启用: 返回 如果 self.使用设备 有属性(PyTorch, self.使用设备): 设备模块 = getattr(PyTorch, self.使用设备) 如果 有属性(设备模块, "同步"): 设备模块.同步() 如果 self.函数事件 self.访问事件: self.旧函数事件 = self.函数事件 self.功能事件 = self.需要处理 = 真实 t0 = 性能计数器纳秒() self.动态结果 = _disable_profiler() t1 = 性能计数器纳秒() self._统计数据.profiler_disable_call_duration_us = int((t1 - t0) / 1000) self.剖析结束时间纳秒 = t0 _在分析器停止时运行() self._统计数据.性能分析窗口持续时间(秒) = ( (self.性能分析结束时间(纳秒) - self.性能分析开始时间(纳秒)) * 1.0 / 1000000000 ) 如果我们计划累积事件,我们应该立即后处理函数事件 以保留多个启动/停止调用之间的状态 如果 self.acc_events: self._确保函数事件() 返回 def __repr__(self): 如果 self.待处理: self.确保功能事件() 如果 self.功能事件 : 返回 "<未完成的 torch.autograd.profile>" 返回 表示(self._函数事件) def __str__(self): 如果 self.需要处理: self.确保功能事件() 如果 self._函数事件 : 返回 "<未完成的 torch.autograd.profile>" 返回 str(self._函数事件) def _确保函数事件(self): 需要时懒加载处理函数事件 如果 self.函数事件 : 返回 self.需要处理 = t0 = 性能计数器纳秒() 解析结果 = 输入文本为空,请提供需要翻译的文本 如果 self.动力学结果: 解析结果 = self._解析 kineto 结果(self.kineto 结果) t1 = perf_counter_ns() self._stats.parse_kineto_call_duration_us = int((t1 - t0) / 1000) self._function_events = 事件列表( 解析结果, 使用设备=self.使用设备, 配置内存=self.用户内存, 带有 FLOPS=self.带有 FLOPS, ) t0 = 性能计数器纳秒() self.功能事件.构建树() t1 = 性能计数器纳秒() self._统计数据.函数事件构建树调用持续时间微秒 = int((t1 - t0) / 1000) self._stats.事件数量 = 长度(self.功能事件) 如果 self.旧功能事件 self.事件: for 事件 self._old_function_events: self._function_events.添加(evt) self._old_function_events = 如果 self.功能事件 : 抛出 运行时错误(分析器未完成运行) @property def 功能事件(self): 如果 self._功能事件 或者 self.待处理: self.确保功能事件() 返回 self.功能事件 def 表格( self, 排序=, 行限制=100, 最大源列宽度=75, 最大名称列宽度=55, 最大形状列宽度=80, 标题=, 仅显示顶级事件=False, ): self.确保函数事件() 断言 self.函数事件 返回 self.功能事件.表格( 按此排序=排序, 行限制=行限制, 最大源列宽度=最大源列宽度, 最大名称列宽度=最大名称列宽度, 最大形状列宽度=最大形状列宽, 标题=标题, 仅顶级事件=仅顶级事件, ) 表格.__doc__ = 事件列表.表格.__doc__
[文档] def export_chrome_trace(self, path): """ 将收集到的跟踪信息导出为 Chrome JSON 格式。如果启用了 kineto,则只导出 调度中的最后一个周期。 """ 如果 kineto 可用() self.kineto_results.save(path) # 忽略[联合属性] else: self._ensure_function_events() return self._function_events.export_chrome_trace(path) # type: ignore[union-attr]
导出 Chrome 跟踪
.__doc__ = 事件列表.导出 Chrome 跟踪.__doc__ def 导出堆栈(self, 路径: str, 指标: 字符串 = "self_cpu_time_total"): self._确保函数事件() 断言 self._函数事件 , 预期分析结果 断言 self.with_stack, "export_stacks()需要 with_stack=True" 返回 self._function_events.导出堆栈(路径, 指标) def 切换集合动态( self, 启用: bool, 活动: 迭代器[分析器活动] ): "" 切换当前分析实例的活动收集。 """ 返回 切换集合动态(启用, 设置(活动))
[文档] def key_averages( self, group_by_input_shape=False, group_by_stack_n=0, group_by_overload_name=False, ): self._ensure_function_events() assert self._function_events is not None, "Expected profiling results" return self._function_events.key_averages( group_by_input_shape, group_by_stack_n, group_by_overload_name )
key_averages.__doc__ = 事件列表
.关键平均数.__doc__
[文档] def 总平均(self): self._确保函数事件() assert self._function_events is not None, "预期分析结果" 返回 self._function_events 的总平均值()
总平均值
.__doc__ = 事件列表.总平均值.__doc__ @property def self_cpu_time_total(self): 返回在 CPU 上花费的总时间。 总时间是指所有事件的自时间总和。 """ self._ensure_function_events() 断言 self._函数事件 返回 self.功能事件.自身 CPU 总时间 def 解析 kineto 结果(self, 结果: _ProfilerResult): # result.events() 包含大部分事件 - PyTorch 操作级别和设备级别的事件 跟踪开始时间戳 = 结果.跟踪开始时间戳() 内存记录 = [ [事件, False] for 事件 结果.活动() 如果 事件.名称() == 内存事件名称 ] 内存记录 = [ 事件 for 事件 结果.事件列表() 如果 事件.名称() == 内存不足事件名称 ] 内存记录累计 = MemRecordsAcc(内存记录) def _CPU 内存使用(内存记录): 返回 ( 内存记录.字节数() 如果 内存记录.设备类型() [设备类型.CPU, 设备类型.MKLDNN, 设备类型.IDEEP] 否则 0 ) def 设备内存使用(内存记录): 返回 ( 内存记录.字节数() 如果 内存记录.设备类型() [设备类型.CUDA, 设备类型.私有用途 1, 设备类型.HIP] 否则 0 ) 创建并返回包含所有函数事件的 FunctionEvent 列表 创建了 2 个函数事件: all_function_events 包含与每个 kineto 事件相关的所有事件 所有功能事件 = 输入文本为空,请提供需要翻译的文本 # 前端功能事件包含 aten 或 torch 前端级别的 # 其关联 ID 为 0 前端功能事件 = 输入文本为空,请提供需要翻译的文本 设备对应映射: 字典[int, 列表[函数事件]] = {} 最大事件 ID = 0 for 动作事件 结果.活动(): 如果 过滤器名称(动作事件.名称()): continue rel_start_ns = kineto_event.开始命名空间() - 跟踪开始时间戳 相对结束时间戳 = 动态事件.结束时间戳() - 跟踪开始时间戳 绝对结束时间戳 = 运动事件.结束时间戳() CPU 内存使用率 = 0 设备内存使用率 = 0 如果 运动事件.设备类型() == 设备类型.CPU: # 查找对应的内存分配事件 for mem_record mem_records_acc.在区间内( 动作事件.开始纳秒() / 1000, 绝对结束纳秒 / 1000 ): CPU 内存使用率 += _CPU 内存使用率(内存记录[0]) 设备内存使用率 += 设备内存使用(内存记录[0]) 内存记录[1] = 真实 是否异步 = 动作事件.是否异步() 或者 ( 动作事件.开始线程 ID() != 动作事件.线程结束 ID() ) fe = 函数事件( id=动作事件.关联 ID(), 名称=_重写名称(名称=动作事件.名称(), 带通配符=True), 重载名称=动作事件.重载名称(), 跟踪名称=重写名称(名称=动作事件.名称(), 带通配符=False), 线程=动作事件.开始线程 ID(), 开始美国=相对开始 NS / 1000, 结束美国=相对结束 NS / 1000, 前方线程=动态事件.前线程 ID(), 输入形状=动态事件.形状(), 混凝土输入=动作事件.混凝土输入(), 输入参数=动作事件.输入参数(), =[ 条目 for 条目 动作事件.() 如果 过滤栈条目(条目) ] 范围=动作事件.范围(), use_device=self.use_device, CPU 内存使用率=CPU 内存使用率, 设备内存使用率=设备内存使用率, 异步=异步, 序列号=动作事件.序列号(), 设备类型=运动事件.设备类型(), 设备索引=运动事件.设备索引(), 设备资源 ID=动作事件.设备资源 ID(), flops=动作事件.洛普斯(), 是否为用户标注=动作事件.是否为用户标注(), ) max_evt_id = 最大值(max_evt_id, fe.id) 如果 fe.设备类型 == 设备类型.CPU 非阻塞.异步: 如果 self.使用设备 == 私用一: 私有用途 1 时间 = 动作事件.privateuse1 已用时间() 如果 privateuse1 时间 > 0: fe.添加内核(飞机.名称, 飞机.设备索引, 私有用途 1 时间) fe.是否为旧版本 = 真实 如果...否则 self.使用设备 == cuda: # 检查是否有 CUDA 时间作为后备方案 cuda 时间 = 动作事件.cuda 已用时间微秒() 如果 cuda 时间 > 0: fe.添加内核(fe.名称, fe.设备索引, cuda_time) fe.is_legacy = 真实 所有功能事件.添加(粒子) 关联 ID = 动作事件.链接关联 ID() 如果 corr_id > 0: 如果 corr_id 设备关联映射: 设备关联映射[关联 ID] = 输入文本为空,请提供需要翻译的文本 设备关联映射[关联 ID].添加(前端(FE)) 如果...否则 corr_id == 0: 前端功能事件.添加(fe) 否则: 抛出 运行时错误( f"获得负相关 id"{corr_id}在分析器后处理中 ) # 将设备内核和设备运行时(CPU)与 CPU 事件关联 for fe 前端功能事件: 如果 ( 前端缩写.设备类型 == 设备类型.CPU 前端.异步 前端.标识符 设备对应映射 ): for f_evt 设备校正映射[fe.id] 如果 ( f_evt.设备类型 == 设备类型.CUDA 或者 f_evt.设备类型 == 设备类型.私有用途 1 ): fe.添加内核( f_evt.名称, f_evt.设备索引, f_evt.时间范围.末端 - f_evt.时间范围.开始, ) 如果...否则 f_evt.设备类型 == 设备类型.CPU: 确保 CPU Kineto(例如设备运行时)事件的'thread'相关联 正确跟踪相应的 PyTorch 事件的'线程' 父亲和子代 f_evt.线程 = fe.线程 def 为内存事件创建函数事件(事件(evt)): rel_start_ns = evt.start_ns() - trace_start_ns = 功能事件( id=最大事件 ID, 名称=事件.名称(), 超载名称=输入文本翻译为简体中文为:"", 跟踪名称=, # 不在跟踪中输出 线程=事件.开始线程 ID(), 开始时间(美国)=相对开始纳秒 / 1000, 美国结束=开始关系 / 1000, # 无时长 前线程=事件.开始线程 ID(), 输入形状=[] =[] 范围=0, # RecordScope::FUNCTION 使用设备=self.使用设备, CPU 内存使用率=_CPU 内存使用率_(事件), 设备内存使用=设备内存使用(事件), 是否异步=False, 序列号=-1, 设备类型=设备类型.CPU, 设备索引=0, ) 返回 前端 输出顶级内存事件 for 内存记录 内存记录: 如果 内存记录条[1] 最大事件 ID += 1 前端 = 为内存事件创建函数事件(内存记录[0]) 所有函数事件.添加(fe) for 内存溢出记录 内存溢出记录列表: 最大事件 ID += 1 前端 = 为内存事件创建函数事件(内存溢出记录) 所有函数事件.添加(前端(FE)) 所有功能事件.排序( =Lambda 函数 事件: [事件.时间范围.开始, -事件.时间范围.结束] ) 返回 所有功能事件
[文档] 记录功能(_上下文装饰器): 代码块/函数运行自动微分分析器时添加标签的上下文管理器/函数装饰器。 只有启用 CPU 活动跟踪时,标签才会显示。 在追踪代码性能时非常有用。 参数: name (str): 分配给代码块的标签。 节点 ID(int):节点的 ID,用于分布式分析。在非分布式情况下未设置。 非分布式情况。 示例: >>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_AUTOGRAD_PROFILER) >>> x = torch.randn((1, 1), requires_grad=True) >>> 使用 torch.autograd.profiler.profile() 作为 prof: ... y = x ** 2 ... with torch.autograd.profiler.record_function("label-z"): # 标记该块 ... z = y ** 3 ... y.backward() ... >>> # xdoctest: +IGNORE_WANT >>> # 注意:为了简洁,删除了一些列 >>> 打印(prof.key_averages().table(sort_by="self_cpu_time_total")) ----------------------------------- --------------- --------------- --------------- 名称 自定义 CPU 总百分比 平均 CPU 时间 调用次数 ----------------------------------- --------------- --------------- --------------- pow 60.77% 47.470us 3 mul 21.73% 25.465us 2 PowBackward0 12.03% 121.891us 1 torch::autograd::AccumulateGrad 2.70% 6.324us 1 label-z 2.13% 12.421us 1 torch::autograd::GraphRoot 0.64% 1.503us 1 ----------------------------------- --------------- --------------- --------------- 自定义 CPU 总时间:234.344us CUDA 总耗时:0.000 微秒 """ def __init__(self, 名称: str, 参数: 可选[str] = ): self.名称: 字符串 = 名称 self.参数: 可选[str] = args # 是否在退出时运行记录函数的结束回调。 self.run_callbacks_on_exit: 布尔值 = 真实 # TODO: TorchScript 在此处忽略标准的类型注解。 # self.record: Optional["torch.classes.profiler._RecordFunction"] = None self.记录 = PyTorch.算子.标注( 可选["torch.classes.profiler._RecordFunction"] ) def __进入__(self): self.记录 = PyTorch.操作.分析器._record_function_enter_new( self.名称, self.args ) 返回 self def __退出__(self, 异常类型: 任何, exc_value: 任何, 跟踪回溯: 任何): 如果 self.run_callbacks_on_exit: 返回 # 本地变量由 TorchScript 需要,以将 Optional[T]精炼为 T 记录 = self.记录 断言 记录 # TODO: 启用 __torch_function__ 处理太慢 # 请参阅 https://github.com/pytorch/pytorch/issues/76410 如果 PyTorch.算子.是否正在脚本化(): 替换为 PyTorch._C.禁用 TorchFunction 子类(): PyTorch.操作.分析器.记录函数退出.记录函数(记录) 否则: PyTorch.操作.分析器.记录函数退出(记录) def 在 future 上调用结束回调(self, fut: 未来[任何]) -> 未来[任何] 用于分析返回 future 的异步调用。 调用此函数将扩展记录范围,直到 future 被 满足。这对于分析异步调用的端到端时间很有用。 此函数应仅调用一次以将回调附加到未来,并且 如果多次调用将抛出异常。 参数: fut: (torch._C.Future): 要为其安排回调的未来。 的未来。 返回值: 一个与传入未来的值完成的未来。 当配置文件回调运行完毕后。 """ 抛出异常,因为我们已经为该 future 附加了回调。 如果 self.在退出时运行回调: 抛出 运行时错误("_call_end_callbacks_on_future 只能调用一次。") 我们正在安排在运行此 RecordFunction 的结束回调时 传入的未来完成,因此退出时不要运行结束回调。 self.运行退出回调 = # 本地变量由 TorchScript 用于将 Optional[T]精炼为 T 记录 = self.记录 断言 记录 # TODO: 启用 __torch_function__ 处理太慢 # 请参阅 https://github.com/pytorch/pytorch/issues/76410 如果 PyTorch.算子.是否正在脚本化(): 替换为 PyTorch._C.禁用 TorchFunction 子类(): 个人档案未来 = ( PyTorch.操作.分析器.在 JIT 未来上调用结束回调.记录函数( 记录, 未来 ) ) 否则: 已分析的未来 = PyTorch.操作.分析器.调用 JIT 未来上的结束回调( 记录, fut ) 返回 被记录的未来
[文档] emit_itt: """上下文管理器,使每个自动微分操作都发出 ITT 范围。""" 在使用 Intel(R) VTune Profiler 运行程序时很有用: vtune <常规命令> 仪器和跟踪技术(ITT)API 使您的应用程序能够在执行期间生成并 控制不同 Intel 工具收集的跟踪数据。 这个上下文管理器用于注释 Intel(R) VTune 性能分析跟踪。借助这个上下文管理器, 您可以在 Intel(R) VTune 性能分析器 GUI 中看到标记的范围。 ..警告: 此上下文管理器不应递归调用,即最多只能调用一次 实例在任何给定时间都应该处于启用状态。 参数: 启用(布尔值,可选):将 ``enabled=False`` 设置为无操作(no-op)的上下文管理器。 默认:``True``。 record_shapes(布尔值,可选):如果 ``record_shapes=True``,itt 范围包装 每个自动微分操作都会在以下格式中附加关于接收到的 Tensor 参数大小的信息 由该操作提供: ``[[arg0.size(0), arg0.size(1), ...], [arg1.size(0), arg1.size(1), ...], ...]`` 非张量参数将由 ``[]`` 表示。 后端操作接收到的参数将按顺序列出。 请注意,这个顺序可能与在 Python 端传递参数的顺序不匹配。 还要注意,形状记录可能会增加 itt 范围创建的开销。 默认:``False`` 示例: >>> # xdoctest: +忽略("未定义变量") >>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_AUTOGRAD_PROFILER) >>> 使用 torch.autograd.profiler.emit_itt() ... 模型(x) """ def __init__(self, 启用=True, record_shapes=False): self.启用 = 启用 self.进入 = self.record_shapes = record_shapes def __进入__(self): 如果 self.启用: 返回 如果 self.输入: 抛出 运行时错误(ITT 注释上下文管理器不是可重入的) self.进入 = 真实 运行在分析器启动时() 启用分析器( 分析器配置( 分析器状态.意大利面, self.record_shapes, False, False, False, False, _实验配置(), ), 设置(), ) 返回 self def __退出__(self, 异常类型, 异常值, 异常堆栈): 如果 self.启用: 返回 _disable_profiler() _在分析器停止时运行() 返回 False
[文档] emit_nvtx: """上下文管理器,使每个自动微分操作都发出一个 NVTX 范围。 当在 nvprof 下运行程序时很有用: nvprof --profile-from-start off -o trace_name.prof -- <regular command here> 很遗憾,无法强制 nvprof 将收集的数据刷新到磁盘,因此进行 CUDA 分析时必须使用此上下文管理器进行注释 以磁盘,因此进行 CUDA 分析时必须使用此上下文管理器进行注释 使用 nvprof 进行跟踪,并在检查之前等待进程退出。 然后,可以使用 NVIDIA Visual Profiler (nvvp)来可视化时间线,或者 使用:func:`torch.autograd.profiler.load_nvprof`可以加载结果以进行检查 例如在 Python 交互式解释器中。 ..警告: 此上下文管理器不应递归调用,即在任何给定时间最多只能启用一个实例。 任何给定时间只能启用一个实例。 参数: 启用(布尔值,可选):将 ``enabled=False`` 设置为上下文管理器为无操作。 默认:``True``。 record_shapes (bool, 可选): 如果 ``record_shapes=True``,则 nvtx 范围包装 每个自动微分操作将附加接收到的张量参数的大小信息 通过该操作,以下格式: ``[[arg0.size(0), arg0.size(1), ...], [arg1.size(0), arg1.size(1), ...], ...]`` 非张量参数将被表示为 ``[]``。 参数将按照后端操作接收的顺序列出。 请注意,这个顺序可能与传递参数时的顺序不匹配。 在 Python 端。另外请注意,形状记录可能会增加 nvtx 范围创建的开销。 默认:``False`` 示例: >>> # xdoctest: +SKIP("未定义变量") >>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_AUTOGRAD_PROFILER) >>> with torch.cuda.profiler.profile(): ... 模型(x) # 预热 CUDA 内存分配器和分析器 ... 使用 torch.autograd.profiler.emit_nvtx() ... 模型(x) **正向反向相关性** 在使用 Nvidia Visual Profiler 中的`:class:`emit_nvtx`创建的配置文件中查看时, 将每个反向传播操作与相应的正向传播操作关联起来可能很困难。 为了简化这项任务,`:class:`emit_nvtx`将其生成的范围附加序列号信息。 生成。 在正向传播过程中,每个函数范围都被装饰为 `seq=`。`seq` 是一个运行 计数器,每次创建并存储新的 backward 函数对象时增加。 因此,与每个正向函数范围关联的 ``seq=`` 注解告诉您 如果由该正向函数创建了一个反向函数对象, 后向对象将接收到序列号 N。 在反向传播过程中,每个 C++反向函数的最高级范围包装 `apply()` 调用被装饰为 `stashed seq=`。`M` 是序列号, 向后对象是用.创建的。通过比较向后中的“stashed seq”编号与“seq” 按顺序排列的数字,您可以追踪哪个前向操作创建了每个后向函数。 在反向传播过程中执行的所有函数也装饰了 `seq=`。在 默认反向操作(`create_graph=False`)中,此信息无关紧要,实际上, `N` 可能对所有这些函数都是 0。只有与顶级范围相关的 后向函数对象的 `apply()` 方法很有用,可以作为将这些函数对象与早期正向传递相关联的方式。 另一方面,如果正在进行带有 `create_graph=True` 的后向传递(换句话说, **双重后向** 如果,另一方面,正在执行带有 `create_graph=True` 的后向传递(换句话说, 如果您正在设置双重反向操作),每个函数在反向执行 给定一个非零、有用的“seq=”。这些函数本身可能创建函数对象 稍后将在双反向过程中执行,就像原始函数在正向传递中做的那样。 向后和双向的关系在概念上与向后和双向后关系相同 在向前和向后之间:函数仍然发出带有当前序列号标记的范围, 它们创建的函数对象仍然存储这些序列号,并在最终的双向回溯过程中, 函数对象的 `apply()` 范围仍然带有 `存储的序列` 标记, 这些标记的序列号可以与反向传递中的 `seq` 序列号进行比较。 ..警告: 序列号是线程局部变量,并且一些前向函数不会创建关联的 后向函数对象(而是将这一任务委托给调用链中的子函数)。 因此,在以下情况下,存储的序列号之间的对应关系 向后传递函数 `apply()` 中的 `seq` 数字范围 不保证一一对应。序列号本身可能不足以完全 消除混淆,确定哪个前向函数创建了哪个 后退函数对象。您可能需要根据分析知识做出判断。 预期对应应为。 """ def __init__(self, 启用=True, record_shapes=False): self.启用 = 启用 self.进入 = self.记录形状 = 记录形状 def __进入__(self): 如果 self.启用: 返回 如果 self.输入: 抛出 运行时错误("NVTX 注解上下文管理器不可重入") self.进入 = 真实 PyTorch.cuda.同步() 运行在分析器启动时() 启用分析器( 分析器配置( 分析器状态.NVTX, self.record_shapes, False, False, False, False, _实验配置(), ), 设置(), ) 返回 self def __退出__(self, 异常类型, 异常值, 异常堆栈): 如果 self.启用: 返回 PyTorch.cuda.同步() _disable_profiler() _在分析器停止时运行() 返回 False
[文档]def load_nvprof(path): """打开 nvprof 跟踪文件并解析 autograd 注释。 Args: path (str): nvprof 跟踪文件的路径 "``" 返回 EventList 解析 nvprof_trace 路径()
[文档]类 EnforceUnique: "``如果发现一个键被多次看到,则引发错误。``" def __init__(self): self.seen = set()
[文档] def see(self, *key): r""" 观察一个键,如果它被多次看到,则引发错误。 """ 如果键在 self.seen 中: 引发 RuntimeError("重复键: " + str(key)) self.seen.add(key)
[文档]def 解析 nvprof 跟踪(路径): 导入 sqlite3 连接 = sqlite3.连接(路径) .行工厂 = sqlite3. # 解析字符串表 字符串 = {} for r 连接.执行("SELECT _id_ as id, value FROM StringTable"): 字符串[r[id]] = PyTorch._C._demangle(r["值"]) 首先,找到所有函数并为它们创建 FunctionEvents 标记查询 = "" SELECT start.id AS 标记_id, start.name, start.timestamp AS 开始时间, end.timestamp AS 结束时间 FROM CUPTI_ACTIVITY_KIND_MARKER AS start INNER JOIN CUPTI_ACTIVITY_KIND_MARKER AS end ON 起始.id = 结束.id WHERE 起始.name 不等于 0 AND 结束.name 等于 0 """ 函数 = 输入文本为空,请提供需要翻译的文本 函数映射 = {} 唯一的 = 强制唯一() for 连接.执行(标记查询): 独特.(["标记_id"]) 事件 = 函数事件( id=[marker_id] node_id=0, # 调用 FunctionEvent 时缺少 node_id。这只是为了确保 # 创建 FunctionEvent()对象时 pytorch 不会崩溃 名称=字符串[[名称]], 开始美国=["开始时间"] 结束美国=[结束时间] 线程=0, ) # TODO: 在 sqlite 数据库中查找 函数.添加(事件) 函数映射[事件.id] = 事件 现在,将所有内核与 FunctionEvents 关联 内核查询 = "" SELECT start.id AS marker_id, start.name, start.timestamp, end.timestamp, runtime._id_ AS runtime_id, runtime.cbid, runtime.start AS runtime_start, runtime.end AS runtime_end, kernel.start AS kernel_start, kernel.end AS kernel_end, kernel.name AS kernel_name FROM CUPTI_ACTIVITY_KIND_MARKER AS start INNER JOIN CUPTI_ACTIVITY_KIND_MARKER AS end ON start.id = end.id INNER JOIN CUPTI_ACTIVITY_KIND_RUNTIME as runtime ON (start.timestamp < runtime.start AND runtime.end < end.timestamp) INNER JOIN CUPTI_ACTIVITY_KIND_CONCURRENT_KERNEL AS kernel ON kernel.correlationId = runtime.correlationId """ 唯一的 = 确保唯一() for 连接.执行(内核查询): 独特.查看(["标记 ID"] ["运行时 ID"]) # 211 是 CUDA >= 9.2 的 cudaKernelLaunch 断言 ["cbid"] == 211 事件 = 函数映射[["标记_id"]] 事件.添加内核( [内核名称] 0, [内核结束] - ["内核启动"] ) 函数.排序(=Lambda 函数 事件: 事件.时间范围.开始) 返回 函数
[文档] 动作步数追踪器: 提供了一个全局增加步数计数的抽象。 之前,我们只有一个地方可以标记步数发生。 通过 pytorch profiler 的 step()函数在程序中进行。现在我们将添加步骤钩子 在 Optimizer 类中 https://github.com/pytorch/pytorch/issues/88446 - 这可能意味着已经在每个迭代中调用 profiler.step()的程序 最终会导致步骤计数重复增加。 如果一个模型使用多个优化器,我们也可以有双倍或更多的步数计数。 我们通过在调用 kineto 库中的 step()之前添加一层抽象来解决这个问题。想法是维护一个字典,以存储每个请求者的步数。 我们通过在调用 kineto 库中的 step()之前添加一层抽象来解决这个问题。想法是维护一个字典,以存储每个请求者的步数。 我们通过在调用 kineto 库中的 step()之前添加一层抽象来解决这个问题。想法是维护一个字典,以存储每个请求者的步数。 .. 代码块 { "ProfilerStep": 100, # 由分析器 step() 调用触发 "Optimizer1Step": 100, # 优化器 1 或 2 只是示例,可以是 SGD、Adam 等 "Optimizer2Step": 100, } 为了计算全局步数,只需取字典值的最大值(100)。 如果计数之一增加,最大值将上升。 .. 代码块 { "ProfilerStep": 100, "Optimizer1Step": 101,# Optimizer1 首次增加 "Optimizer2Step": 100, } 然后全局步数是 101 只有当全局计数增加时,我们才调用 kineto step()函数。 请注意:请勿在优化器以外的模块中使用 KinetoStepTracker 目前。结果可能是步数的不正确增加。 """ 当前步骤 = 0 _step_dict: 字典[str, int] = 默认字典(int)
[文档] @类方法 def 初始化步骤计数(cls, 请求者: str): r""" 初始化给定的请求者。 """ cls._step_dict[requester] = cls._current_step
[文档] @classmethod def erase_step_count(cls, requester: str) -> bool: r""" 移除指定的请求者。 """ 返回 cls._step_dict.pop(requester, None) 是否为非空
[文档] @classmethod def increment_step(cls, requester: str) -> int: """增加请求者的步骤计数。 此外,如果所有步骤计数中的最大值已增加,则 触发_kineto_step()返回全局步数 """ 如果请求者不在 cls._step_dict 中: cls.init_step_count(requester) cls._step_dict[requester] += 1 new_step = max(cls._step_dict.values()) if new_step > cls._current_step: delta = new_step - cls._current_step if delta > 1: warn( "分析器步骤计数增加超过 1 - " f"当前步骤 = {cls._current_step} 步骤字典 = {cls._step_dict}" ) for _ in 范围(0, delta): _kineto_step() cls._current_step = new_step 返回 cls._current_step
[文档] @classmethod def current_step(cls) -> int: r""" 获取任何请求者的最新步骤 """ 返回 cls._current_step

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源