# mypy: 允许未类型化装饰器
# mypy: 允许未类型化定义
导入 contextlib
导入
复制
导入 dataclasses
导入 functools
导入
操作符
导入
类型
导入
警告
from 集合
导入
命名元组
from collections.abc 导入
迭代器
from contextlib 导入 contextmanager
from 打字
导入
任何,
可调用,
最终,
可选,
类型检查,
联合
from torch._higher_order_ops.utils 导入 autograd_not_implemented
from torch._library.fake_class_registry 导入
模拟脚本对象
from torch._子类._模拟实现
导入 (
_注销操作实现,
_是否将操作注册到模拟规则,
注册操作实现,
)
from torch._subclasses.fake_tensor 导入
模拟 Tensor 模式
from torch.fx._utils 导入
首次调用函数神经网络模块堆栈
from torch.fx 图
导入
_PyTree 代码生成器,
_PyTree 信息
from torch.fx 不可变集合
导入
不可变字典,
不可变列表
from torch.fx.passes.runtime_assert 导入
插入延迟运行时断言
如果
类型检查:
# 在类型检查期间导入以下模块以启用代码智能功能,
# 例如在 pylance 等工具中的自动补全,即使这些模块没有明确地
# imported in user code.
导入 sympy
from torch.utils._sympy.value_ranges 导入 ValueRanges
导入
火炬
导入 torch.utils._pytree
是 pytree
from torch._export.utils 导入 (
_collect_all_valid_cia_ops,
_收集并设置常量属性,
收集参数缓冲区元数据,
从 gm 检测伪造模式,
_模拟化参数缓冲区,
_获取 CIA 的分解,
_is_preservable_cia_op,
命名 hoo 子图占位符,
为临时注册的常量覆盖图签名,
_overwrite_signature_for_non_persistent_buffers,
_populate_param_buffer_metadata_to_new_gm,
_register_constants_as_buffers,
_rename_without_collisions,
保留 CIA 的特殊操作,
占位符命名通过,
)
from torch._export.验证器
导入
验证器
from torch._guards 导入
检测伪造模式
from torch._subclasses.fake_tensor 导入
临时取消 fake 设置
from torch.export._tree_utils 导入
是否等效,
重新排序参数
from torch.export.decomp_utils 导入 CustomDecompTable
from torch.fx._兼容性
导入
兼容性
from torch.fx.passes.infra.pass_base 导入 PassResult
from torch.fx.passes.infra.pass_manager 导入 PassManager
from .图签名
导入 ( # noqa: F401
参数规范,
常量参数,
自定义对象参数,
导出图签名,
输入类型,
输入规范,
输出类型,
输出规范,
符号布尔参数,
符号浮点参数,
符号整型参数,
张量参数,
令牌参数,
)
全部 = [
"已导出程序",
模块调用条目,
模块调用签名,
默认分解,
]
通行类型 =
可调用[[
火炬.fx.GraphModule
]
可选[PassResult]]
[文档]@dataclasses.dataclass
class 模块调用签名:
输入:列表[参数规范]
输出:列表[参数规范]
输入规范:pytree.TreeSpec
输出规范:pytree.TreeSpec
forward_arg_names: Optional[list[str]] = None
def replace_all_uses_with(self, original_node, new_node):
for i in self.inputs:
if i.name == original_node.name:
i.name = new_node.name
for o in self.outputs:
if o.name == original_node.name:
o.name = new_node.name
[文档]@dataclasses.dataclass
class 模块调用条目:
完整限定名: str
签名: Optional[模块调用签名] = None
def 禁用预置假模式(
函数):
@functools.包装(
函数)
def 包装器(*
参数, **kwargs):
与
临时取消 fake 设置():
返回
函数(*
参数, **kwargs)
返回
包装器
def fx 集合等价函数(
spec1 类型:
可选[
类型
]
spec1 上下文:
py 树.
上下文,
spec2 类型:
可选[
类型
]
spec2 上下文:
py 树.
上下文,
) 翻译
布尔:
将容器及其不可变变体视为同一类型。否则
正常比较。
""
如果
spec1 类型 is
无
或者
spec2 类型 is
无:
返回
spec1 类型 is
spec2 类型
和
spec1 上下文 ==
spec2 上下文
如果
派生类(
spec1 类型, (
字典,
不可变字典))
和
派生类(
spec2 类型, (
字典,
不可变字典)
):
返回
spec1 上下文 ==
spec2 上下文
如果
派生类(
spec1 类型, (
列表,
不可变列表))
和
派生类(
spec2 类型, (
列表,
不可变列表)
):
返回
spec1 上下文 ==
spec2 上下文
返回
spec1 类型 is
spec2 类型
和
spec1 上下文 ==
spec2 上下文
此列表由 DispatchKey.cpp 编译而成。
策略是使用这些键来覆盖。
导出中的 CIA 反编译。
_AUTOGRAD_ALIAS_BACKEND_KEYS_TO_OVERRIDE = [
火炬._C.
发送键.
自动微分 CPU,
火炬._C.
发送键.
自动微分 CUDA,
火炬._C.
发送键.
自动微分元,
火炬._C.
发送键.
自动微分 XLA,
火炬._C.
发送键.
自动微分懒加载,
火炬._C.
发送键.
自动微分 IPU,
火炬._C.
发送键.
自动微分 XPU,
火炬._C.
发送键.
自动微分 MPS,
火炬._C.
发送键.
自动微分 HPU,
火炬._C.
发送键.
自动微分私有用途 1,
火炬._C.
发送键.
自动微分私有用途 2,
火炬._C.
发送键.
自动微分私有用途 3,
]
此列表由 DispatchKey.cpp 编译而成。
策略是使用这些键来添加。
直接使用默认的 python 内核。
CIA 反编译。
# 注册旧 CIA 到后端内核
_覆盖后端密钥 = [
火炬._C.
发送键.CPU,
火炬._C.
发送键.CUDA,
火炬._C.
发送键.
元数据,
火炬._C.
发送键.XLA,
火炬._C.
发送键.
懒惰,
火炬._C.
发送键.IPU,
火炬._C.
发送键.XPU,
火炬._C.
发送键.MPS,
火炬._C.
发送键.HPU,
]
@contextmanager
def _覆盖复合隐式分解_(
将 CIA 操作转换为可调用,
安全=
是):
# 此函数覆盖了 CompositeImplicitAutograd 的解构
# 用户指定的函数复合操作。理想情况下,我们希望不进行解构
所有复合操作,但今天的 C++函数化仅依赖于
# 运行反编译后与 opset 协同工作的事实。
因此我们只能对功能操作这样做。有一个注意事项是
某些复合操作对其模式撒谎(声称是)
# 功能性但并不真正,即所谓的 dropout),对于这些情况,我们只是分解。
# 当 safe=False 时,我们将假设 ops_to_preserve 可以是可变的/别名引用
# 以及它们的常规分解需要被阴影而不是覆盖。
# 因此,我们将避免断言它们是有效的以保留,并且不会
将他们的 CompositeImplicitAutograd 内核替换为 NotImplemented。
目前唯一使用此模式的用户是 aten::to 的变体,我们将
在 FunctionalTensorMode.__torch_dispatch__ 中替换为 aten::_to_copy。
保存的表格 = {}
修补操作 =
设置()
为
运算符重载,
可分解的函数
在
将 CIA 操作转换为可调用.
项目():
保存的表格[
运算符重载] =
运算符重载.
Python 内核.
复制()
修补操作.
添加(
运算符重载)
为
覆盖分发键
在
_AUTOGRAD 别名后端键覆盖:
如果
覆盖分发键
不是
在
运算符重载.
Python 内核:
# TODO (tmanlaibaatar)https://github.com/pytorch/pytorch/issues/129430
运算符重载.
Python 实现(
覆盖分发键)(
自动微分未实现(
运算符重载,
延迟错误=
是)
)
# 注意:注册旧 CIA 到后端内核
# 在覆盖 py_kernels 之前缓存此内容很重要。
原 CIA 可调用 =
_获取 CIA 的分解(
运算符重载)
如果
火炬._C.
发送键.
组合隐式自动微分
在
运算符重载.
Python 内核:
删除
运算符重载.
Python 内核[
火炬._C.
发送键.
组合隐式自动微分]
如果
安全:
运算符重载.
Python 实现(
火炬._C.
发送键.
组合隐式自动微分)(
可分解的函数
)
# [注意] 直接将伪造的张量规则注册到 CIA 操作
# 我们在这里面临的问题是,如果你的 CIA 自定义规则
# 表示我们想要保留操作,我们将返回 NotImplemented。
很抱歉,您提供的文本中包含代码和专有名词,根据您的要求,这些内容将保持原文不变。以下是翻译结果:
# 不幸的是,这将触发在假张量中的元设备跟踪
导致 CIA 内核具有基于设备的分歧行为
# 分支(其中一个案例是 torch.ops.aten.scaled_dot_product.attention)
绕过这个问题,我们注册了直接的伪造实现,以便我们
在我们尝试在 FakeTensorMode 中分解操作之前,先运行内核。
注意,在大多数情况下,这是一个空操作,因为:
1) 在后分发跟踪中,CIA 已经分解了。
2) 大多数 CIA 实现与设备无关。
def 强制调度到原始 CIA 可调用函数(
模拟张量模式,
操作, *
参数, **kwargs):
原 CIA 可调用 = kwargs[
"原始可调用"]
删除 kwargs[
"原始可调用"]
与
模拟张量模式:
返回
原 CIA 可调用(*
参数, **kwargs)
如果
不是
_是否将操作注册到模拟规则(
运算符重载):
注册操作实现(
运算符重载)(
functools.偏函数(
_强制调度到原 CIA 可调用,
原始可调用=orig_cia_callable,
)
)
为 key
在 _BACKEND_KEYS_TO_OVERRIDE:
如果 key
不是
在
运算符重载.
Python 内核:
# [注意] 注册旧 CIA 到后端内核
# 我们始终将原始 CIA 行为注册到后端键内核
# 原因在于我们在进行伪造张量操作或执行真实内核时,
# 我们最终会在相应的后端调用一个算子,在 Python 调度器中,
# 这将解析为 CIA 密钥。(见 torch/_ops.py 中的 resolve_key)
# 因此,现在的 CIA 将调用自定义用户定义的
#CIA 可能导致问题。
#为了更具体,我们正在处理的案例是:
#(1)在追踪过程中,我们对一个张量常量进行了常量传播
# 在追踪期间
(2) 我们在 autograd 之下调用一个操作(无论是我们处于 autograd 之下,
或者我们在推理模式下进行跟踪,因此后端中的一个键被触发
(3)我们正在调用的操作有一个通常以急切模式运行的 CIA 实现
# (用户希望在跟踪期间调整此 CIA 实现,但在跟踪期间
我们希望原始 CIA 能够运行
运算符重载.
Python 实现(
键)(orig_cia_callable)
尝试:
产生
最后:
为
操作符
在
修补操作:
操作.
Python 内核.
清晰()
操作.
Python 内核.
更新(
保存的表格[
操作])
操作._dispatch_cache.
清晰()
_注销操作实现(
操作)
@contextmanager
def _覆盖_aten_to_variants 的解压缩():
# 保留_aten::to 的变体,理解它们是会修改/别名
# 并且它们的 CompositeImplicitAutograd 内核不会变成 NotImplemented。
# 我们将在函数化时用 aten._to_copy 替换它们。
与
_覆盖复合隐式分解_(
{
火炬.
操作.aten.
到.
数据类型布局:
保留 CIA 的特殊操作,
火炬.
操作.aten.
到.
数据类型:
保留 CIA 的特殊操作,
},
安全=
错误,
):
产生
def _将反编译表分割为 cia 和 python 反编译(
解构表:
字典[
火炬.
操作符.
运算符基类,
可调用]
) 翻译
元组[
字典[
火炬.
操作符.
运算符基类,
可调用
] ...]:
所有可保留的 CIA 操作 =
设置(_collect_all_valid_cia_ops())
可调用的 CIA 操作 = {}
为
操作符
在
列表(
解构表.
键()):
# TODO 我们正在通过一个裂缝静默地允许非安全(非功能)操作通过
# 由于核心 aten 解构表包含非功能条目。一旦我们有了
# 对核心 aten 解构的更严格检查,我们应该提醒用户注意。
# 跟踪问题:(https://github.com/pytorch/pytorch/issues/135759)
# 如果这是一个我们可以修改以导出的有效 CIA 操作,我们检查它是否是:
# 1. 已标记为需要解构。例如:
# decomp_table = decomp_table_to_core_aten()
# del decomp_table[aten.linear]
# 在这种情况下,用户表示除了 aten.linear 之外的所有内容都要分解
# 2. 已标记为自定义分解行为。例如:
# decomp_table = {aten.linear: some_op}
# 对于(1),我们希望移除所有未由用户处理的 CIA 操作,因为
# 它表明它们可以分解,所以我们应该从可保留列表中移除。
# 对于(2),我们只需将自定义分解连接到 AOTDispatcher。
# 在这两种情况下,我们希望从 decomp_table 中删除这个 CIA 操作,因为它很特殊
# 已处理。
如果
操作符
在
所有可保存的 CIA 操作:
将 CIA 操作转换为可调用[
操作] =
解构表[
操作]
所有可保存的 CIA 操作.
删除(
操作)
删除
解构表[
操作]
如果是一个自定义操作,我们仍然想要保留或对其进行操作
如果它是一个功能性的 CIA。我们不从 CIA 列表中删除的原因是因为我们不查询自定义操作。
# 从 CIA 列表中删除的原因是因为我们不查询自定义操作。
如果...否则 _is_preservable_cia_op(
操作):
操作名称 =
操作.
名称()
断言
不是
操作符名称.
以...开头(
aten),
"这是一个自定义操作"
将 CIA 操作转换为可调用[
操作] =
解构表[
操作]
# 如果到达这里,意味着用户故意删除了这些 CIA 操作
解构表
为 k
在
所有可保存的 CIA 操作:
将 CIA 操作转换为可调用[k] =
_特殊操作以保留 CIA
返回
将 CIA 操作转换为可调用,
解构表
[文档]def 默认解构() -> "自定义解构表":
"""
这是默认分解表,其中包含分解
所有 ATEN 算子到核心 aten 算子集。使用此 API 与
:func:`run_decompositions()`
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
返回自定义解压表()
def 使用新签名常量分解并获取 gm(
ep,
*,
cia_to_decomp: 字典[
火炬.
操作符.
运算符基类,
可调用
]
python 反编译表:
字典[
火炬.
操作符.
运算符基类,
可调用
]
联合损失指数:
可选[int
]
自定义 Triton 运算符分解,
):
from torch._export.passes.提升常量_pass
导入
_物化并提升常量
from torch._functorch.自动微分
导入
aot 导出模块
from torch.export._trace 导入 (
_disable_custom_triton_op_functional_decomposition,
_export_to_aten_ir,
_ignore_backend_decomps,
验证神经网络模块堆栈,
验证占位符名称,
验证堆栈跟踪,
)
from torch.fx.experimental.symbolic_shapes 导入
形状环境
def 联合 IR 分解(ep,
联合损失指数):
返回 (
联合损失指数 is
不是
无
或者 ep.
图签名.
向后签名 is
不是
无
)
如果
不是
联合 IR 分解(ep,
联合损失指数):
修饰 = ep.
模块()
包装的参数和缓冲区 = {
**字典(
模块.
命名参数。(
删除重复项=
错误)),
**字典(
模块.
命名缓冲区(
删除重复项=
错误)),
}
from torch._functorch._aot_autograd.subclass_parametrization 导入 (
解包张量子类参数。,
)
# [注意] 解包子类 AOT
# 在 torch.compile 中,子类解包/包装发生在运行时
# 但在导出时不可能,因为它打算在
C++环境。因此,我们在 AOT 阶段解包子类参数。
导出的 Program 的 state_dict 与急切模型不会相同,因为急切模型
可能包含子类权重,而 ExportedProgram 将包含简化版本。
这没关系,因为 run_decompositions 应该针对 post-autograd 进行特殊化。
子类去糖化应发生的图。
解包张量子类参数。(
模块)
解包的参数缓冲区。 = {
**字典(
模块.
命名参数。(
删除重复项=
错误)),
**字典(
模块.
命名缓冲区(
删除重复项=
错误)),
}
# TODO T204030333
模拟模式 =
从 gm 检测伪造模式(ep.
图模块)
如果
模拟模式 is
无:
模拟模式 =
假 Tensor 模式(
形状环境=ShapeEnv(),
导出=
是)
将图输出签名修复为元组,如果为标量
输出规范 =
模块._out_spec
orig_arg_names = 模块.graph.
代码生成器.
pytree 信息.
原始参数
# aot_export 预期返回类型始终为元组。
如果
输出规范.
类型
不是
在 (
列表,
元组):
输出规范 =
py 树.
树规范(
元组,
无, [
输出规范])
模块.graph.
代码生成器 =
_PyTree 代码生成器(
_PyTreeInfo(
原始参数名称,
模块._in_spec,
输出规范,
)
)
模块.
重新编译()
# 导出的模块将存储常量和非持久缓冲区,
# 将它们视为持久缓冲区,因此我们通知常量提升传递
# 并使用之前的程序覆盖新的图签名。
_收集并设置常量属性(ep.
图签名, ep.
常量,
模块)
# 当我们有一个具有常量属性的模块时,AotDispatcher 实际上
# 不会将它们包装为功能张量,因为 dynamo 已经将其转换为缓冲区。
然而,在非严格模式下,AotDispatcher 可以拦截常量,导致它无法
将常数张量上操作的运算符函数化。由于 dynamo 已经
将常量包装为缓冲区,我们暂时将常量注册为缓冲区,然后撤销此操作
# AOTDispatcher 完成后进行的操作。
临时已注册常量 = _register_constants_as_buffers(
模块, ep.
状态字典, ep.
图签名.
非持久缓冲区
)
获取排除常量后的参数和缓冲区
模拟参数缓冲区 =
_模拟化参数缓冲区(
模拟模式,
模块)
将参数缓冲区转换为节点元数据 =
收集参数缓冲区元数据(
模块)
# TODO (tmanlaibaatar) 理想情况下,run_decomp 应该只调用 _non_strict_export
# 但由于对常量的特殊处理,作为非持久性缓冲区使其变得有些困难
# 但我们应该统一这个代码路径。T206837815
from torch._export.non_strict_utils 导入 (
_启用类型为 nn_module 的图输入,
模拟脚本对象,
)
retracing_args = 输入文本为空,请提供需要翻译的文本
为
节点
在
模块.graph.
节点:
如果
节点.
操作符 ==
占位符:
如果 isinstance(
节点.
元数据[
值
]
自定义对象参数):
真实脚本对象 =
无
如果
节点.
元数据[
值].
模拟值 is
无:
真实脚本对象 = ep.
常量[
节点.
元数据[
值].
名称]
else:
真实脚本对象 =
节点.
元数据[
值].
假值.
真实对象
追踪参数.append(
真实脚本对象)
else:
追踪参数.append(
节点.
元数据[
值])
与 (
模拟模式
), _覆盖_aten_to_variants 的解压缩(),
_覆盖复合隐式分解_(
cia_to_decomp,
), _启用类型为 nn_module 的图输入(
ep.示例输入
):
追踪参数未展开 =
py 树.
树形展开(
追踪参数,
模块._in_spec
)
# 这需要空的 kwargs,但不是在 pytree.flattened 格式中
与
模拟脚本对象(
模块,
(
*追踪未展开的参数[0
]
*追踪未展开的参数[1].
值(),
),
{},
模拟模式,
) 是 (
修补模块,
新的伪造参数,
新的伪造关键字参数,
新的伪造常量属性,
将假映射到真,
):
aten 导出工件 = _export_to_aten_ir(
修补模块,
新的伪造参数,
新的伪造关键字参数,
伪造的参数缓冲区,
新的伪造常量属性,
解构表=
python 反编译表,
_检查自动微分状态=
错误,
_美化占位符名称=
错误,
自定义 Triton 运算符分解=
自定义 Triton 运算符分解,
)
aten_export_artifact.constants 仅包含伪造的脚本对象,我们需要将它们映射回
aten 导出工件.
常量 = {
完全限定名: (
将假映射到真[
对象]
如果 isinstance(
对象,
假脚本对象)
否则
对象
)
为
完全限定名,
对象
在
aten 导出工件.
常量.
项目()
}
gm = aten 导出工件.gm
新图签名 =
aten 导出工件.sig
在上一步,我们假设常量作为 AOTDispatcher 的缓冲区
正确地函数化,所以在这里撤销
新图签名 = (
为临时注册的常量覆盖图签名(
新图签名,
临时注册的常量
)
)
_populate_param_buffer_metadata_to_new_gm(
将参数缓冲区到节点元数据, gm,
新图签名
)
# 覆盖非持久性缓冲区的签名
新图签名 = _overwrite_signature_for_non_persistent_buffers(
ep.图签名,
新图签名
)
常量 =
材质化并提升常量(
gm, 新图签名,
新假常量属性
)
占位符命名通过(
gm,
新图签名,
修补模块,
新的伪造参数,
新的伪造关键字参数,
伪造的参数缓冲区,
常量,
)
验证神经网络模块堆栈(gm)
验证堆栈跟踪(gm)
验证占位符名称(gm,
新图签名)
gm, 新图签名 =
移除不必要的复制操作节点(
gm, 新图签名
)
当我们应用参数化规则以展开
子类时,状态字典将会有所不同
我们需要手动过滤这些去糖化参数。理想情况下,我们应该直接返回
并更新 ep.state_dict。
# ep.module 的状态字典,但 ep.module 只存储参数
# 参与前向传播的缓冲区。如果我们取消这种行为,
# 将会破坏一些下游用户。
为
名称, p
在
未包装的参数缓冲区.
项目():
如果
名称
不
在
包装的参数缓冲区:
ep.状态字典[
名称] = p
为
名称, p
在
包装的参数缓冲区.
项目():
缓冲区可以是持久的/非持久的
如果
名称
不
在 ep.
状态字典:
断言
不 isinstance(p,
火炬.
神经网络.
参数)
如果
名称
在 ep.
状态字典:
如果
名称
不
在
未包装的参数缓冲区:
ep.状态字典.
流行(
名称)
返回 gm,
新图签名, ep.
状态字典
旧占位符 = [
节点
为
节点
在 ep.
图模块.graph.
节点
如果
节点.
操作符 ==
占位符
]
模拟参数 = [
节点.
元数据[
值]
为
节点
在
旧占位符]
要删除的缓冲区 = [
名称
为
名称, _
在 ep.
图模块.
命名缓冲区()]
为
名称
在
要删除的缓冲区:
delattr(ep.图模块,
名称)
# TODO(zhxhchen17) 直接返回新的图签名。
模拟模式 =
检测伪造模式(
模拟参数)
模拟模式 = contextlib.
无上下文()
如果
模拟模式 is
无
否则
模拟模式
自定义 Triton 操作分解上下文 = (
contextlib.空上下文
如果
分解自定义 Triton 操作
否则
禁用自定义 Triton 操作的功能分解
)
与 _ignore_backend_decomps(),
模拟模式,
_覆盖复合隐式分解_(
cia_to_decomp
), custom_triton_ops_decomposition_ctx():
gm, graph_signature = aot_export_module(
ep.图模块,
模拟参数,
分解=
python 反编译表,
跟踪关节=
真实
如果
联合损失指数 is
不是
无
否则
错误,
输出损失索引=(
联合损失指数
如果
联合损失指数 is
不是
无
否则
无
),
)
gm.graph.删除死代码()
# 更新签名,以适应调用 aot_export 时可能发生的变化的新占位符名称
# 当调用 aot_export 时,如果签名已更改,请更新签名
def 更新参数(old_arg,
新的_ph):
如果 isinstance(old_arg,
常量参数):
返回
旧参数
如果...否则 isinstance(old_arg,
张量参数):
返回
张量参数(
名称=
新的_ph.
名称)
如果...否则 isinstance(old_arg,
符号整型参数):
返回
符号整型参数(
名称=
新的_ph.
名称)
如果...否则 isinstance(old_arg,
符号浮点参数):
返回
符号浮点参数(
名称=
新的_ph.
名称)
如果...否则 isinstance(old_arg,
符号布尔参数):
返回
符号布尔参数(
名称=
新的_ph.
名称)
提升
运行时错误(f
"不支持的 old_arg 类型:"{
类型(old_arg)}")
新占位符 = [
节点
为
节点
在 gm.graph.
节点
如果
节点.
操作符 ==
占位符]
新输出 =
列表(gm.graph.
节点
)-1].
参数[0]
将占位符重命名
断言
长度(
新占位符) ==
长度(
旧占位符)
为
旧的_ph,
新短语
在 zip(
旧占位符,
新占位符):
新的_ph.
名称 =
新的_ph.
目标 =
旧的_ph.
名称
# 处理新分解的图节点名称冲突
名称映射 = {ph.
名称: ph.
名称
为 ph
在
新占位符}
为
节点
在 gm.graph.
节点:
如果
节点.
操作符 ==
占位符:
continue
节点.
名称 = _rename_without_collisions(
名称映射,
节点.
名称,
节点.
名称)
# 将名称传播到更高阶的子图操作
命名 hoo 子图占位符(gm)
在创建输入/输出规范之前运行此步骤,因为与大小相关的 CSE/DCE 可能会影响输出签名。
之后覆盖输出规范。
from torch._export.passes._node_metadata_hook 导入 (
_节点元数据钩子,
_设置节点元数据钩子,
)
from torch._functorch._aot_autograd 输入输出分析
导入 _graph_output_names
如果
不是
火炬._dynamo.
配置.
不发出运行时断言:
堆栈跟踪 = (
'文件 "torch/fx/passes/runtime_assert.py",第 24 行,'
"在 insert_deferred_runtime_asserts 中"
)
形状环境 = _get_shape_env(gm)
如果
形状环境 is
不是
无:
与
_设置节点元数据钩子(
gm, functools.偏函数(
_节点元数据钩子,
堆栈跟踪=
堆栈跟踪)
):
插入延迟运行断言(
gm,
形状环境,
f导出程序{first_call_function_nn_module_stack(gm.graph)}",
导出=
是,
)
# 更新输出规范
gm.重新编译()
为 i,
名称
在
列举(_graph_output_names(gm)):
如果 isinstance(
新输出[i
]
火炬.fx.
节点):
新输出[i].
名称 =
名称
# 与输出目标匹配正确的输入以进行输入变异
# 需要找到旧到新的占位符映射
旧新占位符映射 = {
规格.arg.
名称:
新占位符[i].
名称
为 i,
规范
在
列举(ep.
图签名.
输入规范)
如果
不是 isinstance(
规格.arg,
常量参数)
}
输入规范 = [
输入规范(
规格.
类型,
更新参数(
规格.arg,
新占位符[i
)]
规格.
目标,
规格.
持久性,
)
为 i,
规范
在
列举(ep.
图签名.
输入规范)
]
输出规范 =
输入文本为空,请提供需要翻译的文本
处理缓冲区与输入突变;这些出现在损失输出和梯度之前
(1)ep.graph_signature.input_specs 告诉我们输入类型
(2)graph_signature.user_inputs 告诉我们节点输入名称的顺序
(3)graph_signature.user_inputs_to_mutate 告诉我们缓冲区和输入突变
# 地图 (3) -> (2) 为输入顺序,-> (1) 为输入类型
用户输入索引 = {
名称: i
为 i,
名称
在
列举(
图签名.
用户输入)}
突变名称 =
列表(
图签名.
要修改的用户输入.
键())
断言
突变名称 == [
节点.
名称
为
节点
在
新输出
[
长度(
突变名称
))
为
输出名称,
输入名称
在
图签名.
要修改的用户输入.
项目():
i = 用户输入索引[
输入名称]
输入规范 = ep.
图签名.
输入规范[i]
断言
输入规范.
仁慈
在 (
输入类型.
用户输入,
输入类型.
缓冲区)
输出类型 = (
输出类型.
缓冲区突变
如果
输入规范.
仁慈 ==
输入类型.
缓冲区
否则
输出类型.
用户输入突变
)
目标 = (
输入规范.
目标
如果
输入规范.
仁慈 ==
输入类型.
缓冲区
否则
输入规范.arg.
名称
)
输出规范.append(
输出规范(
类型=
输出类型,
arg=张量参数(
名称=
输出名称),
目标=
目标,
)
)
# 处理实际用户输出
为 i,
规范
在
列举(ep.
图签名.
输出规范):
输出规范.append(
输出规范(
输出类型.
损失输出
如果 i ==
联合损失指数
否则
规格.
类型,
更新参数(
规格.arg,
新输出[
长度(
突变名称) + i
)]
旧新占位符映射.
获取(
规格.
目标,
规格.
目标),
)
)
如果
联合损失指数 is
不
无:
断言
图签名.
向后签名 is
不
无
梯度 =
图签名.
反向签名.
梯度到用户输入
断言
长度(
图签名.
用户输入) ==
长度(ep.
图签名.
输入规范)
规范 = {
图签名.
用户输入[i]:
规范
为 i,
规范
在
列举(ep.
图签名.
输入规范)
如果 isinstance(
规格.arg,
张量参数)
}
为 i,
节点
在
列举(
新输出[
长度(
输出规范)
(]:)
源 =
渐变[
节点.
名称]
规范 =
规范[
源]
忽略索引
如果
规格.
仁慈 ==
输入类型.
参数:
仁慈 =
输出类型.
梯度到参数
目标 =
规格.
目标
如果...否则
规格.
仁慈 ==
输入类型.
用户输入:
仁慈 =
输出类型.
梯度到用户输入
目标 =
源
else:
提升
断言错误(f
未知输入类型:{
规格.
类型}")
输出规范.append(
输出规范(
类型,
张量参数(
名称=
节点.
名称),
目标,
)
)
断言
长度(
新占位符) ==
长度(
旧占位符)
新图签名 =
导出图签名(
输入规范=
输入规范,
输出规范=
输出规范
)
# 注意:aot_export 为整数占位符添加 symint 元数据
# values;因为这些变得专业化,我们用这样的元数据替换
原始值。
同时,将 param/buffer 元数据恢复到占位符。
为
旧节点,
新节点
在 zip(
旧占位符,
新占位符):
如果
不 isinstance(
旧节点.
元数据[
值
]
火炬.
张量):
新节点.
元数据[
值] =
旧节点.
元数据[
值]
如果 (
新节点.
目标
在
新图签名.
输入到参数
或者
新节点.
目标
在
新图签名.
输入到缓冲区
):
为 k, v
在
旧节点.
元数据.
项目():
新节点.
元数据[k] = v
返回 gm,
新图签名, ep.
状态字典
def 移除不必要的复制操作节点(
gm: 火炬.fx.GraphModule,
新图签名:
导出图签名
) 翻译
元组[
火炬.fx.GraphModule,
导出图签名]:
""
移除由于缓冲区突变而引入的冗余 copy_节点。
"文档"
与 gm._set_replace_hook(
新图签名.get_replace_hook()):
为
节点
在 gm.graph.
节点:
如果
节点.
操作符 ==
输出:
参数, _ =
py 树.
树形展平(
节点.
参数)
为 out
在
参数:
如果 (
isinstance(输出,
火炬.fx.
节点)
和
输出.
名称
在
新图签名.
待变更的缓冲区
):
如果 (
输出.
操作符 ==
调用函数
和
输出.
目标 ==
火炬.
操作.aten.
复制.
默认
):
输出.
替换所有引用(
输出.
参数[1]) # type: ignore[arg-type]
gm.graph.删除节点(
输出)
gm.重新编译()
返回 gm,
新图签名
def _通用 getitem 消除 Pass(
gm: 火炬.fx.GraphModule,
图签名,
模块调用图
):
与 gm._set_replace_hook(
图签名.get_replace_hook()):
为
模块
在 gm.
模块():
如果
不 isinstance(
模块,
火炬.fx.GraphModule):
continue
node_id: 字典[
火炬.fx.
节点,
字符串] = {}
获取项目:
字典[
字符串,
火炬.fx.
节点] = {}
为
节点
在
列表(
模块.graph.
节点):
如果
节点.
操作符 ==
调用函数
和
节点.
目标 ==
操作符.
获取项:
源,
索引 =
节点.args
新 ID = f"{node_id[
源]}.{
索引}"
如果
新 ID
在
获取项目:
节点.
替换所有引用(
获取项目[
新 ID])
为
条目
在
模块调用图:
如果
条目.
签名 is
不
无:
条目.
签名.
替换所有引用(
节点,
获取项目[
新 ID]
)
模块.graph.
删除节点(
节点)
else:
获取项目[
新 ID] =
节点
node_id[节点] =
新 ID
else:
node_id[节点] =
节点.
名称
def 获取更新后的模块调用图(
gm: 火炬.fx.GraphModule,
旧模块调用图:
列表[
模块调用条目
]
):
新模块调用图 =
复制.
深拷贝(
旧模块调用图)
使用节点级来源元数据创建映射
从旧节点名称到新节点名称
源:
字典[
字符串,
字符串] = {}
为
节点
在 gm.graph.
节点:
如果
历史 :=
节点.
元数据.
获取(
from_node, []):
源[
历史[-1].
名称] =
节点.
名称
# 在模块调用签名中将旧名称映射到新名称
为
条目
在
新模块调用图:
签名 =
条目.
签名
如果
签名 is
无:
continue
为 x
在 [*
签名.
输入, *
签名.
输出
]
x.名称 =
源.
获取(x.
名称, x.
名称)
返回
新模块调用图
def _分解导出的程序(
ep,
*,
cia_to_decomp: 字典[
火炬.
操作符.
运算符基类,
可调用
]
python 反编译表:
字典[
火炬.
操作符.
运算符基类,
可调用
]
联合损失指数:
可选[int
]
自定义 Triton 运算符分解:
布尔,
):
(
gm,
新图签名,
状态字典,
) = 使用新签名常量分解并获取 gm(
ep,
cia_to_decomp=cia_to_decomp,
python 反编译表=
python 反编译表,
联合损失指数=
联合损失指数,
自定义 Triton 运算符分解=
自定义 Triton 运算符分解,
)
ep.module_call_graph 的签名指的是输入/输出节点
原始图模块。然而,新的图模块可能
由于分解产生了新节点。因此,我们需要更新这些签名。
# 在分解导出程序的模块调用图中。
新模块调用图 =
获取更新后的模块调用图(
gm,
ep.模块调用图,
)
# TODO 很遗憾,在 aot_export 中无法很好地保留图级别元数据。所以我们手动复制。
# 因此我们手动复制它。
# (节点级别的元数据已在上方处理。)
gm.元数据.
更新(ep.
图模块.
元数据)
新的范围约束 =
_获取更新后的范围约束(
gm,
ep.范围约束,
)
导出程序 =
导出程序(
根=gm,
graph=gm.graph,
图签名=
新图签名,
状态字典=
状态字典,
范围约束=
新范围约束,
模块调用图=
新模块调用图,
示例输入=ep.
示例输入,
常量=ep.
常量,
)
返回
导出程序
[文档]
类
导出程序:
""
从 :func:`export` 导出的程序包。它包含
表示张量计算的 :class:`torch.fx.Graph`,包含
所有提升参数和缓冲区的张量值以及各种元数据。
你可以像追踪原始可调用对象一样调用一个导出的程序
使用与原始相同的调用约定:func:`export`。
要对图进行转换,请使用`.module`属性来访问
一个:class:`torch.fx.GraphModule`。然后你可以使用
`FX 转换 `_
重新编写图。之后,您只需使用 :func:`export`
再次构建正确的 ExportedProgram。
"文档"
def __init__(
self,
根:
并集[
火炬.
神经网络.
模块,
字典[
字符串,
任何]],
graph: 火炬.fx.
图,
图签名:
导出图签名,
状态字典:
字典[
字符串,
并集[
火炬.
张量,
火炬.
神经网络.
参数]],
范围约束:
dict[sympy.Symbol, 任何],
模块调用图:
列表[
模块调用条目
]
示例输入:
可选[
元组[
元组[
任何, ...
]
字典[
字符串,
任何]]] =
无,
常量:
可选[
字典[
字符串,
并集[
火炬.
张量,
假脚本对象,
火炬._C.
脚本对象]]
] = 无,
*,
验证器:
可选[
列表[
类型[
验证器]]] =
无,
):
# 从图中移除与代码生成相关的内容。它应该只是一个扁平的图。
graph.代码生成器 =
火炬.fx.graph.CodeGen()
self._图模块 =
为导出创建图模块(
根, graph)
如果 isinstance(
根,
火炬.fx.GraphModule):
self._图模块.
元数据.
更新(
根.
元数据)
_通用 getitem 消除 Pass(
self._图模块,
图签名,
模块调用图
)
self.图签名:
导出图签名 = graph_signature
self._state_dict: 字典[
字符串,
任何] =
状态字典
self.范围约束:
字典[sympy.
符号, ValueRanges] =
范围约束
断言
模块调用图 is
不
无
self.__模块调用图:
列表[
模块调用条目] =
模块调用图
self._示例输入 =
示例输入
self._常量 =
常量
或者 {}
验证器们 =
验证器们
或者 [
验证器]
断言
所有(
派生类(v,
验证器)
为 v
在
验证器)
self._验证器们 =
验证器们
构造器的最后一步应该是验证。
self.验证()
@property
@compatibility(兼容旧版本=
错误)
def 图模块(self):
返回 self.
_图模块
@graph_module.setter
@compatibility(兼容旧版本=
错误)
def 图模块(self,
值):
提升
运行时错误(
无法设置 ExportedProgram 的 graph_module 属性。)
@property
@compatibility(兼容旧版本=
错误)
def graph(self):
返回 self.
图模块.
图
@graph.setter
@compatibility(兼容旧版本=
错误)
def graph(self, 值):
提升
运行时错误(
"无法设置 ExportedProgram 的图属性。")
@property
@compatibility(兼容旧版本=
错误)
def 图签名(self):
返回 self.
_图签名
@graph_signature.setter
@compatibility(兼容旧版本=
错误)
def 图签名(self,
值):
提升
运行时错误(
无法设置 ExportedProgram 的 graph_signature 属性。)
@property
@compatibility(兼容旧版本=
错误)
def 状态字典(self):
返回 self.
_状态字典
@state_dict.setter
@compatibility(兼容旧版本=
错误)
def 状态字典(self,
值):
提升
运行时错误(
"无法设置 ExportedProgram 的状态字典属性。")
[文档] @兼容性(向后兼容性=False)
def parameters(self) -> 迭代器[torch.nn.Parameter]:
"""
返回原始模块参数的迭代器。
"""
for _, param in self.named_parameters():
yield param
[文档] @兼容性(向后兼容=False)
def named_parameters(self) -> 迭代器[tuple[str, torch.nn.Parameter]]:
"""
返回原始模块参数的迭代器,生成
参数的名称以及参数本身。
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
for param_name in self.graph_signature.parameters: # 对于 self.graph_signature 中的每个参数 param_name:
产出 param_name, self.state_dict[param_name]
[文档] @兼容性(向后兼容=False)
def buffers(self) -> 迭代器[torch.Tensor]:
"""
返回原始模块缓冲区的迭代器。
"""
遍历 self.named_buffers()中的每个 buf
生成 buf
[文档] @兼容性(向后兼容=False)
def named_buffers(self) -> 迭代器[tuple[str, torch.Tensor]]:
"""
返回原始模块缓冲区的迭代器,生成
缓冲区的名称以及缓冲区本身。
"""
non_persistent_buffers = set(self.graph_signature.non_persistent_buffers)
for buffer_name in self.graph_signature.buffers:
如果 buffer_name 在 non_persistent_buffers 中:
yield buffer_name, self.constants[buffer_name]
如果不是:
yield buffer_name, self.state_dict[buffer_name]
@property
@compatibility(兼容旧版本=
错误)
def 范围约束(self):
返回 self.
_范围约束
@range_constraints.setter
@compatibility(兼容旧版本=
错误)
def 范围约束(self,
值):
提升
运行时错误(
"无法设置 ExportedProgram 的 range_constraints 属性。"
)
@property
@compatibility(兼容旧版本=
错误)
def 模块调用图(self):
返回 self.
模块调用图
@module_call_graph.setter
@compatibility(兼容旧版本=
错误)
def 模块调用图(self,
值):
提升
运行时错误(
无法设置 ExportedProgram 的 module_call_graph 属性。
)
@property
@compatibility(兼容旧版本=
错误)
def 示例输入(self):
返回 self.
_示例输入
@example_inputs.setter
@compatibility(兼容旧版本=
错误)
def 示例输入(self,
值):
# 这也是允许的
如果
值 is
无:
self._示例输入 =
值
返回
如果
不 (
isinstance(值,
元组)
和
长度(
值) == 2
和 isinstance(
值[0
]
元组)
和 isinstance(
值[1
]
字典)
):
提升 ValueError(
"示例输入应该是包含示例参数的元组(如“”
"一个元组),以及示例关键字参数(如一个字典)。"
)
参数, kwargs =
值
from ._unlift 导入
检查输入是否匹配
检查输入是否匹配(
参数, kwargs, self.
调用规范.
在规范中)
self._示例输入 =
值
@property
@compatibility(兼容旧版本=
错误)
def 调用规范(self):
调用规范 =
命名元组(
"调用规范", [
in_spec,
out_spec])
如果
长度(self.
模块调用图) == 0:
返回 CallSpec(
在规范中=
无,
输出规范=
无)
断言 self.
模块调用图[0].
完全限定名 ==
请提供需要翻译的文本
返回 CallSpec(
在规范中=self.
模块调用图[0].
签名.
在规范中,
输出规范=self.
模块调用图[0].
签名.
输出规范,
)
@call_spec.setter
@compatibility(兼容旧版本=
错误)
def 调用规范(self,
值):
提升
运行时错误(
无法设置 ExportedProgram 的 call_spec 属性。)
@property
@compatibility(兼容旧版本=
错误)
def 验证器(self)
翻译
任何:
返回 self.
验证者[0]
@verifier.设置器
@compatibility(兼容旧版本=
错误)
def 验证器(self,
值):
提升
运行时错误(
"无法设置 ExportedProgram 的验证器属性。")
@property
@compatibility(兼容旧版本=
错误)
def 方言(self)
翻译
字符串:
断言 self.
_验证器们 is
不
无
返回 self.
验证者[0].
方言
@dialect.setter
@compatibility(兼容旧版本=
错误)
def 方言(self,
值):
提升
运行时错误(
"无法设置 ExportedProgram 的方言属性。")
@property
@compatibility(兼容旧版本=
错误)
def 验证器(self):
返回 self.
_验证器们
@verifiers.setter
@compatibility(兼容旧版本=
错误)
def 验证器(self,
值):
提升
运行时错误(
"无法设置 ExportedProgram 的验证器属性。")
@property
@compatibility(兼容旧版本=
错误)
def 张量常量(self):
返回 self.
_常量
@tensor_constants.setter
@compatibility(兼容旧版本=
错误)
def 张量常量(self,
值):
提升
运行时错误(
"无法设置 ExportedProgram 的 tensor_constants 属性。"
)
@property
@compatibility(兼容旧版本=
错误)
def 常量(self):
返回 self.
_常量
@constants.setter
@compatibility(兼容旧版本=
错误)
def 常量(self,
值):
提升
运行时错误(
"无法设置 ExportedProgram 的 constants 属性。")
def _get_flat_args_with_check(self, 参数, kwargs):
"使用 pytree 展开 args 和 kwargs,然后检查规格。"
参数:
args: Any 类型的 List,原始传递给__call__的参数
kwargs: Dict[str, Any] 原始传递给 __call 的 kwargs
返回:
A tuple of (flat_args, received_spec)
flat_args 是展开的参数 / kwargs
received_spec 是在展开过程中产生的 pytree 规范
元组(args, kwargs)
"文档"
在规范中 = self.
调用规范.
在规范中
如果
在规范中 is
不
无:
kwargs = 重新排序参数(kwargs,
在规范中)
带路径的平铺参数,
收到的规范 =
py 树.
带路径的树扁平化(
(参数, kwargs)
)
self._检查输入约束(
带路径的平铺参数)
平铺参数 =
元组(x[1]
为 x
在
带路径的平铺参数)
返回
平铺参数,
收到的规范
def _graph_module_flat_inputs(self, 参数:
任何, kwargs:
任何)
翻译
任何:
将 __call__ 的 args 和 kwargs 转换为 graph_module 的 args。
self.graph_module 从状态字典中获取输入。
不变量是针对 ep:ExportedProgram 的。
ep(args, kwargs) ==
ep.postprocess(ep.graph_module(ep.graph_module_flat_inputs(args, kwargs)))
"文档"
在规范中 =
我.
调用规范.
在规范中
平铺参数,
收到的规范 =
我._get_flat_args_with_check(
参数, kwargs)
如果
在规范中 is
不
无
和
不
是否等效(
received_spec, 在规范中,
_fx_collection_equivalence_fn(根据上下文可能翻译为“FX 集合等价函数”)
):
提升 ValueError(
尝试使用导出的输入树规范来扁平化用户输入:
输入文本翻译为简体中文为:
\n"
f"{在规范中}
输入文本翻译为简体中文为:\n"
但实际上得到了以下树规范的输入:
输入文本翻译为简体中文为:
\n"
f"{received_spec}"
)
额外输入 =
输入文本为空,请提供需要翻译的文本
为
输入_
在
我.
图签名.
输入规范:
如果
输入_.
仁慈 ==
输入类型.
用户输入:
continue
如果...否则
输入_.
仁慈
在 (
输入类型.
参数,
输入类型.
缓冲区,
):
如果
输入_.
持久 is
错误:
# 这是一个非持久缓冲区,从我们这里获取它
常量而非状态字典
输入补充.append(self.
常量[
输入_.
目标])
else:
输入补充.append(self.
状态字典[
输入_.
目标])
如果...否则
输入_.
仁慈
在 (
输入类型.
常量张量,
输入类型.
自定义对象,
):
输入补充.append(self.
常量[
输入_.
目标])
额外输入 =
元组(
输入补充)
# 第一个参数,然后是缓冲区,最后是用户提供的参数
# 参考:torch/_functorch/aot_autograd.py#L1034
返回
额外输入 +
平铺参数
def __调用__(self, *
参数:
任何, **kwargs:
任何)
翻译
任何:
提升
运行时错误(
"无法直接调用 ExportedProgram。"
"您应该使用`exported_program.module()`代替。"
)
def _postprocess_graph_module_outputs(self, 资源, orig_args,
原始参数):
处理输入的潜在突变。
因为 self.graph_module 是功能性的,所以突变必须在执行 graph_module 后写回。
因为 self.graph_module 是功能性的,所以突变必须在执行 graph_module 后写回。
"文档"
导入
torch._export 错误
是
错误
平铺参数, _ = self._get_flat_args_with_check(orig_args,
原始参数)
如果 self.
调用规范.
输出规范 is
不
无:
缓冲区突变 = self.
图签名.
待变更的缓冲区
用户输入变异 = self.
图签名.
要修改的用户输入
变异数量 =
长度(
缓冲区变异) +
长度(
用户输入变异)
变异值 =
资源
[
突变数量]
# 从最终结果中排除依赖令牌。
断言依赖标记 = self.
图签名.
断言依赖标记
如果
断言依赖标记 is
不
无:
断言依赖标记索引 =
下一(
迭代(
断言依赖标记.
键()))
res = 资源
[
断言依赖令牌索引]
res = 资源[
突变数量
]
尝试:
res = py 树.
树形展开(
资源,
我.
调用规范.
输出规范)
除了
异常:
_, 收到的规范 =
py 树.
树形展平(
资源)
提升
错误.
内部错误( # noqa: B904
尝试使用导出的输出树规范来扁平化用户输出:
输入文本翻译为简体中文为:
\n"
f"{我.
调用规范.
输出规范}
输入文本翻译为简体中文为:\n"
"但实际上得到了树形规格的输出:"
输入文本翻译为简体中文为:
\n"
f"{received_spec}"
)
最后:
用户输入 = [
规范
为
规范
在
我.
图签名.
输入规范
如果
规格.
仁慈 ==
输入类型.
用户输入
]
为 i,
值
在
列举(
突变值):
输出规范 =
我.
图签名.
输出规范[i]
如果
输出规范.
仁慈 ==
输出类型.
缓冲区突变:
断言
输出规范.
目标 is
不
无
我.
状态字典[
输出规范.
目标] =
值
如果...否则
输出规范.
仁慈 ==
输出类型.
用户输入突变:
断言
输出规范.
目标 is
不
无
索引 =
下一(
i
为 i,
规范
在
列举(
用户输入)
如果
规格.arg.
名称 ==
输出规范.
目标
)
平铺参数[
索引].
复制_(
值)
else:
提升
断言错误(f
"意外的类型:"{
输出规范.
类型}")
返回 res
def __str__(我)
翻译
字符串:
图模块 =
我.
图模块.
打印可读(
打印输出=
错误,
彩色=
假
).替换("
输入文本翻译为简体中文为:\n", "
输入文本翻译为简体中文为:\n ")
字符串 = (
"导出程序:"
输入文本翻译为简体中文为:\n"
f" {图模块}
输入文本翻译为简体中文为:\n"
f"图签名:"{
我.
图签名}
输入文本翻译为简体中文为:\n"
f范围约束:{
我.
范围约束}
输入文本翻译为简体中文为:\n"
)
返回
字符串
[文档] def module(self) -> torch.nn.Module:
"""
返回一个包含所有参数/缓冲区的自包含 GraphModule。
"""
从._unlift 导入_unlift_exported_program_lifted_states
module = _unlift_exported_program_lifted_states(self)
def _train(self, mode: bool = True):
raise NotImplementedError("调用 train() 尚不支持。")
def _eval(self, mode: bool = True):
raise NotImplementedError("调用 eval() 尚不支持。")
module.train = types.MethodType(_train, module) # 忽略方法赋值
module.eval = types.MethodType(_eval, module) # 忽略方法赋值
return module
def _num_lifted_params_buffers(我):
返回
下一(
(
i
为 i, s
在
列举(
我.
图签名.
输入规范)
如果 s.
仁慈 ==
输入类型.
用户输入
),
长度(
我.
图签名.
输入规范),
)
[文档] @_disable_prexisiting_fake_mode
def 运行分解(
我,
解构表:
可选[
字典[
火炬.
操作符.
运算符基类,
可调用]] =
无,
自定义 Triton 运算符分解:
布尔值 =
错误,
) 翻译
"已导出程序":
""
运行导出程序的一系列分解并返回新的
导出程序。默认情况下,我们将运行 Core ATen 分解
获取操作符
核心 ATen 运算符集 _.
目前我们不分解关节图。
参数:
decomp_table:
一个可选参数,用于指定 Aten 操作的分解行为
(1)如果为 None,我们将分解为核心 aten 分解
(2) 如果为空,则不分解任何运算符
一些示例:
如果您不想分解任何内容
.. 代码块 :: python
ep = torch.export.export(model, ...)
ep = ep.run_decompositions(decomp_table={})
如果您想获取除某些操作符之外的核心 aten 操作符集,可以执行以下操作:
.. 代码块 :: python
ep = torch.export.export(model, ...)
decomp_table = torch.export.default_decompositions()
decomp_table[your_op] = your_custom_decomp
ep = ep.run_decompositions(decomp_table=decomp_table)
"文档"
_decomp_table = (
默认分解()
如果
解构表 is
无
否则
字典(
解构表)
)
如果 isinstance(_decomp_table,
自定义分解表):
_decomp_table = _decomp_table.实现化()
# 注意 [将解构表分为 CIA 解构和非 CIA 解构]
在这一点上,我们有一个包含解构行为的 decomp_table
适用于 CIA 和后自动求导操作的
我们需要将操作分为两类:
1. CIA 操作:这些是我们想要覆盖的操作
# CompositeImplicitAutograd 分解。对于它们,我们需要使用_override_composite_implicit_decomp
上下文管理器以将其通过 AOTDispatcher 连接
# 2. 非 CIA 操作:这些操作仅在 AOTDIspter 运行后相关,因此只需
检查它们是否具有静态功能就足够了。
对于联合 IR 案例,我们需要使用旧路径,因为我们无法注册
由于无法使用上下文管理器(因为它会安装),因此以这种方式进行自定义分解
由于无法处理自动微分错误节点
(
cia_to_decomp,
python 反编译表,
) = _将反编译表分割为 cia 和 python 反编译(_decomp_table)
返回
_分解导出的程序(
我,
cia_to_decomp=cia_to_decomp,
python 反编译表=
python 反编译表,
联合损失指数=
无,
自定义 Triton 运算符分解=
自定义 Triton 运算符分解,
)
def _transform_不要使用(
我, *
通行证:
遍历类型)
翻译
"已导出程序":
pm = PassManager(列表(
通行证))
# 由于我们抽象地运行通行证,因此在此处需要禁用后端反编译
又一次。
from torch.export._trace 导入 _ignore_backend_decomps
与 _ignore_backend_decomps():
res = 项目经理(
我.
图模块)
变换后的 GM =
资源.
图模块
如果 res is
不
无
否则
我.
图模块
断言
变换后的 GM is
不
无
如果
变换后的 GM is
我.
图模块
和
不
资源.
修改的:
返回 self
# TODO(zhxchen17) 删除此内容。
def 获取更新后的图签名(
旧签名:
导出图签名,
新 GM:
火炬.fx.GraphModule,
) 翻译
导出图签名:
""
更新图签名的用户输入/输出。
"文档"
新输入规范 =
输入文本为空,请提供需要翻译的文本
为 i,
节点
在
列举(
新 GM.graph.
节点):
如果
节点.
操作符 !=
占位符:
断开
断言 i <
长度(
旧签名.
输入规范
), 变换后输入数量变化
旧输入规范 =
旧签名.
输入规范[i]
参数 = (
旧输入规范.
参数
如果 isinstance(
旧输入规范.arg, (
常量参数,
自定义对象参数)
)
否则
类型(
旧输入规范.arg)(
节点.
名称)
)
新输入规范.append(
输入规范(
旧输入规范.
类型,
arg,
旧输入规范.
目标,
旧输入规范.
持久性,
)
)
输出节点 =
列表(
新 GM.graph.
节点
)-1]
断言
输出节点.
操作符 ==
输出
新输出规格 =
输入文本为空,请提供需要翻译的文本
为 i,
节点
在
列举(
输出节点.
参数[0
)]
断言 i <
长度(
旧签名.
输出规范
), 变换后输出数量变化
旧输出规范 =
旧签名.
输出规范[i]
参数 = (
旧输出规范.
参数
如果 isinstance(
旧输出规范.arg, (
常量参数,
自定义对象参数)
)
否则
类型(
旧输出规范.arg)(
节点.
名称)
)
新输出规范.append(
输出规范(
旧输出规范.
类型, arg,
旧输出规范.
目标)
)
新签名 =
导出图签名(
输入规范=
新输入规范,
输出规范=
新输出规格
)
返回
新签名
转换后的片段 =
导出程序(
根=
转换后的 gm,
graph=转换后的 gm.graph,
图签名=
获取更新后的图签名(
我.
图签名,
变换后的 GM
),
状态字典=
我.
状态字典,
范围约束=
_获取更新后的范围约束(
转换后的 gm,
我.
范围约束,
),
模块调用图=
复制.
深拷贝(
我.
__模块调用图),
示例输入=
我.
示例输入,
常量=
我.
常量,
验证器=
我.
验证器,
)
转换后的条目.
图模块.
元数据.
更新(
我.
图模块.
元数据)
转换后的条目.
图模块.
元数据.
更新(
资源.
图模块.
元数据)
返回
转换后的片段
def _检查输入约束(
我,
带路径的平铺参数):
from torch._export.utils 导入
检查图输入约束
占位符 = [p
为 p
在
我.graph.
节点
如果 p.
操作符 ==
占位符]
输入占位符 = [
p
为 p, s
在 zip(
占位符,
我.
图签名.
输入规范)
如果 s.
仁慈 ==
输入类型.
用户输入
]
检查图输入约束(
输入占位符,
带路径的平铺参数,
我.
范围约束
)
@compatibility(兼容旧版本=
错误)
def 验证(
我):
我.
验证()
# TODO: 删除此行
@final
def 验证(
我):
断言 (
长度(
我.
验证器) > 0
), "ExportedProgram 必须至少有一个验证器。"
为 v
在
我.
验证器:
v().检查(
我)
# TODO(zhxchen17) 正式化此内容。
def 更新(
我,
图模块,
图签名,
*,
状态字典=
无,
常量=
无,
验证器=
无,
) 翻译
"已导出程序":
返回
导出程序(
根=
图模块,
graph=图模块.graph,
图签名=
图签名,
状态字典=
状态字典
如果
状态字典 is
不
无
否则
我.
状态字典,
范围约束=
复制.
深拷贝(
我.
范围约束),
模块调用图=
复制.
深拷贝(
我.
__模块调用图),
示例输入=
我.
示例输入,
常量=
常量
如果
常量 is
不
无
否则
我.
常量,
验证器=
验证器们
如果
验证器们 is
不
无
否则
我.
验证器,
)
def _get_shape_env(gm):
vals = [
节点.
元数据[
值]
为
节点
在 gm.graph.
节点
如果
节点.
元数据.
获取(
值,
无) is
不
无
]
from torch._guards 导入
检测伪造模式
模拟模式 =
检测伪造模式(vals)
如果
模拟模式 is
不
无:
返回
模拟模式.
形状环境
为 v
在 vals:
如果 isinstance(v,
火炬.SymInt):
返回 v.
节点.
形状环境
def _获取更新后的范围约束(
gm: 火炬.fx.GraphModule,
旧范围约束:
Optional[dict[sympy.Symbol, 任何]] =
无,
) 翻译
dict[sympy.Symbol, 任何]:
断言
旧范围约束 is
不
无
形状环境 = _get_shape_env(gm)
如果
形状环境 is
无:
返回 {}
范围约束 =
复制.
复制(
旧范围约束)
范围约束 = {
k: v 为 k, v
在
范围约束.
项目()
如果 k
不
在
形状环境.
替换
}
# 仅当存在未支持的 symint,并且用作构造函数输入时,
# runtime_var_to_range 与 var_to_range 相比会有所不同。
# 例如 [2, ∞) -> [0, ∞)
为 k, v
在
形状环境.
变量范围.
项目():
如果 k
不
在
形状环境.
替换
和 k
不
在
范围约束:
范围约束[k] = v
返回
范围约束
def 为导出创建图模块(
根, graph):
尝试:
gm = 火炬.fx.GraphModule(
根, graph)
除了
语法错误:
# 如果在图中使用了存储在内存中的自定义对象,
# 生成的 Python 代码将在自定义对象上产生语法错误,
# 因为它无法解析内存中的对象。
我们仍然可以通过 torch.fx.Interpreter 来 eager 地运行图。
因此我们将绕过这个错误。
警告.
警告(
"无法执行从图中生成的 Python 源代码。"
"图模块将不再可以直接调用。"
"但您仍然可以运行已导出的程序,如果需要的话,您可以 "
"使用 torch.fx.Interpreter 热切地运行图模块。"
)
gm = 火炬.fx.GraphModule(
根,
火炬.fx.
图())
gm._graph = 图
返回 gm