# mypy: 允许未类型化定义
r""
本模块介绍了 CUDA Sanitizer,这是一个用于检测在不同流上运行的内核之间同步错误的工具。
它存储了对张量的访问信息,以确定它们是否同步
或者不。当在 Python 程序中启用且检测到可能的竞态条件时,
详细警告将被打印,程序将退出。
可以通过导入此模块并调用
func:`enable_cuda_sanitizer()` 或者通过导出环境变量 ``TORCH_CUDA_SANITIZER`` 来启用
。
""
导入
枚举
导入 functools
导入
检查
导入
输入/输出
导入
记录日志
导入
正则表达式
导入
系统
导入
文本换行
导入
跟踪回溯
来自 collections.abc
导入
迭代器
来自 dataclasses
导入
数据类,
字段
来自
打字
导入 Any,
可选,
类型变量
导入
火炬
导入 torch.cuda._gpu_trace
是 gpu_trace
来自 torch.utils
导入 _pytree
是 pytree
来自 torch.utils._python_dispatch
导入
Torch 分发模式
DEFAULT_STREAM_ID = 0
TK = 类型变量("TK")
TVa = 类型变量("TVa")
TVb = 类型变量("TVb")
数据指针 =
整型
流标识符 =
整型
事件标识符 =
整型
序列号 =
整型
记录器 =
记录.
获取日志记录器(__name__)
# 注意,这里仅指以张量作为输入的工厂
# 我们关心的那些
工厂函数正则表达式 =
正则表达式.
编译("(new_.*|.*_like)")
类
访问类型(
枚举.
枚举):
读取 =
枚举.
自动()
写作 =
枚举.
自动()
def __str__(self):
返回
从...读取
如果 self
是
访问类型.
读取
否则
"写入到"
@dataclass
类
访问:
r"存储单个张量被内核访问的信息。"
参数:
类型:可以是 AccessType.READ 或 AccessType.Write。
seq_num:执行访问的内核的顺序号。
流 ID:正在执行内核的流的 ID。
操作符:启动内核的模式,其中列出参数和返回类型。
别名:此访问对应的模式中的参数。
别名:此访问对应的模式中的参数。
is_output:张量是否是内核的输出。
堆栈跟踪:访问期间捕获的堆栈摘要对象。
"沉浸式翻译"
类型:
访问类型
序列号:
序列号
流:
流 ID
操作符:
字符串
别名:
列表[str]
是否输出:
布尔值
堆栈跟踪:
跟踪回溯.
栈摘要
类
同步错误(
异常):
"CUDA Sanitizer 检测到的错误的基类。"
类
不同步访问错误(
同步错误):
存储有关对单个数据指针的两个不同步访问的信息。
def __init__(
self,
数据指针:
数据指针,
分配堆栈跟踪:
可选[
跟踪回溯.
栈摘要
]
当前访问:
访问,
前一次访问:
访问,
):
self.数据指针 =
数据指针
self.分配堆栈跟踪 =
分配堆栈跟踪
self.当前访问 =
当前访问
self.前期访问 =
前一次访问
def __str__(self):
def 格式访问(
访问:
访问):
消息.
写(f"{
访问.
操作符}
输入文本翻译为简体中文为:\n{
访问.
类型}")
如果
访问.
别名:
消息.
写(
"参数(们)" +
“,”.
连接(
访问.
别名))
如果
访问.
是否输出:
消息.
写(
“,并且到”)
如果
访问.
是输出:
消息.
写(
“的输出”)
消息.
写(
f"输入文本翻译为简体中文为:\n
带有堆栈跟踪:
输入文本翻译为简体中文为:\n{
请提供需要翻译的文本.
连接(
访问.
堆栈跟踪.
格式())}
输入文本翻译为简体中文为:\n"
)
与
输入/输出.StringIO()
是
消息:
消息.
写(
textwrap.dedent(
f"""\
============================
CSAN 检测到 tensor 数据指针上的可能数据竞争{self.
数据指针}
通过流访问{self.
当前访问.
流}
在内核期间
"沉浸式翻译"
)
)
格式访问(self.
当前访问)
消息.
写(
f"之前的流访问"{self.
前一次访问.
流}
在内核期间:
输入文本翻译为简体中文为:\n"
)
格式访问(self.
前一次访问)
如果 self.
分配堆栈跟踪:
消息.
写(
"张量分配时带有堆栈跟踪:"
输入文本翻译为简体中文为:\n"
f"{请提供需要翻译的文本.
连接(self.
分配堆栈跟踪.
格式())}"
)
否则:
消息.
写(
"未找到张量分配的跟踪信息。")
返回
消息.
获取值()
类
CUDASanitizer 错误(
异常):
"CUDA Sanitizer 报告的错误封装类。"
def __init__(self, 错误:
列表[
同步错误
)]
self.错误 =
错误
def __str__(self):
返回 f
"检测到"{
长度(self.
错误)}
错误
@dataclass
类 TensorInfo:
r"存储有关单个张量及其最近访问的信息。"
参数:
分配堆栈跟踪:在张量捕获的堆栈摘要对象
分配。如果分配未被 CSAN 捕获,则可以是`None`。
读取:自上次以来对张量的读取访问列表
最后写入。
写入:张量的最后写入访问。
"沉浸式翻译"
分配堆栈跟踪:
可选[
跟踪回溯.
堆栈摘要]
读操作:
列表[
访问] =
字段(
默认工厂=
列表)
写:
可选[
访问] =
无
类
_已访问张量:
def __init__(self) -> 无:
self.访问:
字典[
数据指针,
张量信息] = {}
def 确保张量存在(self,
数据指针:
数据指针) ->
无:
如果
数据指针
不是
在 self.
访问:
记录器.
信息(
"找到具有指针的张量:"%s
但是没有匹配的张量 "
"在跟踪中的分配。现在正在回填跟踪。"
"也许在执行一些 torch 操作之后启用了 sanitizer?",
数据指针,
)
self.创建张量(
数据指针,
无)
def 确保张量不存在(self,
数据指针:
数据指针) ->
无:
如果
数据指针
在 self.
访问:
日志记录器.
信息(
"在跟踪中找到重复的 tensor 分配,tensor 为:"
"指针:"%s
. 假设张量释放的跟踪没有被捕获,现在正在回填。
"也许在执行一些 torch 操作之后启用了 sanitizer?"
"也许在执行一些 torch 操作之后启用了 sanitizer?",
数据指针,
)
self.delete_tensor(数据指针)
def 创建张量(
self, 数据指针:
数据指针,
堆栈跟踪:
可选[
跟踪回溯.
堆栈摘要]
) -> 无:
self.访问[
数据指针] = TensorInfo(
堆栈跟踪)
def 删除张量(self,
数据指针:
数据指针) ->
无:
删除 self.
访问[
数据指针]
def 自上次写入以来是否有读取(self,
数据指针:
数据指针) -> bool:
返回
真实
如果 self.
访问[
数据指针].
读取
否则
假
def 获取分配堆栈跟踪(
self, 数据指针:
数据指针
) -> 可选[
跟踪回溯.
栈摘要
]
返回 self.
访问次数[
数据指针].
分配堆栈跟踪
def 获取写入权限(self,
数据指针:
数据指针) ->
可选[
访问
]
返回 self.
访问次数[
数据指针].
写入
def 获取读取(self,
数据指针:
数据指针) ->
列表[
访问
]
返回 self.
访问次数[
数据指针].
阅读
def 添加阅读(self,
数据指针:
数据指针,
访问:
访问) ->
无:
self.访问次数[
数据指针].
读取.append(
访问)
def 设置写入(self,
数据指针:
数据指针,
访问:
访问权限) ->
无:
self.访问[
数据指针].
写 =
访问
self.访问们[
数据指针].
读 =
输入文本为空,请提供需要翻译的文本
类
流同步:
def __init__(self) -> 无:
self.当前同步状态:
字典[
流 ID,
字典[
流 ID,
序列号]] = {}
self.记录的同步状态:
字典[
事件 ID,
字典[
流 ID,
序列号]] = {}
self.主机同步状态:
字典[
流 ID,
序列号] = {}
self.创建流(
默认流 ID)
def 确保流存在(self,
流:
流 ID) ->
无:
如果
流
不是
在 self.
当前同步状态:
记录器.
信息(
找到 ID 为:%s
但是没有匹配的流 "
在跟踪中创建。现在正在回填跟踪。
可能是在一些火炬操作之后启用了清理器?,
流,
)
self.创建流(
流)
def 确保事件存在(self,
事件:
事件 ID) ->
无:
如果
事件
不是
在 self.
记录的同步状态:
记录器.
信息(
"找到事件,ID 为:"%s
,但没有找到匹配的事件
"在跟踪中创建。现在正在回填跟踪。"
"也许在执行一些火炬操作后启用了消毒剂?",
事件,
)
self.创建事件(
事件)
def _确保事件不存在(self,
事件:
事件 ID) ->
无:
如果
事件
在 self.
录制同步状态:
记录器.
信息(
跟踪中发现了重复的事件创建,事件为:
id:%s
. 假设没有捕获到事件删除的跟踪信息 "
"现在正在回填。"
"也许在执行了一些火炬操作之后启用了清理器?",
事件,
)
self.删除事件(
事件)
def 创建流(self,
流:
流 ID) ->
无:
如果
流
在 self.
当前同步状态:
记录器.
信息(
发现跟踪中存在具有 "
"id: "%s
. PyTorch 流只创建一次,所以这个
"跟踪条目将被忽略。",
流,
)
否则:
self.主机同步状态[
流] = 0
self.当前同步状态[
流] = self.
主机同步状态.
复制()
def 创建事件(self,
事件:
事件 ID) ->
无:
self.确保事件不存在(
事件)
self.记录的同步状态[
事件] = {}
def 删除事件(self,
事件:
事件 ID) ->
无:
self.确保事件存在(
事件)
删除 self.
记录的同步状态[
事件]
def 更新序列号(self,
流:
流水号,
序列号:
序列号) ->
无:
self.确保流存在(
流)
self.当前同步状态[
流
]
[流] =
序列号
def 记录状态(self,
事件:
事件 ID,
流:
流 ID) ->
无:
self.确保事件存在(
事件)
self.确保流存在(
流)
self.记录的同步状态[
事件] = self.
当前同步状态[
流].
复制()
def _等待其他状态(
self, 状态:
字典[
流水号,
序列号
]
其他:
字典[
流水号,
序列号]
) -> 无:
for 流,
序列号
在
其他.
项目():
状态[
流] =
最大值(
状态.
获取(
流, -1),
序列号)
def 流等待事件(self,
流:
流 ID,
事件:
事件 ID) ->
无:
self.确保流存在(
流)
self.确保事件存在(
事件)
self.等待其他状态(
self.当前同步状态[
流
] self.
录制同步状态[
事件]
)
def 所有流等待事件(self,
事件:
事件 ID) ->
无:
self.确保事件存在(
事件)
for 流
在 self.
当前同步状态.
键():
self.stream 等待事件(
流,
事件)
self._状态等待其他(
self.主机同步状态, self.
录制同步状态[
事件]
)
def 所有流等待流(self,
流:
流 ID) ->
无:
self.确保流存在(
流)
for 状态
在 self.
当前同步状态.
值():
self.等待其他状态(
状态, self.
当前同步状态[
流])
self.等待其他状态(
self.主同步状态, self.
当前同步状态[
流]
)
def 同步所有流(self) ->
无:
for 流,
状态
在 self.
当前同步状态.
项目():
self.主同步状态[
流] =
状态[
流]
for 状态
在 self.
当前同步状态.
值():
self.等待其他状态(
状态, self.
主同步状态)
def 是在之后排序的(
self, current_stream: 流 ID,
序列号:
序列号,
其他流:
流 ID
) -> bool:
self.确保流存在(current_stream)
self.确保流存在(
其他流)
返回
序列号 <= self.
当前同步状态[current_stream].
获取(
其他流, -1)
类
事件处理器:
分析 CSAN 跟踪以查找同步错误。
存储每个流与其他流同步的信息
通过张量访问来确定给定的内核启动是否可能引起数据竞争
数据竞争。
"沉浸式翻译"
def __init__(self) -> 无:
self.tensors_accessed = _TensorsAccessed()
self.同步 =
流同步()
self.序列号:
序列号 = 0
def _处理内核启动(
self,
流:
流 ID,
只读:
设置[
数据指针
]
读写:
设置[
数据指针
]
输出:
设置[
数据指针
]
操作符: str,
张量别名:
字典[int,
列表[str]],
) -> 列表[
同步错误
]
def 检查冲突(
数据指针:
数据指针,
当前访问:
访问,
上次访问:
可选[
访问]
) -> 无:
如果
前一次访问
是
无:
返回
如果
不是 self.
同步.
在...之后排序(
当前访问.
流,
前一次访问.
序列号,
前一次访问.
流
):
错误列表.append(
不同步访问错误(
数据指针,
self.张量访问.
获取分配堆栈跟踪(
数据指针),
当前访问,
上次访问,
)
)
错误列表:
列表[
同步错误] =
输入文本为空,请提供需要翻译的文本
self.序列号 += 1
self.同步.
更新序列号(
流, self.
序列号)
堆栈跟踪 =
跟踪回溯.
栈摘要.
提取(
跟踪回溯.
遍历栈(
检查.currentframe()),
查找行=
假
)
以这种方式生成的堆栈跟踪是逆序的,因此必须
# reversed
堆栈跟踪.reverse()
for 数据指针
在
只读:
self.访问的张量.
确保张量存在(
数据指针)
当前访问 =
访问(
访问类型.
读取,
self.序列号,
流,
操作符,
张量别名[
数据指针
]
数据指针
在
输出,
堆栈跟踪,
)
检查冲突(
数据指针,
当前访问, self.
访问的张量.
获取写入权限(
数据指针)
)
self.访问的张量.
添加读取(
数据指针,
当前访问)
for 数据指针
在
读写:
self.访问的张量.
确保张量存在(
数据指针)
当前访问 =
访问(
访问类型.
写入,
self.序列号,
流,
操作符,
张量别名[
数据指针
]
数据指针
在
输出,
堆栈跟踪,
)
如果 self.
访问的张量.
自上次写入以来是否有读取(
数据指针):
for 上次访问
在 self.
访问的张量.
获取读取(
数据指针):
检查冲突(
数据指针,
当前访问,
前一次访问)
否则:
检查冲突(
数据指针,
当前访问, self.
访问的张量.
获取写入权限(
数据指针)
)
self.访问的张量.
设置写入(
数据指针,
当前访问)
返回
错误列表
def 处理事件创建(self,
事件:
事件 ID) ->
无:
self.同步.
创建事件(
事件)
def 处理事件删除(self,
事件:
事件 ID) ->
无:
self.同步.
删除事件(
事件)
def 处理事件记录(self,
事件:
事件 ID,
流:
流 ID) ->
无:
self.同步.
记录状态(
事件,
流)
def 处理事件等待(self,
事件:
事件 ID,
流:
流 ID) ->
无:
self.同步.
等待事件流(
流,
事件)
def 处理内存分配(self,
数据指针:
数据指针) ->
无:
self.访问张量.
确保张量不存在(
数据指针)
堆栈跟踪 =
跟踪回溯.
栈摘要.
提取(
跟踪回溯.
遍历栈(
检查.currentframe()),
查找行=
假
)
以这种方式生成的堆栈跟踪是逆序的,因此必须
反转。
堆栈跟踪.reverse()
self.访问的张量.
创建张量(
数据指针,
堆栈跟踪,
)
def 处理内存释放(self,
数据指针:
数据指针) ->
无:
self.访问的张量.
确保张量存在(
数据指针)
self.访问的张量.
删除张量(
数据指针)
def 处理流创建(self,
流:
流 ID) ->
无:
self.同步.
创建流(
流)
def _处理设备同步(self) ->
无:
self.同步.
同步所有流()
def 处理流同步(self,
流:
流 ID) ->
无:
self.同步.
所有流等待流(
流)
def 处理事件同步(self,
事件:
事件 ID) ->
无:
self.同步.
所有流等待事件(
事件)
def 按键分组(a:
字典[TK, TVa
] b:
字典[TK, TVb]) ->
迭代器[
元组[TK, TVa,
电视 b
]]
for arg, 值
在 a.
项目():
如果
参数
在 b:
产生 arg, value, b[arg]
def 压缩参数(
架构:
火炬.
函数模式,
参数:
元组[Any, ...
] kwargs:
字典[str, Any]
) -> 迭代器[
元组[
火炬.
参数, Any
]]
schema_args = 架构.
参数
[
长度(
参数)]
schema_kwargs = {arg.名称:
参数 for
参数
在
架构.
参数[
长度(
参数)
]}
yield from zip(schema_args, 参数)
for _, 论点,
值
在
按键分组(
模式参数, kwargs):
产生 (
论点, value)
类
参数处理器:
def __init__(self) -> 无:
self.数据指针读取:
设置[
数据指针] =
设置()
self.数据指针写入:
设置[
数据指针] =
设置()
self.张量别名:
字典[
数据指针,
列表[str]] = {}
self.输出:
设置[
数据指针] =
设置()
def 处理参数句柄(
self,
value: Any,
是否写入: bool,
仅元数据: bool,
名称:
可选[str] =
无,
输出:
布尔值 =
错误,
) -> 无:
如果 isinstance(value,
火炬.
张量)
和 value.is_cuda:
数据指针 = value.
数据指针()
如果
是否写入:
self.写入数据指针.
添加(
数据指针)
如果...否则
不是
仅元数据:
self.读取数据指针.
添加(
数据指针)
self.张量别名.setdefault(
数据指针,
[]
如果
名称
是
不是
无:
self.张量别名[
数据指针].append(
名称)
如果
是否输出:
self.输出.
添加(
数据指针)
def 解析输入(
self,
架构:
火炬.
函数模式,
参数:
元组[Any, ...
]
kwargs: 字典[str, Any
]
*,
是否为工厂: bool,
) -> 无:
for 参数,
值
在
压缩参数(
架构,
参数, kwargs):
是写入 =
论证.
别名信息
是
不是
无
和
论证.
别名信息.
是写入
仅当它是视图或工厂函数时,更改才是元数据
仅读取元数据
元数据仅 =
是工厂
或者 (
论点.
别名信息
是
不是
无
和
不是
论点.
别名信息.
是写入
)
py 树.
树图_(
functools.偏函数(
self.处理参数,
是否写入=
是否写入,
名称=
参数.
名称,
仅元数据=
仅元数据,
),
value,
)
def 解析输出(
self, 架构:
火炬.
函数模式,
输出: Any, *,
是否为工厂:
布尔值
) -> 无:
for 资源,
值
在 zip(
架构.
返回, (
输出,)):
仅元数据 =
是否为工厂
或者 (
资源.
别名信息
是
不是
无
和
不是
资源.
别名信息.
是否写入
)
py 树.
地图树_(
functools.偏函数(
self.处理参数_,
是否写入=
不是
仅元数据,
是否输出=
是,
仅元数据=
仅元数据,
),
value,
)
类 CUDASanitizerDispatchMode(
火炬调度模式):
def __init__(self) -> 无:
self.事件处理器 =
事件处理器()
火炬._C.
激活 GPU 追踪()
GPU 追踪.
为事件创建注册回调(
self.事件处理器.
处理事件创建
)
GPU 跟踪.
为事件删除注册回调(
self.事件处理器.
处理事件删除
)
gpu 追踪.
为事件记录注册回调(
self.事件处理器.
_处理事件记录
)
gpu 追踪.
注册事件等待回调(
self.事件处理器.
_处理事件等待
)
显卡追踪.
注册内存分配回调(
self.事件处理器.
处理内存分配
)
GPU 追踪.
注册内存释放回调(
self.事件处理器.
处理内存释放
)
gpu 追踪.
为流创建注册回调(
self.事件处理器.
处理流创建
)
显卡追踪.
为设备同步注册回调(
self.事件处理器.
处理设备同步
)
GPU 追踪.
注册流同步的回调函数(
self.事件处理器.
处理流同步
)
GPU 追踪.
注册事件同步的回调函数(
self.事件处理器.
处理事件同步
)
def __torch_dispatch__(self, 函数,
类型,
参数=(), kwargs=
无):
如果 kwargs
是
无:
kwargs = {}
是否为工厂 = bool(
工厂函数正则表达式.
匹配(
函数.
_模式.
名称))
参数处理器 =
论证处理器()
argument_handler.解析输入(
函数.
_模式,
参数, kwargs,
是否为工厂=
是工厂)
输出 =
函数(*
参数, **kwargs)
参数处理器.
解析输出(
函数.
_模式,
输出,
是工厂=
工厂)
错误 = self.
事件处理器.
_处理内核启动(
火炬.cuda.current_stream().
CUDA 流,
程序参数处理.
数据指针读取 -
程序参数处理.
数据指针写入,
程序处理函数.
已写入的数据指针,
程序处理函数.
输出,
函数.
_模式,
程序处理函数.
张量别名,
)
如果
错误:
for 错误
在
错误:
打印(
错误,
文件=
系统模块.
标准错误输出)
抛出
CUDA 安全检查器错误(
错误)
返回
输出
类
CUDA Sanitizer:
管理 CUDASanitizer 调度模式对象的生存期。
CUDASanitizer 类封装了调度模式的进入/退出函数
在启用函数/析构函数中分别使用上下文管理器。这是为了
显式设置调度模式对象的生存期与应用程序相同。
这种方法被认为比使用 atexit 模块更优雅。
"沉浸式翻译"
def __init__(self) -> 无:
self.调度 =
CUDASanitizer 调度模式()
self.启用 =
假
def 启用(self):
self.调度.
__进入__()
self.启用 =
真实
def 禁用(self):
self.派遣.
__退出__(
无,
无,
无)
self.启用 =
假
def __del__(self):
由于此对象的生命周期与 `torch.cuda._sanitizer` Python 模块相关联,
它通常作为整体 `torch` 模块清理的一部分而被删除,
在那时,根据 CPython 版本的不同,torch.* 模块可能处于
# 已经清理好的不同状态。
# 可能其他导入已经清理过,所以 `sys` 也可能已经消失了。
# 如果它超过了运行时间,则跳过退出模式。
# 如果它超过了运行时间,则跳过退出模式。
如果 (
系统
是
不是
无)
和 (
不是
系统模块.
正在完成())
和 self.
启用:
self.禁用()
[文档]def 启用_cuda_sanitizer():
"""启用 CUDA Sanitizer。
清理器将开始分析由 torch 函数调用的底层 CUDA 调用
用于同步错误。所有找到的数据竞争将被打印到标准输出,并附带疑似原因的堆栈跟踪。为了获得最佳结果,应在程序开始时启用该清理器。
错误输出中。为了获得最佳结果,应在程序开始时启用该清理器。
清理器应该被启用在程序开始之初。
"""
cuda_sanitizer.enable()
cuda_sanitizer = CUDASanitizer()