• 文档 >
  • 模块代码 >
  • torch >
  • torch.profiler.profiler
快捷键

torch.profiler.profiler 的源代码

# mypy: 允许未类型化定义
导入 gzip
导入 json
导入 操作系统
导入 shutil
导入 tempfile
from abc 导入 ABC, 抽象方法
from collections.abc 导入 迭代器
from 枚举 导入 枚举
from functools 导入 部分信息
from 打字 导入 任何, 可调用, 可选
from typing_extensions 导入 自身
from 警告 导入 警告

导入 火炬
导入 torch.autograd.profiler  专业
from torch._C 导入 _获取私有用途 1 后端名称
from torch._C._profiler 导入 (
    _添加执行跟踪观察者,
    _禁用执行跟踪观察者,
    _启用执行跟踪观察者,
    _实验配置,
    移除执行跟踪观察者,
)
from torch._环境 导入 is_fbcode
from torch 自动微分 导入 动作可用, 分析器活动
from torch.profiler._内存分析器 导入 内存配置文件, 内存配置文件时间线


全部 = [
    支持的活动,
    分析器操作,
    "安排",
    "TensorBoardTraceHandler",
    "配置文件",
    "执行跟踪观察者",
]
性能分析步骤名称 = "分析步骤"


 _Numpy 编码器(json.JSON 编码器):
    ""
用于 numpy 类型(np.int, np.float, np.array 等)的 Json 编码器
返回默认编码器,如果 numpy 不可用
"文档"

    定义 默认(, 对象):
        将 NumPy 类型编码为 JSON
        尝试:
            导入 numpy  np
        除了 导入错误:
            返回 json.JSONEncoder.默认(, 对象)
        如果 isinstance(对象, numpy.整数):
            返回 整数(对象)
        elif isinstance(对象, numpy.浮点):
            返回 float(对象)
        elif isinstance(对象, numpy.纳维数组):
            返回 对象.转列表()
        else:
            返回 json.JSONEncoder.默认(, 对象)


定义 支持的活动():
    ""
返回一组支持的分析器跟踪活动。

注意:分析器使用 CUPTI 库来跟踪设备上的 CUDA 内核。
当启用 CUDA 但 CUPTI 不可用时,
将 ``ProfilerActivity.CUDA`` 转换为使用传统 CUDA 的分析结果
分析代码(与传统的 ``torch.autograd.profiler`` 相同)。
这反过来又导致将 CUDA 时间包含在分析表输出中,
但不包括在 JSON 追踪中。
"文档"
    返回 火把.自动微分.支持的活动()


 _ITraceObserver(ABC):
    抽象的 Trace 观察者接口。
该接口满足 3 个方法:开始、停止和清理。

    @abstractmethod
    定义 开始():
        通过

    @abstractmethod
    定义 停止():
        通过

    @abstractmethod
    定义 清理():
        通过


[文档] _KinetoProfile: 低级性能分析器包装自动微分性能分析活动 参数: 活动列表(可迭代):用于性能分析的 activity groups 列表(CPU,CUDA),支持值: ``torch.profiler.ProfilerActivity.CPU``,``torch.profiler.ProfilerActivity.CUDA``, ``torch.profiler.ProfilerActivity.XPU``. 默认值:ProfilerActivity.CPU 以及(当可用时)ProfilerActivity.CUDA 或(当可用时)ProfilerActivity.XPU。 record_shapes (bool):保存操作符输入形状的信息。 profile_memory (bool): 跟踪张量内存分配/释放(参见 ``export_memory_timeline``) for more details). with_stack (bool): 记录操作的源信息(文件和行号)。 with_flops (bool): 使用公式估计特定操作的计算量(FLOPS)。 (矩阵乘法和二维卷积)。 with_modules (bool): 记录模块层次结构(包括函数名)。 对应操作符的调用栈。例如,如果模块 A 的前向调用 模块 B 的前向包含一个 aten::add 操作, 然后 aten::add 的模块层次结构是 A.B 注意,目前这种支持仅存在于 TorchScript 模型中 而不是急切模式模型中。 experimental_config (_ExperimentalConfig) : 一组实验性选项 由 Kineto 等分析库使用。请注意,不保证向后兼容性。 执行跟踪观察者(ExecutionTraceObserver):一个 PyTorch 执行跟踪观察者对象。 `PyTorch 执行跟踪 `__ 提供了 AI/ML 工作负载的基于图的表示,并支持重放基准测试、模拟器和仿真器。 它能够实现 AI/ML 工作负载的基于图的表示,并支持重放基准测试、模拟器和仿真器。 当包含此参数时,观察者的 start()和 stop()方法将与 PyTorch 分析器在相同的时间窗口内被调用。 启用形状和堆栈跟踪会导致额外的开销。 acc_events (bool): 启用在多个分析周期内累积 FunctionEvents .. 注意:: 此 API 是实验性的,未来可能会发生变化。 启用形状和堆栈跟踪会导致额外的开销。 当指定 record_shapes=True 时,分析器将暂时保留对张量的引用; 这可能会进一步防止某些依赖于引用计数的优化,并引入额外的张量副本。 "文档" 定义 __init__( , *, 活动: 可选[迭代器[分析器活动]] = , record_shapes: 布尔 = 错误, 用户内存: 布尔 = 错误, with_stack: 布尔 = 错误, 带有 FLOPS: 布尔 = 错误, 带模块: 布尔 = 错误, 实验性配置: 可选[_实验配置] = , 执行跟踪观察者: 可选[_ITraceObserver] = , 事件: 布尔 = 错误, 自定义跟踪 ID 回调: 可选[可调用[] 字符串]] = , ): .activities = 设置(活动) 如果 activities 否则 supported_activities() .记录形状 = 记录形状 .带浮点运算 = 带浮点运算 .用户内存 = 用户内存 .带栈 = 带栈 .带有模块 = 带有模块 .实验配置 = 实验配置 .执行跟踪观察者 = 执行跟踪观察者 .访问事件 = 访问事件 .自定义跟踪 ID 回调 = 自定义跟踪 ID 回调 .分析器: 可选[性能分析.个人资料] = .内存跟踪: 可选[内存配置文件时间线] = .使用设备 = 如果 分析器活动.CUDA .活动: .使用设备 = "cuda" elif 分析器活动.XPU .活动: .使用设备 = xpu elif 分析器活动.MTIA .活动: .使用设备 = mtia elif 分析器活动.HPU .活动: .使用设备 = "hpu" elif 分析器活动.私有用途 1 .活动: .使用设备 = _get_privateuse1_backend_name() # 用户定义的元数据以供跟踪修改 .预设元数据: 字典[字符串, 字符串] = {} 定义 开始(): .准备追踪() .开始追踪() 定义 停止(): .停止追踪() 定义 准备追踪(): 如果 (.分析器 ) 或者 ( .事件): .分析器 = 专业.个人资料( 使用 CPU=(分析器活动.CPU .活动), use_device=.use_device, record_shapes=.record_shapes, 带有 FLOPS=.带有 FLOPS, 用户内存=.用户内存, with_stack=.with_stack, 带模块=.带模块, 使用 kineto=, 实验性配置=.实验性配置, 事件=.事件, 自定义跟踪 ID 回调=.自定义跟踪 ID 回调, ) .分析器.准备追踪() 定义 开始追踪(): 如果 .执行追踪观察者: .执行追踪观察者.开始() 断言 .分析器 .分析器._开始跟踪() 如果 .用户内存: .添加元数据 JSON("配置内存", "1") 如果 .with_stack: .添加元数据 JSON(带栈, "1") 如果 .record_shapes: .添加元数据 JSON(记录形状, "1") 如果 .带模块: .添加元数据 JSON(使用模块, "1") 如果 .带有 FLOPS: .添加元数据 JSON(with_flops, "1") 如果 动作可用(): 分发信息 = ._获取分发信息() 如果 分发信息: .添加元数据 JSON( 分布式信息, json.导出(分布信息, =_Numpy 编码器) ) 如果 有属性(火把, _inductor): 导入 torch._inductor 配置 电感器配置 如果 电感器配置.三叉戟.cuDAG 图: os.环境["禁用 CUPTI 懒加载初始化"] = 1 .添加元数据 JSON("禁用 CUPTI 懒加载初始化", "1") # FIXME: CUDA 图形与 CUPTI 清理工作不兼容。 # 1) 在清理工作后第一次懒加载 CUPTI 重新初始化时崩溃(CUDA 11) # 2) 在清理工作后第二次非懒加载 CUPTI 重新初始化时崩溃(CUDA 12) # 解决方案:当使用 CUDA 图形时,关闭 CUPTI 清理工作。 os.环境["CUPTI 解构"] = "零" 插入预设用户元数据到跟踪 for k, v .预设元数据.项目(): .添加元数据 JSON(k, v) 定义 停止跟踪(): 如果 .执行跟踪观察者: .执行跟踪观察者.停止() 断言 .分析器 .分析器.__退出__(, , )
[文档] def export_chrome_trace(self, path: str): """ 将收集到的跟踪数据导出为 Chrome JSON 格式。如果启用了 kineto,则只导出 调度中的最后一个周期。 """ 断言 self.profiler 如果 path.endswith(".gz"): fp = tempfile.NamedTemporaryFile("w+b", suffix=".json", delete=False) fp.close() retvalue = self.profiler.export_chrome_trace(fp.name) with open(fp.name, "rb") as fin: with gzip.open(path, "wb") as fout: fout.writelines(fin) os.remove(fp.name) return retvalue else: 返回 self.profiler.export_chrome_trace(path)
[文档] def export_stacks(self, path: str, metric: str = "self_cpu_time_total"): """保存堆栈跟踪到文件 参数: path (str): 将堆栈文件保存到该位置; metric (str): 要使用的指标: "self_cpu_time_total" 或 "self_cuda_time_total" """ 断言 self.profiler 返回 self.profiler.export_stacks(path, metric)
[文档] def toggle_collection_dynamic( self, enable: bool, activities: Iterable[ProfilerActivity] ): """在收集过程中的任何时刻切换活动的收集开关。目前支持切换 Torch Ops (CPU)和 CUDA 活动在 Kineto 中得到支持 Args: activities (可迭代对象): 要在分析中使用的活动组列表,支持的值: ``torch.profiler.ProfilerActivity.CPU``,``torch.profiler.ProfilerActivity.CUDA`` 示例: .. 代码块 :: python with torch.profiler.profile( activities=[ torch.profiler.ProfilerActivity.CPU, torch.profiler.ProfilerActivity.CUDA, ] ) 作为 p: code_to_profile_0() // 关闭收集所有 CUDA 活动 p.toggle_collection_dynamic(False, [torch.profiler.ProfilerActivity.CUDA]) code_to_profile_1() // 开启收集所有 CUDA 活动 // p.toggle_collection_dynamic(True, [torch.profiler.ProfilerActivity.CUDA]) // code_to_profile_2() // 打印(p.key_averages().table( sort_by="self_cuda_time_total", 行数限制=-1)) """ 如果没有 self.profiler: 返回 self.profiler.toggle_collection_dynamic(enable, activities)
[文档] def key_averages( self, group_by_input_shape: bool = False, group_by_stack_n: int = 0, group_by_overload_name: bool = False, ): """平均事件,按操作符名称和(可选)输入形状进行分组, 并且重载名称。 .. 注意:: 要使用形状/堆栈功能,请确保在创建分析器上下文管理器时设置 record_shapes/with_stack 当创建分析器上下文管理器时。 """ 断言 self.profiler 返回 self.profiler.key_averages( 按输入形状分组,按堆栈数量分组,按重载名称分组 )
[文档] def events(self): """ 返回未聚合的剖析器事件列表, 用于在跟踪回调或分析完成后使用 """ 断言 self.profiler 返回 self.profiler.function_events
[文档] def add_metadata(self, key: str, value: str): """ 添加用户定义的元数据,具有字符串键和字符串值 到跟踪文件中 """ wrapped_value = '"' + value.replace('"', '\\"') + '"' torch.autograd._add_metadata_json(key, wrapped_value)
[文档] def add_metadata_json(self, key: str, value: str): "" 添加用户定义的元数据,具有字符串键和有效的 JSON 值 到跟踪文件中 "" torch.autograd._add_metadata_json(key, value)
[文档] def preset_metadata_json(self, key: str, value: str): """ 预设用户定义的元数据,当分析器未启动时 稍后添加到跟踪文件中。 元数据格式为字符串键和有效的 JSON 值。 """ self.preset_metadata[key] = value
定义
获取分布式信息(): 导入 torch.distributed dist 如果 距离.是否可用() 或者 距离.已初始化(): 返回 后端 = 距离.获取后端() 分布式信息 = { "backend": 后端, "排名": 距离.获取排名(), "世界大小": 距离.获取世界大小(), pg_count: 距离.get_pg_count(), pg_config: 距离.分布式_c10d._获取所有 pg 配置(), } 如果 后端 == "nccl": nccl 版本 = 火把.cuda.nccl.版本() 分发信息[nccl 版本] = “。”.连接(字符串(v) for v nccl 版本) 返回 分发信息 定义 内存配置文件() -> MemoryProfile: 必需 = (记录形状, "配置内存", 带堆栈) 缺失 = [f"{i}“真” for i 必需 如果 getattr(, i)] 如果 缺失: raise ValueError(f"{“,”.连接(缺失)}用于内存分析) 断言 .分析器 并且 .分析器.动态结果 返回 内存分析器(.分析器.动态结果)
[文档] def 导出内存时间线(self, path: str, device: Optional[str] = None) -> None: """从分析器收集的内存事件信息导出 为特定设备生成树形图,并导出时间线图。有 3 个可导出的文件使用`export_memory_timeline`,每个文件由`path`的扩展名控制。 使用`export_memory_timeline`导出可导出的文件,每个文件由`path`的扩展名控制。 ``path``的扩展名。 - 对于兼容 HTML 的图表,使用扩展名`.html`,以及内存时间线 故事情节将作为 PNG 文件嵌入 HTML 文件中。 - 对于由 ``[时间, [按类别的大小]]`` 组成的情节点, ``时间`` 是时间戳,``大小`` 是每个类别的内存使用情况。 内存时间线图将保存为 JSON(``.json``)或 gzip 压缩的 JSON (``.json.gz``)根据后缀。 - 对于原始内存点,使用后缀 `.raw.json.gz`。每个原始内存 事件将包含`(时间戳, 操作, 字节数, 类别)`,其中 `action` 是 `[PREEXISTING, CREATE, INCREMENT_VERSION, DESTROY]` 中的一个 并且 ``category`` 是枚举中的一个 ``torch.profiler._memory_profiler.Category``. 输出:以 gzip 压缩的 JSON、JSON 或 HTML 格式编写的内存时间线。 """ 默认为设备 0,如果未设置。使用 CPU 作为后备。 如果 device 为 None: 如果 self.use_device 且 self.use_device 不等于"cuda": device = self.use_device + ":0" else: device = "cuda:0" if torch.cuda.is_available() else "cpu" # 构建内存时间线图数据 self.mem_tl = MemoryProfileTimeline(self._memory_profile()) 根据文件后缀,将数据保存为 json.gz 或 json。 对于 HTML,我们可以将图片嵌入到 HTML 文件中。 如果路径以".html"结尾: self.mem_tl.export_memory_timeline_html(path, device) elif 路径.endswith(".gz"): fp = tempfile.NamedTemporaryFile("w+t", suffix=".json", delete=False) fp.close() if 路径.endswith("raw.json.gz"): self.mem_tl.export_memory_timeline_raw(fp.name, device) else: self.mem_tl.export_memory_timeline(fp.name, device) with open(fp.name) as fin: 使用 gzip.open(path, "wt") as fout: fout.writelines(fin) os.remove(fp.name) else: self.mem_tl.export_memory_timeline(path, device)
[文档]class ProfilerAction(Enum): """ 在指定间隔内可以执行的分析器操作 "空字符串" NONE = 0 WARMUP = 1 RECORD = 2 记录并保存 = 3
[文档]定义 调度( *, 等待: 整数, 预热: 整数, 活动: 整数, 重复: 整型 = 0, 跳过第一个: 整型 = 0, 跳过前几步等待: 整型 = 0, ) -> 可调用: "" 返回一个可调用对象,用作分析器 `schedule` 参数。分析器将跳过 前面的 `skip_first` 步,然后等待 `wait` 步,然后进行下一个 `warmup` 步的预热, 然后进行下一个 `active` 步的活跃记录,然后重复循环,从 `wait` 步开始。 可选的循环次数由 `repeat` 参数指定,零值表示 周期将持续到配置文件完成。 ``skip_first_wait`` 参数控制是否跳过第一个 ``wait`` 阶段。 这可以在用户想要在循环之间等待的时间长于 `skip_first` 时很有用,但不是 对于第一个配置文件。例如,如果 `skip_first` 为 10 且 `wait` 为 20,则第一个循环将 等待 10 + 20 = 30 步,如果 `skip_first_wait` 为零,但如果是非零,则只需等待 10 步 步。然后所有后续循环将在最后一个活动和预热之间等待 20 步。 "文档" 定义 schedule_fn(步长: 整数) -> ProfilerAction: 断言 步骤 >= 0 如果 步骤 < skip_first: 返回 ProfilerAction. else: 步骤 -= 跳过第一个 如果等待 >> 跳过第一个,并且我们想提前获取性能分析,当 skip_first_wait 为 True 时,向左移动等待时间 如果 跳过第一个等待 != 0: 步骤 += 等待 步骤数量 = 等待 + 预热 + 活动 如果 重复 > 0 并且 步骤 / 步骤数量 >= 重复: 返回 分析器操作. 模块步骤 = 步骤 % 步骤数量 如果 模块步骤 < 等待: 返回 分析器动作. elif 模块步骤 < 等待 + 预热: 返回 分析器动作.预热 else: 返回 ( 分析器动作.记录 如果 模块步骤 < 步骤数量 - 1 否则 分析器动作.记录并保存 ) 断言 ( 等待 >= 0 并且 预热 >= 0 并且 活动 > 0 并且 重复 >= 0 并且 跳过第一个 >= 0 ), "无效的剖析器调度参数" 如果 预热 == 0: 警告("剖析器不会使用预热,这可能会影响剖析器结果") 返回 调度函数
定义 _default_schedule_fn(_: 整数) -> 分析器操作: "" 默认分析器行为 - 立即开始记录事件, 在每个分析器步骤中持续进行。 "文档" 返回 分析器操作.记录
[文档]def tensorboard_trace_handler( dir_name: str, worker_name: Optional[str] = None, use_gzip: bool = False ): ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 输出跟踪文件到 ``dir_name`` 目录,然后该目录可以 直接传递到 tensorboard 作为日志目录。 ``worker_name`` 应在每个分布式场景中的每个工作器中是唯一的, 默认情况下将设置为 '[hostname]_[pid]'。 """ 导入 os 导入 socket 模块 导入 time 模块 def handler_fn(prof) -> None: 非局部变量 worker_name 如果没有 os.path.isdir(dir_name): try: os.makedirs(dir_name, exist_ok=True) except Exception as e: raise RuntimeError("无法创建目录: " + dir_name) from e if not worker_name: worker_name = f"{socket.gethostname()}_{os.getpid()}" # 在导出跟踪时使用纳秒以避免命名冲突 file_name = f"{worker_name}.{time.time_ns()}.pt.trace.json" if use_gzip: file_name = file_name + ".gz" prof.export_chrome_trace(os.path.join(dir_name, file_name)) 返回处理函数
[文档] 个人资料(_KinetoProfile): """分析器上下文管理器。 参数: 活动项(可迭代):用于分析的活动组列表(CPU、CUDA),支持值: ``torch.profiler.ProfilerActivity.CPU``, ``torch.profiler.ProfilerActivity.CUDA``, ``torch.profiler.ProfilerActivity.XPU``. 默认值:ProfilerActivity.CPU 以及(当可用时)ProfilerActivity.CUDA 或(当可用时)ProfilerActivity.XPU. 调度(Callable):接受步长(int)作为单个参数的可调用对象 ``ProfilerAction`` 值,指定在每一步要执行的剖析器操作 on_trace_ready(Callable):在每一步当 ``schedule`` 返回 ``ProfilerAction.RECORD_AND_SAVE`` 时被调用的可调用对象 在剖析过程中,当 ``schedule`` 返回 ``ProfilerAction.RECORD_AND_SAVE`` 时,将执行记录和保存操作 record_shapes (bool): 保存操作符输入形状信息。 profile_memory (bool): 跟踪张量内存分配/释放。 with_stack (bool): 记录操作符的源信息(文件和行号)。 with_flops (bool): 使用公式估计特定操作符的 FLOPs(浮点运算)。 (矩阵乘法和二维卷积)。 with_modules (bool): 记录模块层次结构(包括函数名)。 对应操作符的调用栈。例如,如果模块 A 的前向调用 模块 B 的前向包含一个 aten::add 操作, 然后 aten::add 的模块层次结构是 A.B 注意,目前这种支持仅存在于 TorchScript 模型中 而不是急切模式模型中。 experimental_config (_ExperimentalConfig) : 一组实验性选项 用于 Kineto 库功能。注意,不保证向后兼容性。 execution_trace_observer(ExecutionTraceObserver):PyTorch 执行跟踪观察器对象。 `PyTorch 执行跟踪 `__ 提供基于图的 AI/ML 工作负载的表示,并启用重放基准测试、模拟器和仿真器。 当包含此参数时,观察者的 start()和 stop()将在 PyTorch 分析器相同的时间窗口内被调用。 请参阅下面的示例部分以获取代码示例。 acc_events (bool): 启用在多个分析周期内累积 FunctionEvents use_cuda(布尔值): .. 已弃用:: 1.8.1 使用 ``activities`` 代替。 .. 注意:: 使用 :func:`~torch.profiler.schedule` 生成可调用的调度计划。 在分析长时间训练作业时,非默认调度很有用。 允许用户在不同迭代中获取多个跟踪信息 训练过程的跟踪信息。 默认计划简单地记录上下文管理器持续期间的所有事件。 持续时间。 .. 注意:: 使用 `:func:`~torch.profiler.tensorboard_trace_handler` 生成 TensorBoard 的结果文件: ``on_trace_ready=torch.profiler.tensorboard_trace_handler(dir_name)`` 分析完成后,结果文件将保存在指定的目录中。使用以下命令: ``tensorboard --logdir dir_name`` 在 TensorBoard 中查看结果。 更多信息,请参阅 `PyTorch Profiler TensorBoard 插件 `__ .. 注意:: 启用形状和堆栈跟踪结果将导致额外的开销。 当指定 record_shapes=True 时,分析器将暂时保留张量的引用 可能进一步防止依赖于引用计数的某些优化并引入 额外张量副本。 示例: .. 代码块 :: python 使用 torch.profiler.profile() activities=[ torch.profiler.ProfilerActivity.CPU, torch.profiler.ProfilerActivity.CUDA, ] ) as p: profile 代码() 打印(p.key_averages().table( 排序依据="self_cuda_time_total",行限制=-1 使用分析器的 ``schedule``、``on_trace_ready`` 和 ``step`` 函数: .. 代码块 :: python 非默认分析器调度允许用户开启和关闭分析器 在训练循环的不同迭代中; # trace_handler 每次有新的跟踪信息可用时被调用 def trace_handler(跟踪处理程序): print(prof.key_averages().table( 排序方式="self_cuda_time_total",行限制=-1)) # prof.export_chrome_trace("/tmp/test_trace_" + str(prof.step_num) + ".json") with torch.profiler.profile( activities=[ torch.profiler.ProfilerActivity.CPU, torch.profiler.ProfilerActivity.CUDA, ] # 在此示例中,wait=1,warmup=1,active=2,repeat=1, # 跳过第一个步骤/迭代, # 从第二个开始预热,记录 # 第三和第四次迭代, # 之后将可获取跟踪信息 当设置_on_trace_ready(当设置)时被调用; 然后从下一个步骤开始重复循环 schedule=torch.profiler.schedule( wait=1, warmup=1, active=2, repeat=1), on_trace_ready=trace_handler # on_trace_ready=torch.profiler.tensorboard_trace_handler('./log') # used when outputting for tensorboard ) 作为 p: for iter in range(N): code_iteration_to_profile(iter) 向分析器发送信号,表示下一个迭代已开始 p.step() 以下示例展示了如何设置执行跟踪观察器(`execution_trace_observer`) .. 代码块 :: python 使用 torch.profiler.profile( ... 执行跟踪观察者 = ExecutionTraceObserver().register_callback("./execution_trace.json") ), ) as p: for iter in range(N): code_iteration_to_profile(iter) p.step() 您也可以参考 tests/profiler/test_profiler.py 中的 test_execution_trace_with_kineto()。 注意:也可以传递满足 _ITraceObserver 接口的任何对象。 "文档" 定义 __init__( , *, 活动: 可选[迭代器[分析器活动]] = , 调度: 可选[可调用[[整数] 分析器动作]] = , 当跟踪准备就绪时: 可选[可调用[..., 任何]] = , record_shapes: 布尔 = 错误, 用户内存: 布尔 = 错误, with_stack: 布尔 = 错误, 带有 FLOPS: 布尔 = 错误, 带模块: 布尔 = 错误, 实验性配置: 可选[_实验配置] = , 执行跟踪观察者: 可选[_ITraceObserver] = , 事件: 布尔 = 错误, # 已弃用 使用 CUDA: 可选[布尔] = , 自定义跟踪 ID 回调: 可选[可调用[] 字符串]] = , ): activities_set = 设置(活动) 如果 activities 否则 支持的活动() 如果 使用 CUDA : 警告( `use_cuda` 已弃用,请使用 `activities` 参数代替, 未来警告, 栈级别=2, ) 如果 使用 CUDA: 活动集合.添加(分析器活动.CUDA) elif 分析器活动.CUDA 活动集合: 活动集合.删除(分析器活动.CUDA) 断言 长度(活动集合) > 0, 未找到有效的分析器活动 超级().__init__( 活动=活动, record_shapes=record_shapes, 用户内存=用户内存, with_stack=with_stack, 带有 FLOPS=带有 FLOPS, 带模块=带模块, 实验性配置=实验性配置, 执行跟踪观察者=执行跟踪观察者 如果 执行跟踪观察者 否则 ExecutionTraceObserver.从环境构建执行跟踪观察者(), 事件=事件, 自定义跟踪 ID 回调=自定义跟踪 ID 回调, ) 如果 调度: .安排 = 安排 在跟踪和表格视图中添加步骤标记 .记录步骤 = 真实 else: .时间表 = _默认时间表函数 .记录步骤 = .当跟踪准备就绪时 = 跟踪就绪 .步骤编号 = 0 .当前动作 = .调度(.步骤编号) .步骤记录函数: 可选[专业.记录功能] = .动作映射: 字典[ 元组[分析器动作, 可选[分析器操作]], 列表[任何] ] = { # key 是(prev_action,current_action),value 是对应状态对的动作列表。 (分析器操作., 分析器操作.): [] (分析器操作., 分析器操作.预热): [.准备跟踪] (分析器操作., 分析器动作.记录): [ .准备追踪, .开始追踪, ] (分析器动作., 分析器操作.记录并保存): [ .准备跟踪, .开始跟踪, ] (分析器操作.预热, 分析器操作.): [ 偏函数(警告, "时间安排错误:WARMUP 之后跟 NONE"), .开始追踪, .停止追踪, ] (分析器动作.预热, 分析器动作.预热): [] (分析器动作.预热, 分析器动作.记录): [.开始追踪] (分析器操作.预热, 分析器操作.记录并保存): [.开始追踪] (分析器动作.记录, 分析器动作.): [ 偏函数(警告, "错误的日程:记录后跟无"), .停止跟踪, ] (分析器动作.记录, 分析器动作.预热): [ 偏函数(警告, "不正确的调度:记录后跟预热"), .停止追踪, ] (分析器动作.记录, 分析器动作.记录): [] (分析器动作.记录, 分析器动作.记录并保存): [] (分析器动作.记录并保存, 分析器动作.): [ .停止追踪, ._追踪就绪, ] (分析器动作.记录并保存, 分析器动作.预热): [ .停止跟踪, .跟踪准备就绪, .准备跟踪, ] (分析器动作.记录并保存, 分析器操作.记录): [ .停止跟踪, ._跟踪就绪, .准备追踪, .开始追踪, ] (分析器动作.记录并保存, 分析器操作.记录并保存): [ .停止跟踪, ._跟踪就绪, .准备追踪, .开始追踪, ] # 用于退出操作 (分析器动作.预热, ): [.开始追踪, .停止追踪] (分析器动作.记录, ): [.停止追踪, ._追踪就绪] (分析器动作.记录并保存, ): [ .停止追踪, ._追踪就绪, ] } # 开始跟踪对分析步骤的增量,这将被使用 # by 动势 专业.动作步数追踪器.初始化步骤数(分析器步骤名称) 定义 __进入__(): .开始() 返回 self 定义 __退出__(, 异常类型, 异常值, 异常堆栈): .停止() prof.动作步数追踪器.删除步骤数(用户配置文件步骤名称) 如果 .执行跟踪观察者: .执行跟踪观察者.清理() 定义 开始(): ._transit_action(分析器操作., .当前操作) 如果 .记录步骤: .步骤记录函数 = 专业.记录功能( "分析器步骤#" + 字符串(.步骤编号) ) .步骤记录函数.__进入__() 定义 停止(): 如果 .记录步骤 并且 .步骤记录函数: .步骤记录函数.__退出__(, , ) ._transit_action(.当前动作, )
[文档] def 步骤(self): "" 通知分析器下一个分析步骤已开始。 "" 如果 self.record_steps 和 self.step_rec_fn 为真: self.step_rec_fn.__exit__(None, None, None) prev_action = self.current_action self.step_num += 1 self.current_action = self.schedule(self.step_num) self._transit_action(prev_action, self.current_action) if os.environ.get("KINETO_USE_DAEMON", "") or ( is_fbcode() and os.environ.get("KINETO_FORCE_STEP_HOOK", "") ): prof.KinetoStepTracker.increment_step(PROFILER_STEP_NAME) 如果 self.record_steps: self.step_rec_fn = prof.record_function( "ProfilerStep#" + str(self.step_num) ) self.step_rec_fn.__enter__()
[文档] def set_custom_trace_id_callback(self, callback): """ 设置一个回调,当生成新的跟踪 ID 时将被调用。 """ self.custom_trace_id_callback = callback
[文档] def get_trace_id(self): "``" 返回当前的跟踪 ID。 "``" 如果 self.profiler 为 None: 返回 None 返回 self.profiler.trace_id
定义
_trace_ready(): 如果 .on_trace_ready: .跟踪就绪() 定义 _过渡动作(, 上一个动作, 当前动作): 动作列表 = .动作映射.获取((上一个动作, 当前动作)) 如果 动作列表: for 动作 动作列表: 动作() 定义 统计() -> 可选[职业.分析器统计信息] 如果 .分析器 : 返回 返回 .分析器.统计
执行跟踪观察者(_ITraceObserver): 执行跟踪观察者 每个进程可以有一个执行跟踪观察者实例。观察者 可以通过调用 register_callback()来记录函数回调 显式地。如果不调用 unregister_callback(),重复调用 register_callback() 不会添加额外的观察者以记录函数 回调。一旦创建了一个 ExecutionTraceObserver,则调用 start()和 stop() 方法控制事件数据何时被记录。 删除或调用 unregister_callback()将移除观察者 记录函数回调,最终输出文件,并将停止 产生任何开销。 "文档" 定义 __init__() -> : "" 初始化默认状态。 "文档" .已注册 = .执行跟踪正在运行 = .收集额外资源 = .资源目录: 字符串 = 请提供需要翻译的文本 .输出文件路径: 字符串 = 请提供需要翻译的文本 .输出文件路径观察者: 字符串 = 请提供需要翻译的文本 定义 __del__(): "" 调用 unregister_callback()以确保最终化输出。 "文档" .unregister_callback() @staticmethod 定义 从环境构建执行跟踪观察器() -> 可选["执行跟踪观察者"] "" 如果环境变量 ENABLE_PYTORCH_EXECUTION_TRACE 设置为 1,则返回一个 ExecutionTraceObserver 实例,否则返回 None。 配置观察者以收集额外的资源,如果环境变量 设置为 1,则返回一个 ExecutionTraceObserver 实例,否则返回 None。 ``ENABLE_PYTORCH_EXECUTION_TRACE_EXTRAS=1``。这些是生成内核、索引张量数据等资源,这些资源是使执行跟踪可重放所必需的。 等等。这些是使执行跟踪可重放所必需的资源,例如生成的内核、索引张量数据等。 "文档" 如果 os.环境.获取("ENABLE_PYTORCH_EXECUTION_TRACE", "0") == "1": 尝试: fp = 临时文件.命名临时文件("w+t", 后缀=".et.json", 删除=错误) 除了 异常 e: 警告( f"执行跟踪将不会被记录。在创建默认临时文件时发生异常:"{e}" ) 返回 fp.关闭() et = 执行跟踪观察者() et.注册回调(fp.名称) 此外,检查环境是否需要我们收集额外资源 如果 os.环境.获取(启用 PYTORCH 执行跟踪额外信息, "0") == "1": et.设置额外资源集合() else: et.设置额外资源集合(错误) 返回 et 返回 定义 设置额外资源集合(, val) -> : "" 收集额外资源,如生成的内核、索引张量数据以及其他 元数据,用于完成执行跟踪内容。 调用者应在调用 register_callback()之后,使用 val=True 调用此方法,如果他们想 收集额外资源。 "文档" .额外资源收集 = val 如果 .额外资源收集: .获取资源目录(可以创建=) 返回 定义 注册回调(, 输出文件路径: 字符串) -> 自身: "" 添加 ET 观察者以记录函数回调。数据将被 写入到输出文件路径。 "文档" 定义 获取临时未压缩文件() -> 字符串: 文件指针 = 临时文件.命名临时文件("w+b", 后缀=".json", 删除=错误) fp.关闭() 返回 fp.名称 如果 ._已注册: .输出文件路径 = 输出文件路径 如果 输出文件路径.以...结尾(".gz"): 输出文件路径 = 获取未压缩文件() .输出文件路径观察者 = 输出文件路径 .已注册 = 添加执行跟踪观察者(输出文件路径) 返回 self 定义 获取资源目录(, 可以创建=错误) -> 可选[字符串] "" 生成用于生成的内核的资源目录, 或索引张量数据或任何其他所需元数据 以完成执行跟踪内容。 目录将在 ET 文件输出位置创建。 仅当观察者已调用 set_extra_resource_collection(val=True)时才有效。 如果观察者未配置额外资源收集,则返回 None。 "文档" 如果 .额外资源收集: 返回 如果 .资源目录: # 已创建 返回 .资源目录 生成路径 = 执行跟踪观察者.根据 et_path 获取资源目录( .输出文件路径, 创建目录=可以创建 ) 如果 生成的路径: # 无法找到或创建资源目录 返回 .资源目录 = 生成路径 返回 .资源目录 @staticmethod 定义 获取用于 ET 路径的资源目录( 跟踪路径, 创建目录: 布尔 = ) -> 可选[字符串] 工作目录, 文件名 = os.路径.分割(跟踪路径) 资源目录 = os.路径.连接( 工作目录, os.路径.splitext(文件名)0] + _资源 ) 如果 os.路径.存在(资源目录): 如果 创建目录: 尝试: os.建立目录(资源目录) 除了 异常: 警告(f创建时执行跟踪异常{资源目录}") 返回 else: 返回 返回 资源目录 定义 注销回调(): "" 从记录函数回调中移除 ET 观察者。 "文档" 定义 _save_triton_kernels() -> : 尝试: 资源目录 = .获取资源目录() 除了 异常 e: 警告( f"生成资源目录时执行跟踪异常:"{e}" ) 返回 如果 resource_dir: 返回 # 保存生成的内核的内核路径 from torch._inductor.codecache 导入 PyCodeCache PyCodeCache kernel_files = [ v.__file__ for v Py 代码缓存.模块 如果 getattr(v, "__文件__", ) ] for 内核文件 内核文件列表: 如果 内核文件 : continue 名称 = os.路径.basename(内核文件) dst = os.路径.连接(资源目录, 名称) shutil.复制文件(内核文件, 目标) 定义 保存_gz 文件(未压缩文件: 字符串, 输出文件: 字符串) -> : 打印(f执行跟踪:压缩{未压缩文件}{输出文件}") 打开(解压缩文件, rb) 结束符: gzip.打开(输出文件, wb) 输出文件: 失败.写行(结束) os.删除(未压缩文件) 如果 .已注册: .停止() 尝试: 保存 Triton 内核() 除了 异常 e: 警告(f执行跟踪失败,无法保存内核:{e}") 移除执行跟踪观察者() 如果 .输出文件路径.以...结尾(gz): _保存_gz_文件(.输出文件路径观察者, .输出文件路径) ._已注册 = @property 定义 是否已注册(): "" 如果执行跟踪观察者已注册,则返回 True,否则返回 False。 "文档" 返回 .已注册 定义 正在运行(): "" 如果观察者正在运行,则返回 True,否则返回 False。 "文档" 返回 ._执行跟踪正在运行 定义 开始(): "" 开始捕获。 "文档" 如果 .已注册 并且 .执行跟踪正在运行: 启用执行跟踪观察者() .执行跟踪运行 = 真实 .记录配置() 定义 停止(): "" 停止以捕获。 "文档" 如果 .执行跟踪正在运行: 禁用执行跟踪观察者() .执行跟踪正在运行 = 定义 清理(): "" 调用 unregister_callback()以确保最终化输出。 "文档" .注销回调() 定义 获取输出文件路径() -> 可选[字符串] "" 返回输出文件名或 None。 "文档" 如果 .输出文件路径: 返回 .输出文件路径 else: 返回 定义 _记录_pg 配置() -> : # 将 PG 配置信息记录到跟踪中,格式为节点: # ## process_group:init ## 如果 ( .是否已注册 并且 火把.分布式.是否可用() 并且 火把.分布式.已初始化() ): pg_config_info = 火把.分布式.分布式_c10d._世界.pg_config_info 火把.自动微分._record_function_with_args_enter( "##进程组:初始化##", json.压缩包(pg_config_info, =_Numpy 编码器), )

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源