快捷键

源代码 for torch.cuda._sanitizer

# mypy: 允许未类型化定义
r""
本模块介绍了 CUDA Sanitizer,这是一个用于检测在不同流上运行的内核之间同步错误的工具。

它存储了对张量的访问信息,以确定它们是否同步
或者不。当在 Python 程序中启用且检测到可能的竞态条件时,
详细警告将被打印,程序将退出。

可以通过导入此模块并调用
func:`enable_cuda_sanitizer()` 或者通过导出环境变量 ``TORCH_CUDA_SANITIZER`` 来启用

""

导入 枚举
导入 functools
导入 检查
导入 输入/输出
导入 记录日志
导入 正则表达式
导入 系统
导入 文本换行
导入 跟踪回溯
来自 collections.abc 导入 迭代器
来自 dataclasses 导入 数据类, 字段
来自 打字 导入 Any, 可选, 类型变量

导入 火炬
导入 torch.cuda._gpu_trace  gpu_trace
来自 torch.utils 导入 _pytree  pytree
来自 torch.utils._python_dispatch 导入 Torch 分发模式


DEFAULT_STREAM_ID = 0

TK = 类型变量("TK")
TVa = 类型变量("TVa")
TVb = 类型变量("TVb")

数据指针 = 整型
流标识符 = 整型
事件标识符 = 整型
序列号 = 整型

记录器 = 记录.获取日志记录器(__name__)

# 注意,这里仅指以张量作为输入的工厂
# 我们关心的那些
工厂函数正则表达式 = 正则表达式.编译("(new_.*|.*_like)")


 访问类型(枚举.枚举):
    读取 = 枚举.自动()
    写作 = 枚举.自动()

    def __str__(self):
        返回 从...读取 如果 self  访问类型.读取 否则 "写入到"


@dataclass
 访问:
    r"存储单个张量被内核访问的信息。"

参数:
类型:可以是 AccessType.READ 或 AccessType.Write。
seq_num:执行访问的内核的顺序号。
流 ID:正在执行内核的流的 ID。
操作符:启动内核的模式,其中列出参数和返回类型。
别名:此访问对应的模式中的参数。
别名:此访问对应的模式中的参数。
is_output:张量是否是内核的输出。
堆栈跟踪:访问期间捕获的堆栈摘要对象。
"沉浸式翻译"

    类型: 访问类型
    序列号: 序列号
    : 流 ID
    操作符: 字符串
    别名: 列表[str]
    是否输出: 布尔值
    堆栈跟踪: 跟踪回溯.栈摘要


 同步错误(异常):
    "CUDA Sanitizer 检测到的错误的基类。"


 不同步访问错误(同步错误):
    存储有关对单个数据指针的两个不同步访问的信息。

    def __init__(
        self,
        数据指针: 数据指针,
        分配堆栈跟踪: 可选[跟踪回溯.栈摘要]
        当前访问: 访问,
        前一次访问: 访问,
    ):
        self.数据指针 = 数据指针
        self.分配堆栈跟踪 = 分配堆栈跟踪
        self.当前访问 = 当前访问
        self.前期访问 = 前一次访问

    def __str__(self):
        def 格式访问(访问: 访问):
            消息.(f"{访问.操作符}输入文本翻译为简体中文为:\n{访问.类型}")
            如果 访问.别名:
                消息.("参数(们)" + “,”.连接(访问.别名))
                如果 访问.是否输出:
                    消息.(“,并且到”)
            如果 访问.是输出:
                消息.(“的输出”)
            消息.(
                f"输入文本翻译为简体中文为:\n带有堆栈跟踪:输入文本翻译为简体中文为:\n{请提供需要翻译的文本.连接(访问.堆栈跟踪.格式())}输入文本翻译为简体中文为:\n"
            )

         输入/输出.StringIO()  消息:
            消息.(
                textwrap.dedent(
                    f"""\
                    ============================
CSAN 检测到 tensor 数据指针上的可能数据竞争{self.数据指针}
通过流访问{self.当前访问.}在内核期间
"沉浸式翻译"
                )
            )
            格式访问(self.当前访问)

            消息.(
                f"之前的流访问"{self.前一次访问.}在内核期间:输入文本翻译为简体中文为:\n"
            )
            格式访问(self.前一次访问)

            如果 self.分配堆栈跟踪:
                消息.(
                    "张量分配时带有堆栈跟踪:"输入文本翻译为简体中文为:\n"
                    f"{请提供需要翻译的文本.连接(self.分配堆栈跟踪.格式())}"
                )
            否则:
                消息.("未找到张量分配的跟踪信息。")
            返回 消息.获取值()


 CUDASanitizer 错误(异常):
    "CUDA Sanitizer 报告的错误封装类。"

    def __init__(self, 错误: 列表[同步错误)]
        self.错误 = 错误

    def __str__(self):
        返回 f"检测到"{长度(self.错误)}错误


@dataclass
 TensorInfo:
    r"存储有关单个张量及其最近访问的信息。"

参数:
分配堆栈跟踪:在张量捕获的堆栈摘要对象
分配。如果分配未被 CSAN 捕获,则可以是`None`。
读取:自上次以来对张量的读取访问列表
最后写入。
写入:张量的最后写入访问。
"沉浸式翻译"

    分配堆栈跟踪: 可选[跟踪回溯.堆栈摘要]
    读操作: 列表[访问] = 字段(默认工厂=列表)
    : 可选[访问] = 


 _已访问张量:
    def __init__(self) -> :
        self.访问: 字典[数据指针, 张量信息] = {}

    def 确保张量存在(self, 数据指针: 数据指针) -> :
        如果 数据指针 不是  self.访问:
            记录器.信息(
                "找到具有指针的张量:"%s但是没有匹配的张量 "
                "在跟踪中的分配。现在正在回填跟踪。"
                "也许在执行一些 torch 操作之后启用了 sanitizer?",
                数据指针,
            )
            self.创建张量(数据指针, )

    def 确保张量不存在(self, 数据指针: 数据指针) -> :
        如果 数据指针  self.访问:
            日志记录器.信息(
                "在跟踪中找到重复的 tensor 分配,tensor 为:"
                "指针:"%s. 假设张量释放的跟踪没有被捕获,现在正在回填。
                "也许在执行一些 torch 操作之后启用了 sanitizer?"
                "也许在执行一些 torch 操作之后启用了 sanitizer?",
                数据指针,
            )
            self.delete_tensor(数据指针)

    def 创建张量(
        self, 数据指针: 数据指针, 堆栈跟踪: 可选[跟踪回溯.堆栈摘要]
    ) -> :
        self.访问[数据指针] = TensorInfo(堆栈跟踪)

    def 删除张量(self, 数据指针: 数据指针) -> :
        删除 self.访问[数据指针]

    def 自上次写入以来是否有读取(self, 数据指针: 数据指针) -> bool:
        返回 真实 如果 self.访问[数据指针].读取 否则 

    def 获取分配堆栈跟踪(
        self, 数据指针: 数据指针
    ) -> 可选[跟踪回溯.栈摘要]
        返回 self.访问次数[数据指针].分配堆栈跟踪

    def 获取写入权限(self, 数据指针: 数据指针) -> 可选[访问]
        返回 self.访问次数[数据指针].写入

    def 获取读取(self, 数据指针: 数据指针) -> 列表[访问]
        返回 self.访问次数[数据指针].阅读

    def 添加阅读(self, 数据指针: 数据指针, 访问: 访问) -> :
        self.访问次数[数据指针].读取.append(访问)

    def 设置写入(self, 数据指针: 数据指针, 访问: 访问权限) -> :
        self.访问[数据指针]. = 访问
        self.访问们[数据指针]. = 输入文本为空,请提供需要翻译的文本


 流同步:
    def __init__(self) -> :
        self.当前同步状态: 字典[流 ID, 字典[流 ID, 序列号]] = {}
        self.记录的同步状态: 字典[事件 ID, 字典[流 ID, 序列号]] = {}
        self.主机同步状态: 字典[流 ID, 序列号] = {}
        self.创建流(默认流 ID)

    def 确保流存在(self, : 流 ID) -> :
        如果  不是  self.当前同步状态:
            记录器.信息(
                找到 ID 为:%s但是没有匹配的流 "
                在跟踪中创建。现在正在回填跟踪。
                可能是在一些火炬操作之后启用了清理器?,
                ,
            )
            self.创建流()

    def 确保事件存在(self, 事件: 事件 ID) -> :
        如果 事件 不是  self.记录的同步状态:
            记录器.信息(
                "找到事件,ID 为:"%s,但没有找到匹配的事件 
                "在跟踪中创建。现在正在回填跟踪。"
                "也许在执行一些火炬操作后启用了消毒剂?",
                事件,
            )
            self.创建事件(事件)

    def _确保事件不存在(self, 事件: 事件 ID) -> :
        如果 事件  self.录制同步状态:
            记录器.信息(
                跟踪中发现了重复的事件创建,事件为:
                id:%s. 假设没有捕获到事件删除的跟踪信息 "
                "现在正在回填。"
                "也许在执行了一些火炬操作之后启用了清理器?",
                事件,
            )
            self.删除事件(事件)

    def 创建流(self, : 流 ID) -> :
        如果   self.当前同步状态:
            记录器.信息(
                发现跟踪中存在具有 "
                "id: "%s. PyTorch 流只创建一次,所以这个 
                "跟踪条目将被忽略。",
                ,
            )
        否则:
            self.主机同步状态[] = 0
            self.当前同步状态[] = self.主机同步状态.复制()

    def 创建事件(self, 事件: 事件 ID) -> :
        self.确保事件不存在(事件)
        self.记录的同步状态[事件] = {}

    def 删除事件(self, 事件: 事件 ID) -> :
        self.确保事件存在(事件)
        删除 self.记录的同步状态[事件]

    def 更新序列号(self, : 流水号, 序列号: 序列号) -> :
        self.确保流存在()
        self.当前同步状态[]
[] = 序列号

    def 记录状态(self, 事件: 事件 ID, : 流 ID) -> :
        self.确保事件存在(事件)
        self.确保流存在()
        self.记录的同步状态[事件] = self.当前同步状态[].复制()

    def _等待其他状态(
        self, 状态: 字典[流水号, 序列号] 其他: 字典[流水号, 序列号]
    ) -> :
        for , 序列号  其他.项目():
            状态[] = 最大值(状态.获取(, -1), 序列号)

    def 流等待事件(self, : 流 ID, 事件: 事件 ID) -> :
        self.确保流存在()
        self.确保事件存在(事件)
        self.等待其他状态(
            self.当前同步状态[] self.录制同步状态[事件]
        )

    def 所有流等待事件(self, 事件: 事件 ID) -> :
        self.确保事件存在(事件)
        for   self.当前同步状态.():
            self.stream 等待事件(, 事件)

        self._状态等待其他(
            self.主机同步状态, self.录制同步状态[事件]
        )

    def 所有流等待流(self, : 流 ID) -> :
        self.确保流存在()
        for 状态  self.当前同步状态.():
            self.等待其他状态(状态, self.当前同步状态[])

        self.等待其他状态(
            self.主同步状态, self.当前同步状态[]
        )

    def 同步所有流(self) -> :
        for , 状态  self.当前同步状态.项目():
            self.主同步状态[] = 状态[]

        for 状态  self.当前同步状态.():
            self.等待其他状态(状态, self.主同步状态)

    def 是在之后排序的(
        self, current_stream: 流 ID, 序列号: 序列号, 其他流: 流 ID
    ) -> bool:
        self.确保流存在(current_stream)
        self.确保流存在(其他流)
        返回 序列号 <= self.当前同步状态[current_stream].获取(其他流, -1)


 事件处理器:
    分析 CSAN 跟踪以查找同步错误。

存储每个流与其他流同步的信息
通过张量访问来确定给定的内核启动是否可能引起数据竞争
数据竞争。
"沉浸式翻译"

    def __init__(self) -> :
        self.tensors_accessed = _TensorsAccessed()
        self.同步 = 流同步()
        self.序列号: 序列号 = 0

    def _处理内核启动(
        self,
        : 流 ID,
        只读: 设置[数据指针]
        读写: 设置[数据指针]
        输出: 设置[数据指针]
        操作符: str,
        张量别名: 字典[int, 列表[str]],
    ) -> 列表[同步错误]
        def 检查冲突(
            数据指针: 数据指针, 当前访问: 访问, 上次访问: 可选[访问]
        ) -> :
            如果 前一次访问  :
                返回
            如果 不是 self.同步.在...之后排序(
                当前访问., 前一次访问.序列号, 前一次访问.
            ):
                错误列表.append(
                    不同步访问错误(
                        数据指针,
                        self.张量访问.获取分配堆栈跟踪(数据指针),
                        当前访问,
                        上次访问,
                    )
                )

        错误列表: 列表[同步错误] = 输入文本为空,请提供需要翻译的文本
        self.序列号 += 1
        self.同步.更新序列号(, self.序列号)
        堆栈跟踪 = 跟踪回溯.栈摘要.提取(
            跟踪回溯.遍历栈(检查.currentframe()), 查找行=
        )
        以这种方式生成的堆栈跟踪是逆序的,因此必须
        # reversed
        堆栈跟踪.reverse()

        for 数据指针  只读:
            self.访问的张量.确保张量存在(数据指针)
            当前访问 = 访问(
                访问类型.读取,
                self.序列号,
                ,
                操作符,
                张量别名[数据指针]
                数据指针  输出,
                堆栈跟踪,
            )
            检查冲突(
                数据指针, 当前访问, self.访问的张量.获取写入权限(数据指针)
            )
            self.访问的张量.添加读取(数据指针, 当前访问)

        for 数据指针  读写:
            self.访问的张量.确保张量存在(数据指针)
            当前访问 = 访问(
                访问类型.写入,
                self.序列号,
                ,
                操作符,
                张量别名[数据指针]
                数据指针  输出,
                堆栈跟踪,
            )
            如果 self.访问的张量.自上次写入以来是否有读取(数据指针):
                for 上次访问  self.访问的张量.获取读取(数据指针):
                    检查冲突(数据指针, 当前访问, 前一次访问)
            否则:
                检查冲突(
                    数据指针, 当前访问, self.访问的张量.获取写入权限(数据指针)
                )
            self.访问的张量.设置写入(数据指针, 当前访问)

        返回 错误列表

    def 处理事件创建(self, 事件: 事件 ID) -> :
        self.同步.创建事件(事件)

    def 处理事件删除(self, 事件: 事件 ID) -> :
        self.同步.删除事件(事件)

    def 处理事件记录(self, 事件: 事件 ID, : 流 ID) -> :
        self.同步.记录状态(事件, )

    def 处理事件等待(self, 事件: 事件 ID, : 流 ID) -> :
        self.同步.等待事件流(, 事件)

    def 处理内存分配(self, 数据指针: 数据指针) -> :
        self.访问张量.确保张量不存在(数据指针)
        堆栈跟踪 = 跟踪回溯.栈摘要.提取(
            跟踪回溯.遍历栈(检查.currentframe()), 查找行=
        )
        以这种方式生成的堆栈跟踪是逆序的,因此必须
        反转。
        堆栈跟踪.reverse()
        self.访问的张量.创建张量(
            数据指针,
            堆栈跟踪,
        )

    def 处理内存释放(self, 数据指针: 数据指针) -> :
        self.访问的张量.确保张量存在(数据指针)
        self.访问的张量.删除张量(数据指针)

    def 处理流创建(self, : 流 ID) -> :
        self.同步.创建流()

    def _处理设备同步(self) -> :
        self.同步.同步所有流()

    def 处理流同步(self, : 流 ID) -> :
        self.同步.所有流等待流()

    def 处理事件同步(self, 事件: 事件 ID) -> :
        self.同步.所有流等待事件(事件)


def 按键分组(a: 字典[TK, TVa] b: 字典[TK, TVb]) -> 迭代器[元组[TK, TVa, 电视 b]]
    for arg,   a.项目():
        如果 参数  b:
            产生 arg, value, b[arg]


def 压缩参数(
    架构: 火炬.函数模式, 参数: 元组[Any, ...] kwargs: 字典[str, Any]
) -> 迭代器[元组[火炬.参数, Any]]
    schema_args = 架构.参数[ 长度(参数)]
    schema_kwargs = {arg.名称: 参数 for 参数  架构.参数[长度(参数) ]}

    yield from zip(schema_args, 参数)

    for _, 论点,   按键分组(模式参数, kwargs):
        产生 (论点, value)


 参数处理器:
    def __init__(self) -> :
        self.数据指针读取: 设置[数据指针] = 设置()
        self.数据指针写入: 设置[数据指针] = 设置()
        self.张量别名: 字典[数据指针, 列表[str]] = {}
        self.输出: 设置[数据指针] = 设置()

    def 处理参数句柄(
        self,
        value: Any,
        是否写入: bool,
        仅元数据: bool,
        名称: 可选[str] = ,
        输出: 布尔值 = 错误,
    ) -> :
        如果 isinstance(value, 火炬.张量)  value.is_cuda:
            数据指针 = value.数据指针()
            如果 是否写入:
                self.写入数据指针.添加(数据指针)
            如果...否则 不是 仅元数据:
                self.读取数据指针.添加(数据指针)

            self.张量别名.setdefault(数据指针, []
            如果 名称  不是 :
                self.张量别名[数据指针].append(名称)
            如果 是否输出:
                self.输出.添加(数据指针)

    def 解析输入(
        self,
        架构: 火炬.函数模式,
        参数: 元组[Any, ...]
        kwargs: 字典[str, Any]
        *,
        是否为工厂: bool,
    ) -> :
        for 参数,   压缩参数(架构, 参数, kwargs):
            是写入 = 论证.别名信息  不是   论证.别名信息.是写入
            仅当它是视图或工厂函数时,更改才是元数据
            仅读取元数据
            元数据仅 = 是工厂 或者 (
                论点.别名信息  不是   不是 论点.别名信息.是写入
            )
            py 树.树图_(
                functools.偏函数(
                    self.处理参数,
                    是否写入=是否写入,
                    名称=参数.名称,
                    仅元数据=仅元数据,
                ),
                value,
            )

    def 解析输出(
        self, 架构: 火炬.函数模式, 输出: Any, *, 是否为工厂: 布尔值
    ) -> :
        for 资源,   zip(架构.返回, (输出,)):
            仅元数据 = 是否为工厂 或者 (
                资源.别名信息  不是   不是 资源.别名信息.是否写入
            )
            py 树.地图树_(
                functools.偏函数(
                    self.处理参数_,
                    是否写入=不是 仅元数据,
                    是否输出=,
                    仅元数据=仅元数据,
                ),
                value,
            )


 CUDASanitizerDispatchMode(火炬调度模式):
    def __init__(self) -> :
        self.事件处理器 = 事件处理器()
        火炬._C.激活 GPU 追踪()
        GPU 追踪.为事件创建注册回调(
            self.事件处理器.处理事件创建
        )
        GPU 跟踪.为事件删除注册回调(
            self.事件处理器.处理事件删除
        )
        gpu 追踪.为事件记录注册回调(
            self.事件处理器._处理事件记录
        )
        gpu 追踪.注册事件等待回调(
            self.事件处理器._处理事件等待
        )
        显卡追踪.注册内存分配回调(
            self.事件处理器.处理内存分配
        )
        GPU 追踪.注册内存释放回调(
            self.事件处理器.处理内存释放
        )
        gpu 追踪.为流创建注册回调(
            self.事件处理器.处理流创建
        )
        显卡追踪.为设备同步注册回调(
            self.事件处理器.处理设备同步
        )
        GPU 追踪.注册流同步的回调函数(
            self.事件处理器.处理流同步
        )
        GPU 追踪.注册事件同步的回调函数(
            self.事件处理器.处理事件同步
        )

    def __torch_dispatch__(self, 函数, 类型, 参数=(), kwargs=):
        如果 kwargs  :
            kwargs = {}

        是否为工厂 = bool(工厂函数正则表达式.匹配(函数._模式.名称))

        参数处理器 = 论证处理器()
        argument_handler.解析输入(函数._模式, 参数, kwargs, 是否为工厂=是工厂)

        输出 = 函数(*参数, **kwargs)

        参数处理器.解析输出(函数._模式, 输出, 是工厂=工厂)
        错误 = self.事件处理器._处理内核启动(
            火炬.cuda.current_stream().CUDA 流,
            程序参数处理.数据指针读取 - 程序参数处理.数据指针写入,
            程序处理函数.已写入的数据指针,
            程序处理函数.输出,
            函数._模式,
            程序处理函数.张量别名,
        )
        如果 错误:
            for 错误  错误:
                打印(错误, 文件=系统模块.标准错误输出)
            抛出 CUDA 安全检查器错误(错误)

        返回 输出


 CUDA Sanitizer:
    管理 CUDASanitizer 调度模式对象的生存期。

CUDASanitizer 类封装了调度模式的进入/退出函数
在启用函数/析构函数中分别使用上下文管理器。这是为了
显式设置调度模式对象的生存期与应用程序相同。
这种方法被认为比使用 atexit 模块更优雅。
"沉浸式翻译"

    def __init__(self) -> :
        self.调度 = CUDASanitizer 调度模式()
        self.启用 = 

    def 启用(self):
        self.调度.__进入__()
        self.启用 = 真实

    def 禁用(self):
        self.派遣.__退出__(, , )
        self.启用 = 

    def __del__(self):
        由于此对象的生命周期与 `torch.cuda._sanitizer` Python 模块相关联,
        它通常作为整体 `torch` 模块清理的一部分而被删除,
        在那时,根据 CPython 版本的不同,torch.* 模块可能处于
        # 已经清理好的不同状态。
        # 可能其他导入已经清理过,所以 `sys` 也可能已经消失了。
        # 如果它超过了运行时间,则跳过退出模式。
        # 如果它超过了运行时间,则跳过退出模式。
        如果 (系统  不是 )  (不是 系统模块.正在完成())  self.启用:
            self.禁用()


[文档]def 启用_cuda_sanitizer(): """启用 CUDA Sanitizer。 清理器将开始分析由 torch 函数调用的底层 CUDA 调用 用于同步错误。所有找到的数据竞争将被打印到标准输出,并附带疑似原因的堆栈跟踪。为了获得最佳结果,应在程序开始时启用该清理器。 错误输出中。为了获得最佳结果,应在程序开始时启用该清理器。 清理器应该被启用在程序开始之初。 """ cuda_sanitizer.enable()
cuda_sanitizer = CUDASanitizer()

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源