# mypy: 允许未类型化定义
导入 uuid
来自
集合
导入 defaultdict
来自 collections.abc
导入
迭代器
来自 dataclasses
导入
数据类
来自
时间
导入 perf_counter_ns
来自
打字
导入
任何,
可选
来自
警告
导入
警告
导入
火炬
导入 torch.cuda
来自 torch._C
导入
_获取私有用途 1 后端名称
来自 torch._C._profiler
导入
_实验配置
来自
torch 自动微分
导入 (
_disable_profiler,
启用分析器,
动态步进,
准备分析器,
_ProfilerResult,
支持的活动,
切换集合动态,
设备类型,
动作可用,
分析器活动,
分析器配置,
分析器状态,
)
来自 torch.autograd.profiler_util
导入 (
_filter_name,
_filter_stack_entry,
_rewrite_name,
事件列表,
函数事件,
内存事件名称,
内存记录账户,
内存不足事件名称,
)
来自 torch.futures
导入
未来
全部 = [
"配置文件",
"记录函数",
"发出 itt",
"emit_nvtx",
"load_nvprof",
"EnforceUnique",
"parse_nvprof_trace",
"运动步数追踪器",
"事件列表",
"功能事件",
"内存记录账户",
]
尝试:
Python >= 3.2 可用
来自 contextlib
导入
上下文装饰器 as
_上下文装饰器
除了
导入错误:
导入 functools
类
_上下文装饰器:
# 类型:忽略[重新定义]
def __进入__(self):
抛出
未实现异常
def __退出__(self,
异常类型,
异常值,
异常堆栈):
抛出
未实现异常
def __调用__(self,
函数):
@functools.包装(
函数)
def 包装(*
参数, **kwargs):
替换为 self:
返回
函数(*
参数, **kwargs)
返回
包装
全局 Python 状态 - 是否已启用分析器
适用于快速 Python 检查以减少延迟
_is_profiler_enabled: 布尔值 =
假
def _set_is_profiler_enabled(启用: bool):
全局 _is_profiler_enabled
_is_profiler_enabled = 启用
def _run_on_profiler_start():
_set_is_profiler_enabled(True)
def 在分析器停止时运行():
设置是否启用分析器(False)
@dataclass
类
分析器统计信息:
开发者用于捕获问题/回归的分析器计时和统计信息
性能分析窗口持续时间(秒):
浮点数 = 0
事件数量:
整型 = 0
分析器准备调用持续时间(微秒):
整型 = 0
分析器启用调用持续时间(微秒):
整型 = 0
profiler_disable_call_duration_us: 整型 = 0
parse_kineto_call_duration_us: 整型 = 0
function_events_build_tree_call_duration_us: 整型 = 0
[文档]
类
个人资料:
管理自动微分分析器状态并保存结果摘要的上下文管理器。
在底层,它只是记录 C++中执行函数的事件,
并将这些事件暴露给 Python。您可以将任何代码封装在其中,它将
只报告 PyTorch 函数的运行时间。
注意:分析器是线程局部变量,并且会自动传播到异步任务中。
参数:
enabled (bool, 可选): 将此设置为 False 将使此上下文管理器成为空操作。
use_cuda (bool, 可选): 启用 CUDA 事件的计时。
使用 cudaEvent API。 (将弃用)
use_device (str, 可选): 启用设备事件的计时。
使用 CUDA 时,每个张量操作会增加大约 4us 的开销。
有效的设备选项为 'cuda'、'xpu'、'mtia' 和 'privateuseone'。
record_shapes(布尔值,可选):如果启用了形状记录,将收集有关输入维度的信息。这允许用户查看哪些
关于输入维度的信息将被收集。这允许用户查看哪些
内部已使用维度进行分组
使用 prof.key_averages(group_by_input_shape=True)。请注意,
形状记录可能会扭曲您的配置文件数据。建议
使用带和不带形状记录的独立运行来验证时间。
最可能的是,对于最底层事件,偏差将是可以忽略不计的(在这种情况下)
嵌套函数调用的总
自我 CPU 时间可能因形状而人为增加
集合。
with_flops (bool, 可选): 如果设置了 with_flops,则分析器将根据操作符的输入形状估算 FLOPs(浮点运算)值。
这允许估计硬件性能。目前,此选项仅适用于矩阵乘法和 2D 卷积操作符。
这允许估计硬件性能。目前,此选项仅适用于矩阵乘法和 2D 卷积操作符。
目前,此选项仅适用于矩阵乘法和 2D 卷积操作符。
profile_memory (bool, 可选): 跟踪张量内存分配/释放。
with_stack (bool, 可选): 记录操作符的源信息(文件和行号)。
with_modules (bool): 记录模块层次结构(包括函数名)。
对应操作符的调用栈。例如,如果模块 A 的前向调用
模块 B 的前向包含一个 aten::add 操作,
然后 aten::add 的模块层次结构是 A.B
注意,目前这种支持仅存在于 TorchScript 模型中
而不是急切模式模型中。
use_kineto (bool, optional): 实验性,启用 Kineto 分析器进行性能分析。
use_cpu (bool, optional): 分析 CPU 事件;设置为 ``False`` 需要 ``use_kineto=True``,可用于降低仅 GPU 分析的开销。
``use_kineto=True`` 和可以用于降低仅 GPU 分析的开销。
experimental_config (_ExperimentalConfig) : 一组实验性选项
由 Kineto 等分析库使用。请注意,不保证向后兼容性。
acc_events (bool): 启用在多个分析周期内累积 FunctionEvents
.. 警告:
启用内存分析或源归属将导致分析器增加额外的负担
overhead
..警告:
此上下文管理器不应递归调用,即不允许嵌套
实例
..警告:
由于一些 CUDA 多进程的限制(multiprocessing-cuda-note_),
不能使用 ``use_device = 'cuda'`` 的配置来使用分析器进行基准测试
如果您想对数据加载进行基准测试,请使用 ``num_workers > 0`` 的 DataLoaders。
请使用 `use_device = None` 或 `num_workers = 0`。
示例:
>>> # xdoctest: +SKIP
>>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_AUTOGRAD_PROFILER)
>>> x = torch.randn((1, 1), requires_grad=True)
>>> with torch.autograd.profiler.profile() as prof:
>>> for _ in range(100): # 任何正常的 Python 代码,真的!
>>> y = x ** 2
>>> y.backward()
>>> # 注意:为了简洁,一些列被删除
>>> 打印(prof.key_averages().table(sort_by="self_cpu_time_total"))
----------------------------------- --------------- --------------- ---------------
名称 自身 CPU 总时间 平均 CPU 时间 调用次数
----------------------------------- --------------- --------------- ---------------
mul 32.048ms 32.048ms 200
pow 27.041ms 27.041ms 200
PowBackward0 9.727ms 55.483ms 100
torch::autograd::AccumulateGrad 9.148ms 9.148ms 100
torch::autograd::GraphRoot 691.816us 691.816us 100
----------------------------------- --------------- --------------- ---------------
"""
def __init__(
self,
启用=True,
*,
use_cuda=False, # 已废弃
use_device=无,
record_shapes=False,
带有_flops=False,
配置内存=False,
带有_stack=False,
带有_modules=False,
使用 kineto=False,
使用 CPU=True,
实验性配置=
无,
事件统计=False,
自定义跟踪 ID 回调=
无,
):
self.启用:
布尔值 =
启用
如果
不 self.
启用:
返回
self.使用 CUDA =
使用 CUDA
如果 self.
使用 CUDA:
警告(
"属性 `use_cuda` 将很快被弃用,"
"请使用 ``use_device = 'cuda'`` 代替。",
未来警告,
栈级别=2,
)
self.use_device: 可选[str] = "cuda"
否则:
self.use_device = 使用设备
# TODO 考虑将 _function_events 改为具有大小限制的数据结构
self._function_events: 可选[
事件列表] =
无
self._旧功能事件:
可选[
事件列表] =
无
# 函数事件处理是延迟进行的
self._需要处理 =
假
self.输入 =
假
self.记录形状 =
记录形状
self.与 flops 一起 =
带浮点运算
self.记录形状 |= self.
带浮点运算
self.内存分析 =
用户内存
self.带栈 =
带栈
self.带模块 =
带有模块
self.使用 CPU =
使用 CPU
self.访问事件 =
访问事件
如果
实验配置
是
无:
实验配置 =
_实验配置()
self.实验配置 =
实验配置
self.动态结果:
可选[_ProfilerResult] =
无
self.性能分析开始时间(纳秒) = 0
self.性能分析结束时间(纳秒) = 0
self.统计 =
分析器统计()
self.自定义跟踪 ID 回调 =
自定义跟踪 ID 回调
self.跟踪 ID =
请提供需要翻译的文本
如果
不 self.
使用 CPU:
断言 (
使用 Kineto
), 仅支持与 Kineto(use_kineto=True)一起使用设备-only 事件
如果 self.
使用设备
是
不
无:
有效的设备选项 = [
cuda,
XPU,
翻译,
hpu]
如果 _get_privateuse1_backend_name() !=
私有用途一:
有效的设备选项.
添加(_get_privateuse1_backend_name())
如果 self.
使用设备
不
在
有效的设备选项:
警告(f"The {self.
使用设备}
不是一个有效的设备选项。)
self.使用设备 =
无
如果 self.
使用设备 == "cuda"
和
不
PyTorch.cuda.
是否可用():
警告(
"CUDA 不可用,禁用 CUDA 分析")
self.使用 CUDA =
假
self.使用设备 =
无
如果 self.
使用设备 ==
xpu
和
不
PyTorch.xpu.
是否可用():
警告(
"XPU 不可用,禁用 XPU 分析")
self.使用设备 =
无
如果 self.
使用设备 == "hpu"
和
不 (
有属性(
PyTorch,
hpu)
和
PyTorch.
硬件加速处理器.
是否可用()
):
警告(
"HPU 不可用,禁用 HPU 分析")
self.使用设备 =
无
self.动作集 =
设置()
如果 self.
使用 CPU:
self.动作库.
添加(
分析器活动.CPU)
self.分析器类型 =
分析器状态.
运动控制
如果 self.
使用设备 ==
cuda:
如果
不
使用 KINETO
或者
分析器活动.CUDA
不
在
支持的活动():
断言 self.
使用 CPU,
"Legacy CUDA 分析需要使用 use_cpu=True"
self.分析器类型 =
分析器状态.KINETO_GPU_FALLBACK
否则:
self.动作库.
添加(
分析器活动.CUDA)
如果...否则 self.
使用设备 ==
XPU:
断言 (
使用 kineto
和
分析器活动.XPU
在
支持的活动()
), "XPU 历史版本分析不支持。在 XPU 设备上需要使用 use_kineto=True。"
self.动作.
添加(
分析器活动.XPU)
如果...否则 self.
使用设备 ==
翻译:
断言 (
使用 kineto
和
分析器活动.MTIA
在
支持的活动()
), "不支持遗留 MTIA 配置文件。在 MTIA 设备上需要使用_kineto=True。"
self.运动活动.
添加(
分析器活动.
翻译技术国际会议)
如果...否则 self.
使用设备 ==
hpu:
断言 (
使用 kineto
和
分析器活动.HPU
在
支持的活动()
), "HPU 的 Legacy HPU 分析不受支持。需要在 HPU 设备上使用 use_kineto=True。"
self.kineto 活动.
添加(
分析器活动.HPU)
如果...否则 self.
使用设备
是
不
无
和 self.
使用设备 !=
私用一:
如果 (
不
使用动量
或者
分析器活动.
私有用途 1
不
在
支持的活动()
):
断言 (
self.使用 CPU
), "Legacy custombackend 性能分析需要使用 use_cpu=True"
self.分析器类型 =
分析器状态.KINETO_PRIVATEUSE1_FALLBACK
否则:
self.运动活动.
添加(
分析器活动.
私用 1)
断言 (
长度(self.
动作) > 0
), "未指定分析器的活动"
def 默认跟踪 ID(self):
生成一个 UUID
uuid 原始 = uuid.uuid4()
返回 f"{
uuid 原始.int:032X}"
def 创建跟踪 ID(self):
如果 self.
自定义跟踪 ID 回调:
返回 self.
自定义跟踪 ID 回调()
返回 self.
默认跟踪 ID()
def 配置(self,
创建跟踪 ID=False):
# 仅在准备跟踪时生成新的跟踪 ID,而不是开始跟踪时
如果
创建跟踪 ID:
跟踪 ID = self.
创建跟踪 ID()
self.trace_id = trace_id
返回
分析器配置(
self.profiler_kind,
self.record_shapes,
self.用户内存,
self.带栈,
self.带 FLOPs,
self.带模块,
self.实验配置,
self.跟踪 ID,
)
def __进入__(self):
如果
不 self.
启用:
返回
如果 self.
输入:
抛出
运行时错误(
"分析器上下文管理器不可重入")
self._准备跟踪()
self._开始跟踪()
返回 self
def 准备追踪(self):
self.进入 =
真实
t0 = 性能计数器纳秒()
准备分析器(self.
配置(
创建跟踪 ID=True), self.
动作类型)
t1 = 性能计数器纳秒()
self._统计数据.
分析器准备调用持续时间微秒 = int((t1 - t0) / 1000)
def _开始跟踪(self):
self.输入 =
真实
在分析器启动时运行()
t0 = 性能计数器纳秒()
启用分析器(self.
配置(
创建跟踪 ID=False), self.
动作记录)
t1 = 性能计数器纳秒()
self.统计.
启用分析器调用持续时间微秒 = int((t1 - t0) / 1000)
self.分析开始时间(纳秒) = t1
def __退出__(self,
异常类型,
异常值,
异常堆栈):
如果
不 self.
启用:
返回
如果 self.
使用设备
和
有属性(
PyTorch, self.
使用设备):
设备模块 = getattr(
PyTorch, self.
使用设备)
如果
有属性(
设备模块,
"同步"):
设备模块.
同步()
如果 self.
函数事件
和 self.
访问事件:
self.旧函数事件 = self.
函数事件
self.功能事件 =
无
self.需要处理 =
真实
t0 = 性能计数器纳秒()
self.动态结果 = _disable_profiler()
t1 = 性能计数器纳秒()
self._统计数据.profiler_disable_call_duration_us = int((t1 - t0) / 1000)
self.剖析结束时间纳秒 = t0
_在分析器停止时运行()
self._统计数据.
性能分析窗口持续时间(秒) = (
(self.性能分析结束时间(纳秒) - self.
性能分析开始时间(纳秒)) * 1.0 /
1000000000
)
如果我们计划累积事件,我们应该立即后处理函数事件
以保留多个启动/停止调用之间的状态
如果 self.acc_events:
self._确保函数事件()
返回
假
def __repr__(self):
如果 self.
待处理:
self.确保功能事件()
如果 self.
功能事件
是
无:
返回
"<未完成的 torch.autograd.profile>"
返回
表示(self.
_函数事件)
def __str__(self):
如果 self.
需要处理:
self.确保功能事件()
如果 self.
_函数事件
是
无:
返回
"<未完成的 torch.autograd.profile>"
返回 str(self.
_函数事件)
def _确保函数事件(self):
需要时懒加载处理函数事件
如果 self.
函数事件
是
不
无:
返回
self.需要处理 =
假
t0 = 性能计数器纳秒()
解析结果 =
输入文本为空,请提供需要翻译的文本
如果 self.
动力学结果:
解析结果 = self.
_解析 kineto 结果(self.
kineto 结果)
t1 = perf_counter_ns()
self._stats.parse_kineto_call_duration_us = int((t1 - t0) / 1000)
self._function_events = 事件列表(
解析结果,
使用设备=self.
使用设备,
配置内存=self.
用户内存,
带有 FLOPS=self.
带有 FLOPS,
)
t0 = 性能计数器纳秒()
self.功能事件.
构建树()
t1 = 性能计数器纳秒()
self._统计数据.
函数事件构建树调用持续时间微秒 = int((t1 - t0) / 1000)
self._stats.事件数量 =
长度(self.
功能事件)
如果 self.
旧功能事件
和 self.
事件:
for 事件
在 self._old_function_events:
self._function_events.添加(evt)
self._old_function_events = 无
如果 self.
功能事件
是
无:
抛出
运行时错误(
分析器未完成运行)
@property
def 功能事件(self):
如果 self.
_功能事件
是
无
或者 self.
待处理:
self.确保功能事件()
返回 self.
功能事件
def 表格(
self,
排序=
无,
行限制=100,
最大源列宽度=75,
最大名称列宽度=55,
最大形状列宽度=80,
标题=
无,
仅显示顶级事件=False,
):
self.确保函数事件()
断言 self.
函数事件
是
不
无
返回 self.
功能事件.
表格(
按此排序=
排序,
行限制=
行限制,
最大源列宽度=
最大源列宽度,
最大名称列宽度=
最大名称列宽度,
最大形状列宽度=
最大形状列宽,
标题=
标题,
仅顶级事件=
仅顶级事件,
)
表格.__doc__ =
事件列表.
表格.__doc__
[文档] def export_chrome_trace(self, path):
"""
将收集到的跟踪信息导出为 Chrome JSON 格式。如果启用了 kineto,则只导出
调度中的最后一个周期。
"""
如果 kineto 可用()
self.kineto_results.save(path) # 忽略[联合属性]
else:
self._ensure_function_events()
return self._function_events.export_chrome_trace(path) # type: ignore[union-attr]
导出 Chrome 跟踪.__doc__ =
事件列表.
导出 Chrome 跟踪.__doc__
def 导出堆栈(self,
路径: str,
指标:
字符串 = "self_cpu_time_total"):
self._确保函数事件()
断言 self.
_函数事件
是
不
无,
预期分析结果
断言 self.with_stack,
"export_stacks()需要 with_stack=True"
返回 self._function_events.
导出堆栈(
路径,
指标)
def 切换集合动态(
self, 启用: bool,
活动:
迭代器[
分析器活动]
):
""
切换当前分析实例的活动收集。
"""
返回
切换集合动态(
启用,
设置(
活动))
[文档] def key_averages(
self,
group_by_input_shape=False,
group_by_stack_n=0,
group_by_overload_name=False,
):
self._ensure_function_events()
assert self._function_events is not None, "Expected profiling results"
return self._function_events.key_averages(
group_by_input_shape, group_by_stack_n, group_by_overload_name
)
key_averages.__doc__ = 事件列表.
关键平均数.__doc__
[文档] def 总平均(self):
self._确保函数事件()
assert self._function_events is not None, "预期分析结果"
返回 self._function_events 的总平均值()
总平均值.__doc__ =
事件列表.
总平均值.__doc__
@property
def self_cpu_time_total(self):
返回在 CPU 上花费的总时间。
总时间是指所有事件的自时间总和。
"""
self._ensure_function_events()
断言 self.
_函数事件
是
不
无
返回 self.
功能事件.
自身 CPU 总时间
def 解析 kineto 结果(self,
结果: _ProfilerResult):
# result.events() 包含大部分事件 - PyTorch 操作级别和设备级别的事件
跟踪开始时间戳 =
结果.
跟踪开始时间戳()
内存记录 = [
[事件, False] for
事件
在
结果.
活动()
如果
事件.
名称() ==
内存事件名称
]
内存记录 = [
事件 for
事件
在
结果.
事件列表()
如果
事件.
名称() ==
内存不足事件名称
]
内存记录累计 = MemRecordsAcc(
内存记录)
def _CPU 内存使用(
内存记录):
返回 (
内存记录.
字节数()
如果
内存记录.
设备类型()
在 [
设备类型.CPU,
设备类型.MKLDNN,
设备类型.IDEEP]
否则 0
)
def 设备内存使用(
内存记录):
返回 (
内存记录.
字节数()
如果
内存记录.
设备类型()
在 [
设备类型.CUDA,
设备类型.
私有用途 1,
设备类型.HIP]
否则 0
)
创建并返回包含所有函数事件的 FunctionEvent 列表
创建了 2 个函数事件:
all_function_events 包含与每个 kineto 事件相关的所有事件
所有功能事件 =
输入文本为空,请提供需要翻译的文本
# 前端功能事件包含 aten 或 torch 前端级别的
# 其关联 ID 为 0
前端功能事件 =
输入文本为空,请提供需要翻译的文本
设备对应映射:
字典[int,
列表[
函数事件]] = {}
最大事件 ID = 0
for 动作事件
在
结果.
活动():
如果
过滤器名称(
动作事件.
名称()):
continue
rel_start_ns = kineto_event.开始命名空间() -
跟踪开始时间戳
相对结束时间戳 =
动态事件.
结束时间戳() -
跟踪开始时间戳
绝对结束时间戳 =
运动事件.
结束时间戳()
CPU 内存使用率 = 0
设备内存使用率 = 0
如果
运动事件.
设备类型() ==
设备类型.CPU:
# 查找对应的内存分配事件
for mem_record 在 mem_records_acc.
在区间内(
动作事件.
开始纳秒() / 1000,
绝对结束纳秒 / 1000
):
CPU 内存使用率 +=
_CPU 内存使用率(
内存记录[0])
设备内存使用率 +=
设备内存使用(
内存记录[0])
内存记录[1] =
真实
是否异步 =
动作事件.
是否异步()
或者 (
动作事件.
开始线程 ID() !=
动作事件.
线程结束 ID()
)
fe = 函数事件(
id=动作事件.
关联 ID(),
名称=
_重写名称(
名称=
动作事件.
名称(),
带通配符=True),
重载名称=
动作事件.
重载名称(),
跟踪名称=
重写名称(
名称=
动作事件.
名称(),
带通配符=False),
线程=
动作事件.
开始线程 ID(),
开始美国=
相对开始 NS / 1000,
结束美国=
相对结束 NS / 1000,
前方线程=
动态事件.
前线程 ID(),
输入形状=
动态事件.
形状(),
混凝土输入=
动作事件.
混凝土输入(),
输入参数=
动作事件.
输入参数(),
栈=[
条目
for 条目
在
动作事件.
栈()
如果
过滤栈条目(
条目)
]
范围=
动作事件.
范围(),
use_device=self.use_device,
CPU 内存使用率=
CPU 内存使用率,
设备内存使用率=
设备内存使用率,
异步=
异步,
序列号=
动作事件.
序列号(),
设备类型=
运动事件.
设备类型(),
设备索引=
运动事件.
设备索引(),
设备资源 ID=
动作事件.
设备资源 ID(),
flops=动作事件.
洛普斯(),
是否为用户标注=
动作事件.
是否为用户标注(),
)
max_evt_id = 最大值(max_evt_id, fe.id)
如果 fe.
设备类型 ==
设备类型.CPU
和
不
非阻塞.
异步:
如果 self.
使用设备 ==
私用一:
私有用途 1 时间 =
动作事件.
privateuse1 已用时间()
如果
privateuse1 时间 > 0:
fe.添加内核(
飞机.
名称,
飞机.
设备索引,
私有用途 1 时间)
fe.是否为旧版本 =
真实
如果...否则 self.
使用设备 ==
cuda:
# 检查是否有 CUDA 时间作为后备方案
cuda 时间 =
动作事件.
cuda 已用时间微秒()
如果
cuda 时间 > 0:
fe.添加内核(fe.
名称, fe.
设备索引, cuda_time)
fe.is_legacy = 真实
所有功能事件.
添加(
粒子)
关联 ID =
动作事件.
链接关联 ID()
如果 corr_id > 0:
如果 corr_id
不
在
设备关联映射:
设备关联映射[
关联 ID] =
输入文本为空,请提供需要翻译的文本
设备关联映射[
关联 ID].
添加(
前端(FE))
如果...否则 corr_id == 0:
前端功能事件.
添加(fe)
否则:
抛出
运行时错误(
f"获得负相关 id"{corr_id}
在分析器后处理中
)
# 将设备内核和设备运行时(CPU)与 CPU 事件关联
for fe 在
前端功能事件:
如果 (
前端缩写.
设备类型 ==
设备类型.CPU
和
不
前端.
异步
和
前端.
标识符
在
设备对应映射
):
for f_evt 在
设备校正映射[fe.id
]
如果 (
f_evt.设备类型 ==
设备类型.CUDA
或者 f_evt.
设备类型 ==
设备类型.
私有用途 1
):
fe.添加内核(
f_evt.名称,
f_evt.设备索引,
f_evt.时间范围.
末端 - f_evt.
时间范围.
开始,
)
如果...否则 f_evt.
设备类型 ==
设备类型.CPU:
确保 CPU Kineto(例如设备运行时)事件的'thread'相关联
正确跟踪相应的 PyTorch 事件的'线程'
父亲和子代
f_evt.线程 = fe.
线程
def 为内存事件创建函数事件(
事件(evt)):
rel_start_ns = evt.start_ns() - trace_start_ns
费 =
功能事件(
id=最大事件 ID,
名称=
事件.
名称(),
超载名称=
输入文本翻译为简体中文为:"",
跟踪名称=
无,
# 不在跟踪中输出
线程=
事件.
开始线程 ID(),
开始时间(美国)=
相对开始纳秒 / 1000,
美国结束=
开始关系 / 1000,
# 无时长
前线程=
事件.
开始线程 ID(),
输入形状=
[]
栈=
[]
范围=0, # RecordScope::FUNCTION
使用设备=self.
使用设备,
CPU 内存使用率=
_CPU 内存使用率_(
事件),
设备内存使用=
设备内存使用(
事件),
是否异步=False,
序列号=-1,
设备类型=
设备类型.CPU,
设备索引=0,
)
返回
前端
输出顶级内存事件
for 内存记录
在
内存记录:
如果
不
内存记录条[1
]
最大事件 ID += 1
前端 =
为内存事件创建函数事件(
内存记录[0])
所有函数事件.
添加(fe)
for 内存溢出记录
在
内存溢出记录列表:
最大事件 ID += 1
前端 =
为内存事件创建函数事件(
内存溢出记录)
所有函数事件.
添加(
前端(FE))
所有功能事件.
排序(
键=
Lambda 函数
事件: [
事件.
时间范围.
开始, -
事件.
时间范围.
结束]
)
返回
所有功能事件
[文档]
类
记录功能(
_上下文装饰器):
代码块/函数运行自动微分分析器时添加标签的上下文管理器/函数装饰器。
只有启用 CPU 活动跟踪时,标签才会显示。
在追踪代码性能时非常有用。
参数:
name (str): 分配给代码块的标签。
节点 ID(int):节点的 ID,用于分布式分析。在非分布式情况下未设置。
非分布式情况。
示例:
>>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_AUTOGRAD_PROFILER)
>>> x = torch.randn((1, 1), requires_grad=True)
>>> 使用 torch.autograd.profiler.profile() 作为 prof:
... y = x ** 2
... with torch.autograd.profiler.record_function("label-z"): # 标记该块
... z = y ** 3
... y.backward()
...
>>> # xdoctest: +IGNORE_WANT
>>> # 注意:为了简洁,删除了一些列
>>> 打印(prof.key_averages().table(sort_by="self_cpu_time_total"))
----------------------------------- --------------- --------------- ---------------
名称 自定义 CPU 总百分比 平均 CPU 时间 调用次数
----------------------------------- --------------- --------------- ---------------
pow 60.77% 47.470us 3
mul 21.73% 25.465us 2
PowBackward0 12.03% 121.891us 1
torch::autograd::AccumulateGrad 2.70% 6.324us 1
label-z 2.13% 12.421us 1
torch::autograd::GraphRoot 0.64% 1.503us 1
----------------------------------- --------------- --------------- ---------------
自定义 CPU 总时间:234.344us
CUDA 总耗时:0.000 微秒
"""
def __init__(self, 名称: str,
参数:
可选[str] =
无):
self.名称:
字符串 =
名称
self.参数:
可选[str] = args
# 是否在退出时运行记录函数的结束回调。
self.run_callbacks_on_exit: 布尔值 =
真实
# TODO: TorchScript 在此处忽略标准的类型注解。
# self.record: Optional["torch.classes.profiler._RecordFunction"] = None
self.记录 =
PyTorch.
算子.
标注(
可选["torch.classes.profiler._RecordFunction"
]
无
)
def __进入__(self):
self.记录 =
PyTorch.
操作.
分析器._record_function_enter_new(
self.名称, self.args
)
返回 self
def __退出__(self,
异常类型:
任何, exc_value:
任何,
跟踪回溯:
任何):
如果
不 self.run_callbacks_on_exit:
返回
# 本地变量由 TorchScript 需要,以将 Optional[T]精炼为 T
记录 = self.
记录
断言
记录
是
不
无
# TODO: 启用 __torch_function__ 处理太慢
# 请参阅 https://github.com/pytorch/pytorch/issues/76410
如果
不
PyTorch.
算子.
是否正在脚本化():
替换为
PyTorch._C.
禁用 TorchFunction 子类():
PyTorch.
操作.
分析器.
记录函数退出.
记录函数(
记录)
否则:
PyTorch.
操作.
分析器.
记录函数退出(
记录)
def 在 future 上调用结束回调(self, fut:
未来[
任何]) ->
未来[
任何
]
用于分析返回 future 的异步调用。
调用此函数将扩展记录范围,直到 future 被
满足。这对于分析异步调用的端到端时间很有用。
此函数应仅调用一次以将回调附加到未来,并且
如果多次调用将抛出异常。
参数:
fut: (torch._C.Future): 要为其安排回调的未来。
的未来。
返回值:
一个与传入未来的值完成的未来。
当配置文件回调运行完毕后。
"""
抛出异常,因为我们已经为该 future 附加了回调。
如果
不 self.
在退出时运行回调:
抛出
运行时错误(
"_call_end_callbacks_on_future 只能调用一次。")
我们正在安排在运行此 RecordFunction 的结束回调时
传入的未来完成,因此退出时不要运行结束回调。
self.运行退出回调 =
假
# 本地变量由 TorchScript 用于将 Optional[T]精炼为 T
记录 = self.
记录
断言
记录
是
不
无
# TODO: 启用 __torch_function__ 处理太慢
# 请参阅 https://github.com/pytorch/pytorch/issues/76410
如果
不
PyTorch.
算子.
是否正在脚本化():
替换为
PyTorch._C.
禁用 TorchFunction 子类():
个人档案未来 = (
PyTorch.
操作.
分析器.
在 JIT 未来上调用结束回调.
记录函数(
记录,
未来
)
)
否则:
已分析的未来 =
PyTorch.
操作.
分析器.
调用 JIT 未来上的结束回调(
记录, fut
)
返回
被记录的未来
[文档]
类 emit_itt:
"""上下文管理器,使每个自动微分操作都发出 ITT 范围。"""
在使用 Intel(R) VTune Profiler 运行程序时很有用:
vtune <常规命令>
仪器和跟踪技术(ITT)API 使您的应用程序能够在执行期间生成并
控制不同 Intel 工具收集的跟踪数据。
这个上下文管理器用于注释 Intel(R) VTune 性能分析跟踪。借助这个上下文管理器,
您可以在 Intel(R) VTune 性能分析器 GUI 中看到标记的范围。
..警告:
此上下文管理器不应递归调用,即最多只能调用一次
实例在任何给定时间都应该处于启用状态。
参数:
启用(布尔值,可选):将 ``enabled=False`` 设置为无操作(no-op)的上下文管理器。
默认:``True``。
record_shapes(布尔值,可选):如果 ``record_shapes=True``,itt 范围包装
每个自动微分操作都会在以下格式中附加关于接收到的 Tensor 参数大小的信息
由该操作提供:
``[[arg0.size(0), arg0.size(1), ...], [arg1.size(0), arg1.size(1), ...], ...]``
非张量参数将由 ``[]`` 表示。
后端操作接收到的参数将按顺序列出。
请注意,这个顺序可能与在 Python 端传递参数的顺序不匹配。
还要注意,形状记录可能会增加 itt 范围创建的开销。
默认:``False``
示例:
>>> # xdoctest: +忽略("未定义变量")
>>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_AUTOGRAD_PROFILER)
>>> 使用 torch.autograd.profiler.emit_itt()
... 模型(x)
"""
def __init__(self, 启用=True, record_shapes=False):
self.启用 =
启用
self.进入 =
假
self.record_shapes = record_shapes
def __进入__(self):
如果
不 self.
启用:
返回
如果 self.
输入:
抛出
运行时错误(
ITT 注释上下文管理器不是可重入的)
self.进入 =
真实
运行在分析器启动时()
启用分析器(
分析器配置(
分析器状态.
意大利面,
self.record_shapes,
False,
False,
False,
False,
_实验配置(),
),
设置(),
)
返回 self
def __退出__(self,
异常类型,
异常值,
异常堆栈):
如果
不 self.
启用:
返回
_disable_profiler()
_在分析器停止时运行()
返回 False
[文档]
类 emit_nvtx:
"""上下文管理器,使每个自动微分操作都发出一个 NVTX 范围。
当在 nvprof 下运行程序时很有用:
nvprof --profile-from-start off -o trace_name.prof -- <regular command here>
很遗憾,无法强制 nvprof 将收集的数据刷新到磁盘,因此进行 CUDA 分析时必须使用此上下文管理器进行注释
以磁盘,因此进行 CUDA 分析时必须使用此上下文管理器进行注释
使用 nvprof 进行跟踪,并在检查之前等待进程退出。
然后,可以使用 NVIDIA Visual Profiler (nvvp)来可视化时间线,或者
使用:func:`torch.autograd.profiler.load_nvprof`可以加载结果以进行检查
例如在 Python 交互式解释器中。
..警告:
此上下文管理器不应递归调用,即在任何给定时间最多只能启用一个实例。
任何给定时间只能启用一个实例。
参数:
启用(布尔值,可选):将 ``enabled=False`` 设置为上下文管理器为无操作。
默认:``True``。
record_shapes (bool, 可选): 如果 ``record_shapes=True``,则 nvtx 范围包装
每个自动微分操作将附加接收到的张量参数的大小信息
通过该操作,以下格式:
``[[arg0.size(0), arg0.size(1), ...], [arg1.size(0), arg1.size(1), ...], ...]``
非张量参数将被表示为 ``[]``。
参数将按照后端操作接收的顺序列出。
请注意,这个顺序可能与传递参数时的顺序不匹配。
在 Python 端。另外请注意,形状记录可能会增加 nvtx 范围创建的开销。
默认:``False``
示例:
>>> # xdoctest: +SKIP("未定义变量")
>>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_AUTOGRAD_PROFILER)
>>> with torch.cuda.profiler.profile():
... 模型(x) # 预热 CUDA 内存分配器和分析器
... 使用 torch.autograd.profiler.emit_nvtx()
... 模型(x)
**正向反向相关性**
在使用 Nvidia Visual Profiler 中的`:class:`emit_nvtx`创建的配置文件中查看时,
将每个反向传播操作与相应的正向传播操作关联起来可能很困难。
为了简化这项任务,`:class:`emit_nvtx`将其生成的范围附加序列号信息。
生成。
在正向传播过程中,每个函数范围都被装饰为 `seq=`。`seq` 是一个运行
计数器,每次创建并存储新的 backward 函数对象时增加。
因此,与每个正向函数范围关联的 ``seq=`` 注解告诉您
如果由该正向函数创建了一个反向函数对象,
后向对象将接收到序列号 N。
在反向传播过程中,每个 C++反向函数的最高级范围包装
`apply()` 调用被装饰为 `stashed seq=`。`M` 是序列号,
向后对象是用.创建的。通过比较向后中的“stashed seq”编号与“seq”
按顺序排列的数字,您可以追踪哪个前向操作创建了每个后向函数。
在反向传播过程中执行的所有函数也装饰了 `seq=`。在
默认反向操作(`create_graph=False`)中,此信息无关紧要,实际上,
`N` 可能对所有这些函数都是 0。只有与顶级范围相关的
后向函数对象的 `apply()` 方法很有用,可以作为将这些函数对象与早期正向传递相关联的方式。
另一方面,如果正在进行带有 `create_graph=True` 的后向传递(换句话说,
**双重后向**
如果,另一方面,正在执行带有 `create_graph=True` 的后向传递(换句话说,
如果您正在设置双重反向操作),每个函数在反向执行
给定一个非零、有用的“seq=”。这些函数本身可能创建函数对象
稍后将在双反向过程中执行,就像原始函数在正向传递中做的那样。
向后和双向的关系在概念上与向后和双向后关系相同
在向前和向后之间:函数仍然发出带有当前序列号标记的范围,
它们创建的函数对象仍然存储这些序列号,并在最终的双向回溯过程中,
函数对象的 `apply()` 范围仍然带有 `存储的序列` 标记,
这些标记的序列号可以与反向传递中的 `seq` 序列号进行比较。
..警告:
序列号是线程局部变量,并且一些前向函数不会创建关联的
后向函数对象(而是将这一任务委托给调用链中的子函数)。
因此,在以下情况下,存储的序列号之间的对应关系
向后传递函数 `apply()` 中的 `seq` 数字范围
不保证一一对应。序列号本身可能不足以完全
消除混淆,确定哪个前向函数创建了哪个
后退函数对象。您可能需要根据分析知识做出判断。
预期对应应为。
"""
def __init__(self, 启用=True, record_shapes=False):
self.启用 =
启用
self.进入 =
假
self.记录形状 =
记录形状
def __进入__(self):
如果
不 self.
启用:
返回
如果 self.
输入:
抛出
运行时错误(
"NVTX 注解上下文管理器不可重入")
self.进入 =
真实
PyTorch.cuda.
同步()
运行在分析器启动时()
启用分析器(
分析器配置(
分析器状态.NVTX,
self.record_shapes,
False,
False,
False,
False,
_实验配置(),
),
设置(),
)
返回 self
def __退出__(self,
异常类型,
异常值,
异常堆栈):
如果
不 self.
启用:
返回
PyTorch.cuda.
同步()
_disable_profiler()
_在分析器停止时运行()
返回 False
[文档]def load_nvprof(path):
"""打开 nvprof 跟踪文件并解析 autograd 注释。
Args:
path (str): nvprof 跟踪文件的路径
"``"
返回 EventList 解析 nvprof_trace 路径()
[文档]类 EnforceUnique:
"``如果发现一个键被多次看到,则引发错误。``"
def __init__(self):
self.seen = set()
[文档] def see(self, *key):
r"""
观察一个键,如果它被多次看到,则引发错误。
"""
如果键在 self.seen 中:
引发 RuntimeError("重复键: " + str(key))
self.seen.add(key)
[文档]def
解析 nvprof 跟踪(
路径):
导入 sqlite3
连接 = sqlite3.
连接(
路径)
连.
行工厂 = sqlite3.
行
# 解析字符串表
字符串 = {}
for r 在
连接.
执行("SELECT _id_ as id, value FROM StringTable"):
字符串[r[
id]] =
PyTorch._C._demangle(r[
"值"])
首先,找到所有函数并为它们创建 FunctionEvents
标记查询 =
""
SELECT
start.id AS 标记_id, start.name, start.timestamp AS 开始时间, end.timestamp AS 结束时间
FROM
CUPTI_ACTIVITY_KIND_MARKER AS start INNER JOIN CUPTI_ACTIVITY_KIND_MARKER AS end
ON 起始.id = 结束.id
WHERE
起始.name 不等于 0 AND 结束.name 等于 0
"""
函数 =
输入文本为空,请提供需要翻译的文本
函数映射 = {}
唯一的 =
强制唯一()
for 行
在
连接.
执行(
标记查询):
独特.
看(
排[
"标记_id"])
事件 =
函数事件(
id=排[
marker_id
]
node_id=0, # 调用 FunctionEvent 时缺少 node_id。这只是为了确保
# 创建 FunctionEvent()对象时 pytorch 不会崩溃
名称=
字符串[
行[
名称]],
开始美国=
行[
"开始时间"
]
结束美国=
行[
结束时间
]
线程=0,
) # TODO: 在 sqlite 数据库中查找
函数.
添加(
事件)
函数映射[
事件.id] =
事件
现在,将所有内核与 FunctionEvents 关联
内核查询 =
""
SELECT
start.id AS marker_id, start.name, start.timestamp, end.timestamp,
runtime._id_ AS runtime_id, runtime.cbid, runtime.start AS runtime_start, runtime.end AS runtime_end,
kernel.start AS kernel_start, kernel.end AS kernel_end, kernel.name AS kernel_name
FROM
CUPTI_ACTIVITY_KIND_MARKER AS start
INNER JOIN CUPTI_ACTIVITY_KIND_MARKER AS end
ON start.id = end.id
INNER JOIN CUPTI_ACTIVITY_KIND_RUNTIME as runtime
ON (start.timestamp < runtime.start AND runtime.end < end.timestamp)
INNER JOIN CUPTI_ACTIVITY_KIND_CONCURRENT_KERNEL AS kernel
ON kernel.correlationId = runtime.correlationId
"""
唯一的 =
确保唯一()
for 行
在
连接.
执行(
内核查询):
独特.
查看(
行[
"标记 ID"
]
行[
"运行时 ID"])
# 211 是 CUDA >= 9.2 的 cudaKernelLaunch
断言
行["cbid"] == 211
事件 =
函数映射[
行[
"标记_id"]]
事件.
添加内核(
行[
内核名称
] 0,
行[
内核结束] -
行[
"内核启动"]
)
函数.
排序(
键=
Lambda 函数
事件:
事件.
时间范围.
开始)
返回
函数
[文档]
类
动作步数追踪器:
提供了一个全局增加步数计数的抽象。
之前,我们只有一个地方可以标记步数发生。
通过 pytorch profiler 的 step()函数在程序中进行。现在我们将添加步骤钩子
在 Optimizer 类中 https://github.com/pytorch/pytorch/issues/88446
- 这可能意味着已经在每个迭代中调用 profiler.step()的程序
最终会导致步骤计数重复增加。
如果一个模型使用多个优化器,我们也可以有双倍或更多的步数计数。
我们通过在调用 kineto 库中的 step()之前添加一层抽象来解决这个问题。想法是维护一个字典,以存储每个请求者的步数。
我们通过在调用 kineto 库中的 step()之前添加一层抽象来解决这个问题。想法是维护一个字典,以存储每个请求者的步数。
我们通过在调用 kineto 库中的 step()之前添加一层抽象来解决这个问题。想法是维护一个字典,以存储每个请求者的步数。
.. 代码块
{
"ProfilerStep": 100, # 由分析器 step() 调用触发
"Optimizer1Step": 100, # 优化器 1 或 2 只是示例,可以是 SGD、Adam 等
"Optimizer2Step": 100,
}
为了计算全局步数,只需取字典值的最大值(100)。
如果计数之一增加,最大值将上升。
.. 代码块
{
"ProfilerStep": 100,
"Optimizer1Step": 101,# Optimizer1 首次增加
"Optimizer2Step": 100,
}
然后全局步数是 101
只有当全局计数增加时,我们才调用 kineto step()函数。
请注意:请勿在优化器以外的模块中使用 KinetoStepTracker
目前。结果可能是步数的不正确增加。
"""
当前步骤 = 0
_step_dict: 字典[str, int] =
默认字典(int)
[文档] @类方法
def 初始化步骤计数(cls, 请求者: str):
r"""
初始化给定的请求者。
"""
cls._step_dict[requester] = cls._current_step
[文档] @classmethod
def erase_step_count(cls, requester: str) -> bool:
r"""
移除指定的请求者。
"""
返回 cls._step_dict.pop(requester, None) 是否为非空
[文档] @classmethod
def increment_step(cls, requester: str) -> int:
"""增加请求者的步骤计数。
此外,如果所有步骤计数中的最大值已增加,则
触发_kineto_step()返回全局步数
"""
如果请求者不在 cls._step_dict 中:
cls.init_step_count(requester)
cls._step_dict[requester] += 1
new_step = max(cls._step_dict.values())
if new_step > cls._current_step:
delta = new_step - cls._current_step
if delta > 1:
warn(
"分析器步骤计数增加超过 1 - "
f"当前步骤 = {cls._current_step} 步骤字典 = {cls._step_dict}"
)
for _ in 范围(0, delta):
_kineto_step()
cls._current_step = new_step
返回 cls._current_step
[文档] @classmethod
def current_step(cls) -> int:
r"""
获取任何请求者的最新步骤
"""
返回 cls._current_step