# mypy: 允许未类型化定义
导入 abc
导入
复制
导入
记录日志
导入
操作符
导入
正则表达式
from 集合
导入 defaultdict
from contextlib 导入 contextmanager
from 复制
导入
深拷贝
from dataclasses 导入
数据类
from 枚举
导入
枚举
from 打字
导入
任何,
可调用,
角色,
可选,
联合
导入
火炬
导入 torch.fx._pytree
是 fx_pytree
导入 torch.utils._pytree
是 pytree
from torch._library.fake_class_registry 导入
模拟脚本对象
from torch.export._tree_utils 导入
重新排序参数
from torch.export.exported_program 导入 (
恒定参数,
导出程序,
导出图签名,
输入类型,
模块调用签名,
符号布尔参数,
符号浮点参数,
符号整型参数,
张量参数,
)
from torch.fx._symbolic_trace 导入 is_fx_tracing
from torch.fx 图形模块
导入 _get_attr,
通过属性列表获取属性,
打印可读
from torch.utils._pytree 导入
获取属性键,
序列键
from 移除效果标记_pass
导入
移除效果标记
日志 =
记录.
获取日志记录器(__name__)
全部 = [
平坦参数适配器,
解释器模块,
解释器模块分发器,
非扁平化模块,
反扁平化,
]
类
属性类型(
枚举):
参数 =
参数
缓冲区 =
缓冲区
常量 =
"常数"
模块 =
"模块"
使用解释器运行 =
真实
@contextmanager
def _禁用解释器():
全局
使用解释器运行
旧旗帜 =
使用解释器运行
使用解释器运行 =
假
尝试:
产生
最后:
使用解释器运行 =
旧旗帜
# 将属性 'from_obj' 分配给 'to_module' 上的合格名称 'target'
# 如果目标下不存在,则安装空的模块
def _分配属性(
从对象:
联合[
火炬.
张量,
火炬.
脚本对象,
火炬.
神经网络.
模块
]
模块到:
火炬.
神经网络.
模块,
目标:
字符串,
属性类型: _AttrKind,
持久性:
布尔值 =
是,
):
*前缀,
字段 =
目标.
分割(
“。”)
需要生成 `to_module` 的所有子模块,这些子模块位于 `prefix` 下。
`prefix` 的变体,这些变体仅通过调用名称不同。所有这些子模块
都将分配 `from_obj` 在 `field` 上,以便它们可以共享此属性。
例如,如果目标是 foo.bar.f,foo 另有一个调用名称 foo@1,
# bar 还有其他名称,如 bar@1、bar@2,那么我们将 f 分配给
# foo.bar、foo.bar@1、foo.bar@2、foo@1.bar、foo@1.bar@1、foo@1.bar@2。
模块化 = {
模块到}
为
项目
在
前缀:
ts: 设置[
火炬.
神经网络.
模块] =
设置()
为
模块
在
模块化:
如果
不是
有属性(
模块到,
项目):
setattr(模块到,
项目,
火炬.
神经网络.
模块())
ts.更新(
调用
# 类型:忽略[杂项]
为 k,
调用
在
模块到.
模块.
项目()
如果 _is_call_name(k,
项目)
)
到模块 =
简体中文
为
到模块
在
到模块:
如果
属性类型 == _AttrKind.
参数:
断言 isinstance(
从对象,
火炬.
神经网络.
参数)
模块到.
注册参数(
字段,
从对象)
如果...否则
属性类型 ==
属性类型.
缓冲区:
断言 isinstance(
从对象,
火炬.
张量)
模块到.
注册缓冲区(
字段,
从对象,
持久性=
持久性)
如果...否则
属性类型 ==
属性类型.
常量:
断言
不是 isinstance(
从对象,
模拟脚本对象
), "FakeScriptObject 应仅在跟踪期间存在。"
断言 isinstance(
来自对象,
(
火炬.
张量,
火炬.
脚本对象,
),
)
setattr(到模块,
字段,
来自对象)
如果...否则
属性类型 ==
属性类型.
模块:
断言 isinstance(
从对象,
火炬.
神经网络.
模块)
setattr(到模块,
字段, from_obj)
类 _SubmoduleBase:
_ty: 可选[
字符串]
def type_name(self) 翻译
可选[
字符串
]
返回 self._ty
[文档]
类
解释器模块(
_子模块基类,
火炬.
神经网络.
模块):
"""一个使用 torch.fx.Interpreter 执行而不是常规的模块
GraphModule 使用的代码生成器。这提供了更好的堆栈跟踪信息
并使调试执行更容易。
""
图模块:
可选[
火炬.fx.GraphModule]
def __init__(
self,
graph: 火炬.fx.
图,
ty: 可选[
字符串] =
无,
):
超级().__init__()
self.图 =
图
self._ty = ty
self.graph.拥有模块 = self
self.使用解释器运行 =
使用解释器运行
def 前向(self, *
参数, **kwargs):
断言 self.
图模块 is
不是
无,
没有完成这个解释器模块
如果
不是 is_fx_tracing()
和 (
火炬.
编译器.
动态编译中()
或者
不是 self.
使用解释器运行
):
# Dynamo 无法通过 torch.fx.Interpreter 进行跟踪,因此回退到
# 本例中的 GraphModule 代码生成。
将代码生成的正向操作调整为使用此解释器模块运行,
因此属性访问等操作都在此模块上执行。
返回
类型(self.
图模块).
前向(self, *
参数, **kwargs)
else:
如果 kwargs:
处理 **kwargs。FX 仅原生支持位置参数(通过占位符)。因此,为了传递 kwargs,我们必须将占位符的名称与 kwarg 字典中的键相对应。
# arguments (through placeholders). So in order to pass in
# kwargs, we must correspond the names of the placeholders with
# the keys in the kwarg dict.
参数列表 =
列表(
参数)
关键字参数名称 = self.
参数名称[
长度(
参数列表)
]
参数列表.
扩展(
kwargs[关键字参数名]
为
关键字参数名
在
关键字参数名称
如果
关键字参数名
在 kwargs
)
# 断言传入的 kwargs 与 GraphModule 指定的位置参数完全匹配
# 这应该与 GraphModule 指定的位置参数一致
# guaranteed by the unflattening process.
断言
长度(
关键字名称) ==
长度(kwargs)
断言
长度(
参数列表) ==
长度(self.
参数名)
args = 元组(
参数列表)
返回
火炬.fx.
解释器(self, graph=self.graph).run(
*参数,
启用 IO 处理=
假
)
def 完成(self):
# 我们需要“最终化”,因为 GraphModule 会填充自己的 state_dict
基于图中的 get_attrs 观察结果。因此,我们需要完全
构建图并在生成 GraphModule 之前调用_sink_params。
在 GraphModule。
需要将`graph_module`直接设置在字典上,以避免它被
作为子模块注册。
self.字典[
graph_module] =
火炬.fx.GraphModule(self, self.graph)
self.graph.检查()
缓存参数名称以处理关键字参数(参见 forward())。
self.参数名 =
输入文本为空,请提供需要翻译的文本
为
节点
在 self.graph.
节点:
如果
节点.
操作符 ==
占位符:
self.参数名.append(
节点.
目标)
def 打印可读(
self,
打印输出=
是,
包含步长=
错误,
包含设备=
错误,
彩色=
错误,
):
返回
打印可读的(
self,
解释器模块,
打印输出,
包含步长,
包含设备,
彩色,
)
[文档]类 InterpreterModuleDispatcher(_SubmoduleBase, torch.nn.Module):
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
一个模块,包含一系列与解释器模块相对应的模块
该模块的一系列调用。每次调用该模块都会触发
到下一个 InterpreterModule,并在最后一个之后回绕。
"""
"""
def __init__(self, attrs: set[str], call_modules: list[InterpreterModule]):
super().__init__()
assert call_modules
self._modules = call_modules[0]._modules
for accessor in attrs:
setattr(self, accessor, getattr(call_modules[0], accessor))
self._ty = call_modules[0]._ty
self._call_modules = call_modules
self._num_calls = 0
def forward(self, *args, **kwargs):
call_module = self._call_modules[self._num_calls]
self._num_calls = (self._num_calls + 1) % len(self._call_modules)
try:
return call_module(*args, **kwargs)
except Exception:
self._num_calls = 0
raise
def call_modules(self):
return self._call_modules
def 打印可读的(
self,
print_output=True,
include_stride=False,
include_device=False,
colored=False,
):
outputs = [
mod.print_readable(
打印输出,
包含步长,
包含设备,
颜色
)
对于 mod in self._call_modules
]
return "\n".join(输出列表)
[文档]class 平坦参数适配器(abc.ABC):
"""
将 ``input_spec`` 输入参数适配以与 ``target_spec`` 对齐。
"""
[文档] @abc.abstractmethod
def adapt(
self,
target_spec: pytree.TreeSpec,
input_spec: pytree.TreeSpec,
input_args: list[Any],
metadata: Optional[dict[str, Any]] = None,
def -> list[Any]:
注意:此适配器可能会修改给定的 `input_args_with_path`。
...
类
反扁平化模块(
火炬.
神经网络.
模块):
def __init__(
self,
导出模块:
导出程序,
平铺参数适配器:
可选[
平坦参数适配器] =
无,
):
超级().__init__()
如果
导出模块.
图签名.
向后签名 is
不是
无:
提升 ValueError(
"在 JointExportModule 上未实现解平铺")
完整限定名列表 = [
条目.
完全限定名
为
条目
在
导出模块.
模块调用图]
断言
完全限定名列表[0] ==
请提供需要翻译的文本
导出图 =
深拷贝(
导出模块.graph)
self.graph_signature = 深拷贝(
导出模块.
图签名)
self.图 =
火炬.fx.
图()
self.graph.拥有模块 = self
self.模块调用图 =
深拷贝(
导出模块.
模块调用图)
self.平坦参数适配器 =
平坦参数适配器
self.元数据 =
导出模块.
图模块.
元数据
self.元数据[
未展平的模块] = self
# 标志位,表示是否已调整参数。
self.已调整 =
假
self.使用解释器运行 =
使用解释器运行
原地缓冲区突变(
导出图, self.
图签名)
self.ivals = _IVals()
# 记录任何使用的中间值 x,以及使用它的模块,
# 并生成读取相应属性的指令
已见模块,
已见属性 =
_子模块概要(
导出图, self)
对于每个读取的中间值 x,找到创建它的模块
# 生成更新相应属性的指令
最后,初始化所有这些属性
self.ival.
创建(
已见模块.
值(), self)
# 将对应于 HOPs 图参数的属性移动
# 从导出程序到未展平的子模块
_复制图属性(
导出模块.
_图模块, self,
已见属性)
self.范围约束 =
导出模块.
范围约束
self.等价约束:
列表 =
输入文本为空,请提供需要翻译的文本
# 别名/未使用参数或缓冲区问题:
# 在严格模式导出中,Dynamo 导出将去重别名张量,
# 并忽略未使用的张量。对于别名,当某些别名未使用时,
# 我们无法将占位符节点与正确的 FQN 匹配,这会导致问题。
这可能导致图签名具有错误的 FQN 目标,
并且下游问题中参数被分配到错误的目标属性,
与未展开模块中的相关占位符节点不匹配。
要解决这个问题,我们恢复(_assign_attr)所有别名/未使用的张量,
# 状态字典作为模块属性,但仅保留用于图的前向传递(_sink_params)的使用的张量。
# 导出模块
状态字典 =
导出模块.
状态字典
分配的参数:
设置[
字符串] =
设置()
跟踪未使用的参数
id_to_param: 字典[int,
火炬.
神经网络.
参数] = {}
处理权重共享
为
名称
在 self.
图签名.
参数:
此循环添加已使用的参数
参数 =
状态字典[
名称]
如果 id(
参数)
不是
在 id_to_param:
id_to_param[id(参数)] =
火炬.
神经网络.
参数(
参数.
克隆(),
需要梯度=
参数.requires_grad
)
_分配属性(
id_to_param[id(参数
)],
self,
名称,
属性类型=_AttrKind.
参数,
)
分配参数.
添加(
名称)
非持久缓冲区 =
设置(self.
图签名.
非持久缓冲区)
分配缓冲区:
设置[
字符串] =
设置()
# 跟踪未使用缓冲区
id_to_buffer: 字典[int,
元组[
火炬.
神经网络.
参数,
布尔]] = {}
为
名称
在 self.
图签名.
缓冲区:
# 这个循环添加已使用的缓冲区
如果
名称
在
非持久缓冲区:
持久 =
假
缓冲区 =
导出模块.
常量[
名称]
else:
持久 =
真实
缓冲区 =
状态字典[
名称]
如果 id(
缓冲区)
不是
在 id_to_buffer:
id_to_buffer[id(缓冲区)] = (
缓冲区.
克隆(),
持久性)
_assign_attr(
id_to_buffer[id(缓冲区)][0
]
self,
名称,
属性类型=
_属性类型.
缓冲区,
持久性=
持久性,
)
分配的缓冲区.
添加(
名称)
# 恢复已别名化/未使用的参数和缓冲区
# 这些出现在状态字典中但不在图签名中
为
名称,
张量
在
状态字典.
项目():
如果
名称
在
分配的参数
或者
名称
在
分配的缓冲区:
# 已分配
continue
是否为缓冲区 =
假
如果 id(
张量)
在 id_to_buffer
或者
不是 isinstance(
张量,
火炬.
神经网络.
参数
): # 代理缓冲区
是缓冲区 =
真实
如果
是缓冲区:
如果 (
id(张量)
不是
在
id 转缓冲区
): 这完全未使用(不共享权重)
id_to_buffer[id(张量)] = (
张量,
是,
) # 分配以尊重原始模型
_分配属性(
id_to_buffer[id(张量)][0
]
self,
名称,
属性类型=
属性类型.
缓冲区,
持久性=
是,
)
else:
如果 id(
张量)
不是
在
id 转参数:
# 未使用
id 转参数[id(
张量)] =
张量
_分配属性(
id_to_param[id(张量
)],
self,
名称,
属性类型=_AttrKind.
参数,
)
使用 id 映射,以避免重复克隆别名常量
id_to_const: 字典[int,
联合[
火炬.
张量,
火炬._C.
脚本对象]] = {}
为
完全限定名,
常量
在
导出模块.
常量.
项目():
如果 id(
恒定)
不是
在
ID 转常量:
如果 isinstance(
恒定,
火炬.
张量):
常量 =
恒定.
克隆()
ID 转常量[id(
恒定)] =
恒量
_恒量 =
id_to 恒量[id(
恒定)]
_赋值属性(
恒定,
self,
完全限定名,
属性类型=
_属性类型.
常量,
)
处理指向相同张量的参数/缓冲区
对象 ID -> (节点名称,目标名称) 列表
常量映射:
字典[int,
列表[
元组[
字符串,
字符串]]] =
默认字典(
列表)
常量目标:
设置[
字符串] =
设置()
def 添加到常量映射(
对象 ID,
节点名称,
目标名称):
名称列表 =
常量映射[obj_id]
name_list.append((node_name, target_name))
添加参数缓冲区:
设置[
字符串] =
设置()
跟踪别名/未使用参数、缓冲区
为 s
在 self.
图签名.
输入规范:
如果 s.
仁慈 ==
输入类型.
参数
或者 (
s.仁慈 ==
输入类型.
缓冲区
和 s.
持久
):
断言
有属性(s.arg,
名称)
断言 isinstance(s.
目标,
字符串)
添加到常量映射(
id(导出模块.
状态字典[s.
目标
)] s.arg.
名称, s.
目标
)
常量目标.
添加(s.
目标)
添加参数缓冲区.
添加(s.
目标)
如果...否则 (
(s.仁慈 ==
输入类型.
缓冲区
和
不是 s.
持久性)
或者 s.
仁慈 ==
输入类型.
常量张量
或者 s.
仁慈 ==
输入类型.
自定义对象
):
断言
有属性(s.arg,
名称)
断言 isinstance(s.
目标,
字符串)
添加到常量映射(
id(导出模块.
常量[s.
目标
)] s.arg.
名称, s.
目标
)
常量目标.
添加(s.
目标)
# 添加已别名且未出现在图签名中的常量
为
常量名称, const
在
导出模块.
常量.
项目():
如果
常量名
不是
在
常量目标:
断言 (
id(常量)
在
常量映射
), 常量应被别名化或出现在图签名中
ph_name, _ = consts_map[id(const)][0]
添加到常量映射(id(
常量),
变量名,
常量名)
添加参数缓冲区.
添加(s.
目标)
# 添加在图签名中未出现的别名/未使用参数和缓冲区
为
完全限定名,
张量
在
导出模块.
状态字典.
项目():
如果
完全限定名
不是
在
添加的参数缓冲区:
如果 id(
张量)
不是
在
常量映射:
完全未使用(无权重共享),忽略。
此权重未出现在图模块中,
# 不会引起完全限定名称分配问题
continue
变量名, _ =
常量映射[id(
张量)][0]
添加到常量映射(id(
张量), ph_name,
完全限定名)
节点名称 -> 可能的目标列表
输入到状态:
字典[
字符串,
列表[
字符串]] = {}
为
节点目标
在
常量映射.
值():
目标 = [t[1]
为 t
在
节点目标]
为 n, _
在
节点目标:
输入到状态[n] =
目标
_汇出参数(self,
输入到状态,
[]
重定向调用索引 =
_去重模块(
已见模块.
值())
FQN 列表 = [
完全限定名
为
完全限定名
在
完整限定名列表
如果
完全限定名
不是
在
重定向调用索引]
self._调度模块(
重定向调用索引,
常量目标)
完全限定名列表 = [
完全限定名
为
完全限定名
在
完全限定名列表
如果
“@”
不是
在
完全限定名]
缓存,这样我们就不必每次都进行计算。
注意:这需要与占位符保持同步,但我们目前没有保证这一点的办法。
# self.graph,但我们目前没有保证这一点的办法。
self.输入占位符 = [
节点
为
节点
在 self.graph.
节点
如果
节点.
操作符 ==
占位符
]
self.检查输入约束 =
真实
# TODO(zhxchen17) 我们可以提前注册模块,而不是稍后重新排序。
完全限定名称顺序 = {
完全限定名: i
为 i,
完全限定名
在
列举(
完全限定名称列表)}
在旧版信息检索中,我们可能会在元数据中缺少一些模块。
为
名称, _
在 self.
命名模块(
删除重复项=
错误):
如果
名称
不是
在
全局订单:
全局订单[
名称] =
长度(
全局订单)
重新排序子模块(self, fqn_order)
self.graph.检查()
def 打印图(self):
为
完全限定名,
修饰
在 self.
命名模块():
打印(
完全限定名 +
“:”)
如果
有属性(
模块,
"图")
和 isinstance(
模块.graph,
火炬.fx.
图):
打印(
模块.graph)
def _适配扁平参数(self,
平坦参数, in_spec):
签名 = self.
模块调用图[0].
签名
如果
在规范中 ==
签名.
在规范中:
返回
平坦参数
如果 self.
平坦参数适配器 is
无:
提升
类型错误(
"未指定平坦参数适配器。"
"您确定您使用的是正确的参数调用这个吗?"
)
else:
平坦参数 = self.flat_args_adapter.
适配(
目标规范=
签名.in_spec,
输入规范=in_spec,
输入参数=
平坦参数,
元数据=self.
元数据,
)
如果
长度(
平坦参数) !=
签名.
在规范中.
叶子数量:
提升
类型错误(
f平铺参数适配失败,参数数量不匹配
f"适配:"{
长度(
平坦参数)}
输入文本翻译为简体中文为:\n"
f"导出模块:"{
签名.
在规范中.num_leaves}"
)
返回
平坦参数
def process_forward_inputs(self, *参数, **kwargs):
签名 = self.
模块调用图[0].
签名
重新排序的参数 =
重新排序参数(kwargs,
签名.
在规范中)
带路径的扁平参数,
在规范中 =
py 树.
带路径的树扁平化(
(参数,
重新排序的参数)
)
平坦参数 = [x[1]
为 x
在
带路径的扁平参数]
如果 is_fx_tracing():
返回
平坦参数
如果
在规范中 !=
签名.
在规范中:
如果
不是 self.
适应的:
打印(
输入的 treespec 与导出模块的不匹配:
输入文本翻译为简体中文为:
\n"
f输入的 treespec:{
在规范中}
.,
f导出的模块 treespec:{
签名.
在规范中}",
)
打印(
"适配平面参数以匹配导出模块的 treespec")
平坦参数 = self.
_适配平面参数(
平坦参数,
在规范中)
self.已调整 =
真实
如果 self.
检查输入约束:
# 在此处导入以避免不幸的循环依赖。
# TODO(suo): 解决这个问题。
from torch._export.utils 导入
检查图输入约束
如果 self.
已调整 is
是:
# TODO(suo): FlatArgsAdapter 返回一个平铺参数的列表,
# which we don't have keypaths for. For now, just create a dummy
# keypath to associate with the arg.
新的平坦参数带路径 = [ # type: ignore[var-annotated]
((序列键(
索引=0),
获取属性键(
名称=
"未知位置")), arg)
为
参数
在
平坦参数
]
else:
带有路径的新 flat_args =
带有路径的 flat_args
# 类型:忽略[赋值]
检查图输入约束(
self.输入占位符,
带有路径的新 flat_args, self.
范围约束
)
返回
平坦参数
def 前向(self, *
参数, **kwargs):
平坦参数 =
火炬._dynamo.
禁用(self.
处理前向输入)(*
参数, **kwargs)
签名 = self.
模块调用图[0].
签名
如果 is_fx_tracing():
返回值 =
火炬.fx.
解释器(self, graph=self.graph).run(
*平坦参数,
启用 IO 处理=
假
)
# 对于标量返回值,fx.Graph 将其包装在元组中
如果 isinstance(
返回值,
元组)
和
长度(
返回值) == 1:
返回
返回值[0]
返回
返回值
如果
火炬.
编译器.
动态编译中()
和
不是 self.
使用解释器运行:
树输出 =
火炬.fx.GraphModule(self, self.graph)(*
平坦参数)
else:
树输出 =
火炬.fx.
解释器(self, graph=self.graph).run(
*平坦参数,
启用 IO 处理=
假
)
返回
py 树.
树形展开(
树出,
签名.
输出规范)
def _分发模块(self,
重定向调用索引,
常量目标):
对于调用签名被保留的模块,替换为
多个模块对应对该模块的多次调用
使用单个调度模块来跟踪调用哪个模块。
""
对于每个模块调用签名保留的 FQN
将完全限定名(fqn)映射到被调用的模块列表
被调用模块 =
默认字典(
列表)
为
条目
在 self.
模块调用图:
如果
条目.
完全限定名
和
条目.
签名:
# 一些模块被移除,并且它们的完全限定名被重定向到其他
# 完全限定名在去重过程中
完全限定名 =
条目.
完全限定名
修饰 =
获取属性(self,
重定向调用索引.
获取(
完全限定名,
完全限定名))
基础,
索引 =
完全限定名.
分割(
“@”)
如果
“@”
在
完全限定名
否则 [
完全限定名, "0"]
被调用模块[
基础].append((int(
索引),
模块))
属性映射 =
默认字典(
设置)
为
目标
在
常量目标:
如果 "."
在
目标:
原始全限定名,
名称 =
目标.
右分割(
“。”, 1)
属性映射[
原始全限定名].
添加(
名称)
else:
attrs_map[输入文本翻译为简体中文为:""].
添加(
目标)
将多个调用模块替换为单个调度模块
为 orig_fqn,
索引调用模块
在
被调用模块.
项目():
调用模块 = [
修饰
为 _,
修饰
在
排序(
索引调用模块)]
如果
长度(
调用模块) > 1:
为 i
在
范围(
长度(
调用模块)):
完全限定名 =
_调用名称(
原始全限定名称, i + 1)
如果
完全限定名
不是
在
重定向调用索引:
*前缀,
名称 =
完全限定名.
分割(
“。”)
通过属性列表获取属性(self,
前缀).
模块.
流行(
名称)
self.设置子模块(
原始全限定名,
解释器模块调度器(attrs_map[orig_fqn
]
调用模块),
)
在调用模块中省略调用索引,因为它们在调度器模块中自动跟踪
在调用模块中省略调用索引,因为它们在调度器模块中自动跟踪
def 省略调用索引(
前缀, graph):
为
节点
在 graph.
节点:
如果
节点.
操作符 ==
调用模块:
完全限定名 =
节点.
目标.
分割(
@
)0]
路径 = f"{
前缀}.{
完全限定名}"
如果
前缀
否则
完全限定名
如果
路径
在
被调用模块:
节点.
目标 =
完全限定名
为
完全限定名,
修饰
在 self.
命名模块(
删除重复项=
错误):
如果
有属性(
模块,
"图"):
省略调用索引(
完全限定名,
模块.graph)
如果...否则
有属性(
模块,
"_调用模块"):
为
模_
在
模块.
调用模块:
断言
有属性(
模块_,
"图")
省略调用索引(
完全限定名,
模块_.graph)
def 打印可读(
self,
打印输出=
是,
包含步长=
错误,
包含设备=
错误,
彩色=
错误,
):
返回
打印可读(
self,
非扁平化模块,
打印输出,
包含步长,
包含设备,
彩色,
)
[文档]def unflatten(
模块:ExportedProgram, flat_args_adapter:Optional[FlatArgsAdapter] = None
) -> UnflattenedModule:
将导出的 ExportedProgram 展开,生成一个与原始急切模块具有相同模块层次结构的模块。如果您尝试使用:mod:`torch.export`与期望模块层次结构而不是:mod:`torch.export`通常生成的扁平图的其他系统,这可能很有用。
这可以用于您尝试使用:mod:`torch.export`与期望模块层次结构而不是:mod:`torch.export`通常生成的扁平图的其他系统。
这可以用于您尝试使用:mod:`torch.export`与期望模块层次结构而不是:mod:`torch.export`通常生成的扁平图的其他系统。
这可以用于您尝试使用:mod:`torch.export`与期望模块层次结构而不是:mod:`torch.export`通常生成的扁平图的其他系统。
.. 注意:: 未展平的模块的 args/kwargs 不一定匹配,因此进行模块交换(例如::code:`self.submod = new_mod`)不一定有效。如果您需要替换模块,您需要设置:code:`preserve_module_call_signature`参数
的 eager 模块,所以进行模块交换(例如::code:`self.submod = new_mod`)不一定有效。如果您需要替换模块,您需要设置:code:`preserve_module_call_signature`参数
的参数。如果您需要替换模块,您需要设置:code:`preserve_module_call_signature`参数
的参数。如果您需要替换模块,您需要设置:code:`preserve_module_call_signature`参数
`torch.export.export`.
Args:
module (ExportedProgram): 要展开的 ExportedProgram。
flat_args_adapter (Optional[FlatArgsAdapter]): 如果输入的 TreeSpec 与导出模块不匹配,则适配扁平参数。
返回:
一个实例:class:`UnflattenedModule`,它具有与原始 eager 模块预导出相同的模块层次结构。
它具有与原始 eager 模块预导出相同的模块层次结构。
"""
module = _remove_effect_tokens(module)
return UnflattenedModule(module, flat_args_adapter)
def _inplace_buffer_mutations(
graph: 火炬.fx.
图,
图签名:
导出图签名,
) 翻译
无:
"""将缓冲区突变从其函数化形式转换为复制
图中的节点。
功能化表示通过传递缓冲区作为输入和输出来实现缓冲区突变。例如,急切代码:
def forward(self, x):
self.buffer += x
返回 x * x
将成为一个看起来像这样的图:
def forward(self, buffer, x):
mutated_buffer = aten.add(buffer, x)
mul = aten.mul(x, x)
返回(已修改的缓冲区,乘数)
我们希望原地将其修改成类似原始急切代码的样子:
def forward(self, buffer, x):
mutated_buffer = aten.add(buffer, x)
buffer.copy_(mutated_buffer)
mul = aten.mul(x, x)
return (mul,)
""
输出节点 =
下一(
迭代(
反转(graph.
节点)))
断言
输出节点.
操作符 ==
输出
和
长度(
输出节点.
参数) == 1
return_args = 输出节点.
参数[0]
节点转缓冲区 =
图签名.
待变更的缓冲区
变异 =
返回参数
[
长度(
节点转缓冲区)]
缓冲区到输入 = {v: k
为 k, v
在
图签名.
输入到缓冲区.
项目()}
输入名称到节点 = {
节点.
名称:
节点
为
节点
在 graph.
节点
如果
节点.
操作符 ==
占位符
}
为
变异
在
变异集:
缓冲区名称 =
节点到缓冲区的突变[
突变.
名称]
输入名称 =
缓冲区到输入[
缓冲区名称]
输入节点 =
输入名称到节点[
输入名称]
与 graph.
插入之后(
突变):
新节点 = graph.
创建节点(
调用函数,
火炬.
操作.aten.
复制_, (
输入节点,
变异)
)
为 k, v
在
变异.
元数据.
项目():
新节点.
元数据[k] = v
将所有之前功能正常的变异替换为我们的 copy_输出。
突变.
替换所有引用(
新节点, lambda x: x is
不是
新节点)
# 从图输出中移除已突变的缓冲区,因为我们不再需要
# 通过它 anymore。我们不需要处理输入,这些输入
# 将由 _sink_params 处理。
用户输出 =
元组(
返回参数[
长度(
将突变节点转换为缓冲区)
,
)
输出节点.args = ((
用户输出),)
def _is_prefix(候选人,
目标):
检查`candidate`是否是`target`的前缀。
返回
长度(
候选人) <
长度(
目标)
和
目标
[
长度(
候选人)] ==
候选人
def 计算访问器(
父全限定名:
字符串,
子全限定名:
字符串)
翻译
字符串:
如果
父全限定名 ==
输入文本翻译为简体中文为:"":
正确处理根模块。
返回
子全限定名
父分割 =
父全限定名.
分割(
“。”)
子模块分割 =
子模块全限定名.
分割(
“。”)
# TODO: 支持通过内联子模块来实现跳过连接。
如果
子模块分割
[
长度(
父级分割)] !=
父级分割:
提升
运行时错误(
f"子模块 '"{
子全限定名}
'不是父模块的子模块'{
'parent_fqn'}
'。"'
'目前不支持此功能。'
请尝试直接将子模块附加到父模块。
)
返回
“。”.
连接(
子分割[
长度(
父分割)
])
def _检查图等价性(x:
火炬.
神经网络.
模块, y:
火炬.
神经网络.
模块):
def 图导出(graph:
火炬.fx.
图)
翻译
字符串:
返回 =
输入文本为空,请提供需要翻译的文本
节点索引:
字典[int, int] = {}
def 参数导出(arg)
翻译
字符串:
如果 isinstance(arg,
火炬.fx.
节点):
返回
百分号 +
字符串(
节点索引[id(arg
)]])
返回
字符串(arg)
为 i,
节点
在
列举(graph.
节点):
参数转储 = [
字符串(arg)
为
参数
在
py 树.tree_map(
参数转储,
节点.
参数)]
参数转储 += [
f"{键}={
值}"
为
键,
值
在
py 树.tree_map(
arg_输出,
节点.kwargs).
项目()
]
目标 =
节点.
目标
如果
节点.
操作符
在 (
调用函数,
获取属性)
否则
请提供需要翻译的文本
返回.append(f"{i}: {
节点.
操作}[{
目标}
]{
“,”.
连接(
参数转储)})")
节点索引[id(
节点)] = i
返回 "
输入文本翻译为简体中文为:\n".
连接(
返回)
断言 isinstance(x.graph,
火炬.fx.
图)
断言 isinstance(y.graph,
火炬.fx.
图)
返回
图转储(x.graph) ==
图形导出(y.graph)
def 添加规范(gm:
火炬.
神经网络.
模块,
规格)
翻译
字符串:
i = 0
当
有属性(gm, f
_spec_{i}"):
i += 1
名称 = f
_spec_{i}"
setattr(gm, 名称,
规格)
返回
名称
def _generate_flatten(gm: 火炬.fx.GraphModule,
节点)
翻译
火炬.fx.
节点:
展平 = gm.graph.
调用函数(
py 树.
树形展平, (
节点,))
getitem_0 = gm.graph.调用函数(
操作符.
获取项, (
展平, 0))
返回 getitem_0
def _generate_flatten_spec(
gm: 联合[
火炬.fx.GraphModule, InterpreterModule,
反扁平化模块
]
节点,
规范
) 翻译
火炬.fx.
节点:
名称 = _add_spec(gm,
规格)
规范节点 = gm.graph.
获取属性(
名称)
返回 gm.graph.
调用函数(fx_pytree.
树扁平化规范, (
节点,
规范节点))
def 生成非扁平化(
gm: 联合[
火炬.fx.GraphModule,
解释器模块,
反扁平化模块
]
节点,
规范
) 翻译
火炬.fx.
节点:
名称 =
添加规范(gm,
规格)
规范节点 = gm.graph.
获取属性(
名称)
返回 gm.graph.
调用函数(
py 树.
树形展开, (
节点,
规范节点))
def _获取子模块(
模块:
火炬.
神经网络.
模块,
目标:
字符串):
*前缀,
字段 =
目标.
分割(
“。”)
为
项目
在
前缀:
子模块 = getattr(
模块,
项目,
无)
如果
子模块 is
无:
返回
无
如果
不是 isinstance(
子模块,
火炬.
神经网络.
模块):
返回
无
修饰 =
子模块
返回 getattr(
模块,
字段,
无)
def _添加子模块(
模块:
火炬.
神经网络.
模块,
目标:
字符串,
要添加的模块:
火炬.
神经网络.
模块,
创建模块:
可选[
可调用[[
字符串
]
火炬.
神经网络.
模块]] =
无,
):
*前缀,
字段 =
目标.
分割(
“。”)
为 i,
项目
在
列举(
前缀):
子模块 = getattr(
模块,
项目,
无)
如果
子模块 is
无:
如果
创建模块 is
不是
无:
子模块 =
创建模块(
“。”.
连接(
前缀
[ i + 1]))
else:
子模块 =
火炬.
神经网络.
模块()
setattr(模块,
项目,
子模块)
如果
不是 isinstance(
子模块,
火炬.
神经网络.
模块):
返回
假
修饰 =
子模块
模块.
添加模块(
字段,
要添加的模块)
def _调用名(
基础:
字符串, n: int)
翻译
字符串:
# 给定 n >= 0,生成对子模块 `base` 的调用名,形式为
`base`, `base@1`, `base@2` 等。
返回
基础
如果 n == 1
否则 f"{
基础}@{n - 1}"
def _is_call_name(call_name: 字符串,
基础:
字符串)
翻译
布尔:
# 当 call_name = _call_name(base, n) 且 n >= 0 时识别。
返回
正则表达式.
匹配(
正则表达式.
转义(
基础) + r
"(@\\d+)?$",
调用名称) is
不是
无
类
模块框架:
def __init__(
self,
平面图:
火炬.fx.
图,
节点:
元组[
火炬.fx.
节点, ...
]
已见节点,
已见模块,
已见属性,
创建的模块,
父节点,
模块栈:
列表[
元组[
字符串,
可选[
字符串
] int]],
模块 ID,
模块调用图:
字典[
字符串,
模块调用签名
]
模块:
可选[
联合[
火炬.fx.GraphModule,
反扁平化模块]] =
无,
):
self.平面图 =
平面图
self.节点 =
节点
self.已见节点 =
已见节点
self.已见模块 =
已见模块
self.已见属性 =
已见属性
self.已创建模块 =
已创建模块
self.父级 =
父级
self.模块堆栈 =
模块堆栈
self.模块 ID =
模块 ID
self.模块调用图 =
模块调用图
self.详细 =
假
self.完全限定名,
为用户提供沉浸式翻译体验, num_calls = self.module_stack[-1]
# generate call name for self.fqn
self.child_fqn = 调用名称(self.
完全限定名,
呼叫次数 + 1)
self.模块:
联合[
火炬.fx.GraphModule,
反扁平化模块,
解释器模块]
如果
模块 is
不是
无:
self.模块 =
模块
self.ivals = 模块.ivals
如果
有属性(
模块,
ivals)
否则 {} # type: ignore[var-annotated]
else:
self.模块 = self.
创建的模块.
获取(
self.完全限定名,
解释器模块(
火炬.fx.
图(),
为用户提供沉浸式翻译体验=
为用户提供沉浸式翻译体验),
)
self.ivals = 父节点.ivals
self.图 = self.
模块.
图
平面图中的节点到本图节点的映射。
self.节点映射:
字典[
火炬.fx.
节点,
火炬.fx.
节点] = {}
self.节点到占位符 = {}
self.父调用模块:
可选[
火炬.fx.
节点] =
无
如果
父级 is
不是
无:
访问器 =
_计算访问器(
父节点.
完全限定名, self.
子全称)
def 创建模块(
完全限定名):
路径 = f"{
父节点.
完全限定名}.{
完全限定名}"
如果
父节点.
完全限定名
否则
完全限定名
如果
路径
在 self.
已创建的模块:
返回 self.
已创建的模块[
路径]
子模块 =
解释器模块(
火炬.fx.
图(),
他=
他)
self.创建模块[
路径] =
子模块
返回
子模块
_添加子模块(
父节点.
模块,
访问器, self.
模块,
创建模块)
self.父调用模块 =
父节点.graph.
调用模块(
访问器)
如果 self.
已见模块[self.
模块 ID
]
基础模块框架 = self.
已见模块[self.
模块 ID
]
[0]
self.模块.
模块 =
基础模块框架.
模块.
模块
self.已见模块[self.
模块 ID].append(
子模块条目(
父全限定名=self.
父节点.
完全限定名,
父模块=self.
父节点.
模块,
父调用模块=self.
父调用模块,
完全限定名=self.
完全限定名,
call_idx=num_calls + 1,
模块=self.
模块,
)
)
签名 =
模块调用图.
获取(self.
子全称)
如果
签名 is
不是
无
和 self.
父级 is
不是
无:
断言
签名.
在规范中.
子数量 == 2
参数规范 =
签名.
在规范中.
子规格[0]
kwargs 规范 =
签名.
在规范中.
子规格[1]
断言
参数规范.
上下文 is
无
断言
kwargs 规范.
上下文 is
不是
无
与 self.graph.
插入之后(
无):
参数节点 = [
self.graph.占位符(f
"_位置参数_"{
索引}")
为
索引
在
范围(
参数规范.
子女数量)
]
关键字参数节点 = {}
为
名称
在
关键字参数规范.
上下文:
关键字参数节点[
名称] = self.graph.
占位符(
名称)
平铺参数 =
生成扁平规范(
self.模块,
(元组(
参数节点),
关键字参数节点),
签名.
在规范中,
)
为
索引,
参数
在
列举(
签名.
输入):
平坦参数节点 = self.graph.
创建节点(
操作=
调用函数,
目标=
操作符.
获取项,
参数=(
平铺参数,
索引),
名称=(
arg.名称
如果
不是 isinstance(arg,
恒定参数)
否则 f
_常量_{
索引}"
),
)
如果 isinstance(arg,
恒定参数):
continue
如果 arg.
名称
在 self.
已见节点:
平坦参数节点.
元数据 =
复制.
复制(self.
已看到的节点[arg.
名称].
元数据)
self.节点到占位符[
self.已看到节点[arg.
名称]
] = 平坦的节点
与 self.
父节点.graph.
插入前(self.
父调用模块):
输入节点:
列表[
可选[
火炬.fx.
节点]] =
输入文本为空,请提供需要翻译的文本
为
输入
在
签名.
输入:
如果 isinstance(
输入,
恒定参数):
输入节点.append(
输入.
值) # type: ignore[arg-type]
如果...否则
输入.
名称
不是
在 self.
已见节点:
输入节点.append(
无)
else:
断言 isinstance(
输入,
(
张量参数,
符号整型参数,
符号布尔参数,
符号浮点参数,
),
)
输入节点.append(
self.父节点.
重新映射输入(self.
已见节点[
输入.
名称])
)
输入节点 =
生成未展平(
self.父节点.
模块,
输入节点,
签名.
在规范中,
)
参数节点 = self.
父节点.graph.
调用函数(
操作符.
获取项, (
输入节点, 0)
)
关键字参数节点 = self.
父节点.graph.
调用函数(
操作符.
获取项, (
输入节点, 1)
)
参数节点 = [
self.父节点.graph.
调用函数(
操作符.
获取项, (
节点参数, i))
为 i
在
范围(
参数规范.
子节点数量)
]
关键字节点 = {
k: self.父节点.graph.
调用函数(
操作符.
获取项, (
kwargs_node
参数节点, k)
)
为 k
在
kwargs 规范.
上下文
}
断言 self.
父调用模块 is
不是
无
self.父调用模块.args =
元组(
端节点)
self.父调用模块.kwargs =
关键字参数节点
# 类型:忽略[赋值]
def 添加占位符(self, x):
断言 self.
完全限定名 !=
输入文本翻译为简体中文为:"", f
无法添加占位符{x}
到根模块
断言 x.
图 is self.flat_graph
# x 不在子图中,为子图创建一个新的占位符
与 self.graph.
插入之前(
无):
占位节点 = self.graph.
占位符(x.
名称, type_expr=x.
类型)
# 复制所有元字段,即使某些字段可能与此无关
# 占位符节点
placeholder_node.元数据 =
复制.
复制(x.
元数据)
self.节点到占位符[x] =
占位符节点
def 复制符号调用函数(self, x):
# 这只存在于我们在扁平导出图中去重 sym_size 节点时
如果设置了 preserve_module_call_signature,我们可能无法传递 sym_size
或者将节点及其下游用户作为子模块调用的输入。
为了避免这种情况,我们将这些具有 sym_type 结果的 call_function 节点进行复制。
然而,这仅应针对 sym_type 节点进行 - 张量上的 call_function 节点
# 应该一开始就不进行去重。
args = py 树.
仅树映射(
火炬.fx.
节点, self.
重新映射输入, x.
参数)
kwargs = py 树.
仅树映射(
火炬.fx.
节点, self.
重映射输入, x.kwargs)
节点 = self.graph.
调用函数(x.
目标,
参数, kwargs)
节点.
元数据 =
复制.
复制(x.
元数据)
self.节点映射[x] =
节点
返回
节点
def 重新映射输入(self, x):
断言 x.
图 is self.
平面图
如果 x
在 self.
节点映射:
返回 self.
节点映射[x]
self.打印(f
"重映射输入("{x})")
如果 x
在 self.
节点到占位符:
返回 self.
节点到占位符[x]
如果...否则 (
x.操作符 ==
占位符
或者 self.
模块调用图.
获取(self.
完全限定名) is
无
允许创建占位符,如果我们不保留模块调用签名
):
self.添加占位符(x)
如果 self.
父调用模块 is
不是
无:
重要,需要*前置*输出以匹配我们的方式
插入占位节点
与 self.
父节点.graph.
插入之前(self.
父调用模块):
self.父调用模块.
插入参数(0, self.
父节点.
映射输入(x))
返回 self.
节点到占位符[x]
如果...否则 x.
操作符 ==
调用函数
和 (
x.目标
在 (
火炬.
操作.aten.
符号大小.int,
火炬.
操作.aten.
项目.
默认,
火炬.
操作.aten.
解绑.int,
火炬.
操作.aten.
总和.dim_IntList,
火炬.
操作.aten.
视图.
默认,
火炬.
操作.aten.
差异.
默认,
)
或者 (
有属性(x.
目标, "__module__")
和 x.
目标.__module__ ==
"_操作符")
):
# 导出去重 sym_size 节点,可能需要重新复制
# 如果需要保留模块调用签名
self.复制_sym_call_function(x)
返回 self.
节点映射[x]
如果...否则 self.
模块调用图.
获取(self.
完全限定名) is
不是
无:
# x 是不在占位符中的 ival,因此创建一个
# 获取与属性 __ival__x 对应的 get_attr 节点
返回 self.ivals.
阅读(self.
完全限定名, self.graph, x)
# 类型:忽略[运算符,联合属性]
else:
提升
运行时错误(
f无法在操作类型上运行 remap_input(){x.
操作}
对于节点{x}"
)
def finalize_outputs(self):
self.创建的模块.
流行(self.
完全限定名,
无)
orig_outputs = 输入文本为空,请提供需要翻译的文本
签名 = self.
模块调用图.
获取(self.child_fqn)
如果
签名 is
不是
无
和 self.
父级 is
不是
无:
为
输出
在
签名.
输出:
如果 isinstance(
输出,
(张量参数,
符号整型参数,
符号布尔参数,
符号浮点参数),
):
如果
输出.
名称
在 self.
已见节点:
原始输出.append(self.
已见节点[
输出.
名称])
else:
原始输出.append(
无)
else:
提升
运行时错误(
f不支持输出节点的数据类型:{
输出}"
)
def 获取实际输出节点(
输出):
如果
输出 is
无:
返回
无
已见节点 = self.
已见节点[
输出.
名称]
如果
已见节点
在 self.
节点映射:
返回 self.
节点映射[
已见节点]
如果...否则
已见节点
在 self.
节点到占位符:
返回 self.
节点到占位符[
已见节点]
else:
提升
运行时错误(
f"无法找到输出节点"{
输出}
. 图:{self.graph}"
)
tree_out_node = _generate_unflatten(
self.模块,
元组(
获取实际输出节点(
输出)
为
输出
在
原始输出),
签名.
输出规范,
)
父输出:
可选[
火炬.fx.
节点] = _generate_flatten_spec(
self.父节点.
模块, self.
父调用模块,
签名.
输出规范
)
图输出:
联合[
火炬.fx.
节点,
列表[
火炬.fx.
节点]] =
树的输出节点
else:
图输出 =
输入文本为空,请提供需要翻译的文本
# 遍历我们复制到 self.graph 中的节点。
为
原节点
在 self.
节点映射.
键():
为
用户节点
在
原节点.
用户:
如果
用户节点.
名称
不是
在 self.
已见节点:
# 外部用户节点,需要暴露为输出
原始输出.append(
原始节点)
图输出.append(self.
节点映射[
原始节点])
断开
父输出 = self.
父调用模块
如果
长度(
图输出) == 1:
图输出 =
图输出[0]
断言 isinstance(
图输出, (
列表,
火炬.fx.
节点))
self.graph.输出(
图输出)
# 在父模块中重写输出
如果
父模块输出 is
无:
返回
父模块输出.
元数据[
值] = (
图输出.
元数据.
获取(
值)
如果 isinstance(
图输出,
火炬.fx.
节点)
否则 [o.
元数据.
获取(
值)
为 o
在
图输出]
)
如果
长度(
原始输出) == 1
和
签名 is
无:
self.父节点.
节点映射[
原始输出[0]] =
父输出
else:
为 i,
原始输出
在
列举(
原始输出列表):
如果
原始输出 is
无:
continue
使用代理记录 getitem 访问。
代理输出 =
火炬.fx.
代理(
父输出
)i].
节点
忽略索引
代理输出.
元数据[
值] =
原始输出.
元数据.
获取(
值)
self.父节点.
节点映射[
原始输出] =
代理输出
def 复制节点(self,
节点):
self.打印(
复制中,
节点.
格式化节点())
self.节点映射[
节点] = self.graph.
节点复制(
节点, self.
重新映射输入)
self.已见节点[
节点.
名称] =
节点
def 运行外部(self):
为 i,
节点
在
列举(self.
平面图.
节点):
self.打印(i,
节点.
元数据.
获取(
神经模块栈),
节点.
格式化节点())
# 复制所有图输入
节点索引:
整型 = 0
节点 = self.
节点[
节点索引]
当
节点.
操作符 ==
占位符:
self.复制节点(
节点)
节点索引 += 1
节点 = self.
节点[
节点索引]
self.从中运行(
节点索引)
# 复制图输出
为
节点
在 self.
平面图.
节点:
如果
节点.
操作符 ==
输出:
self.复制节点(
节点)
def 打印(self, *
参数, **kwargs):
如果 self.
详细模式:
打印(*
参数, **kwargs)
def 从运行(self,
节点索引):
模块索引 = 0
遍历图,构建一个新的包含正确子模块的新图
当
节点索引 <
长度(self.
节点):
节点 = self.
节点[
节点索引]
断言
节点.
操作符 !=
占位符
self.打印()
self.打印(
步骤,
节点索引,
节点.
格式节点())
self.打印(self.module_stack)
深度 =
长度(self.module_stack)
如果
节点.
操作符 ==
输出:
如果
深度 == 1:
我们希望处理原始图的输出节点
特别地,由最外层的堆栈帧(在 run_outer 中)执行。
此处跳过最终化。
返回 node_idx
我们已经到达了图的末尾。整理所有现有的栈帧。
self.finalize_outputs()
返回 node_idx
如果
长度(
节点.
元数据.
获取("nn_module_stack", {})) == 0:
提升
运行时错误(f
"无法找到 nn_module_stack 节点"{
节点}")
nn_module_stack = 节点.
元数据["nn_module_stack"]
from torch._export.passes._node_metadata_hook 导入 (
空的 NN 模块堆栈键,
)
如果 (
长度(
nn 模块堆栈) == 1
和
空的 NN 模块堆栈键
在
nn 模块堆栈
):
# 空情况来自节点元数据钩子
模块堆栈 = self.
模块堆栈
else:
节点模块栈 = [
(
路径,
ty 如果
路径
否则
无,
int(k.分割(
@
)-1])
如果
“@”
在 k
否则 0,
)
为 k, (
路径,
他)
在
节点.
元数据[
"nn 模块栈"].
项目()
]
如果
节点模块栈
[
深度] != self.module_stack:
这意味着当前模块已执行完毕
当前节点是新模块的开始。
#
在这种情况下,我们应该最终确定这个模块并返回
# 增加节点计数器。
self.完成输出()
self.打印(
概述, self.
完全限定名)
self.打印(self.graph)
返回
节点索引
断言
节点模块栈 is
不是
无
如果 _is_prefix(self.module_stack, node_module_stack):
这意味着当前节点代表执行一个新
模块。
下一个模块 =
节点模块堆栈[
深度]
self.打印(
"为创建新的堆栈帧",
下一个模块)
从当前节点运行模块大纲的嵌套版本
计数器完成后再从那个点继续
下一个模块键 =
列表(
节点.
元数据[
神经网络模块栈].
键
()
深度]
节点索引 =
模块框架(
self.平面图,
self.节点,
self.已见节点,
self.已见模块,
self.已见属性,
self.创建的模块,
self,
self.模块堆栈 + [
下一个模块
]
下一个模块键.
分割(
@
)0
]
self.模块调用图,
).从运行处(node_idx)
module_idx += 1
continue
# 仅剩的可能性是我们处于正确的栈中
# 将节点复制到该帧的图中并增加节点计数器。
断言
node_module 栈 == self.
模块堆栈
如果
节点.
操作符 ==
获取属性:
# 这必须是一个 HOP 的图参数
self.已见属性[self.
子全称].
添加(
节点.
目标)
self.复制节点(
节点)
节点索引 += 1
@dataclass
类
_子模块条目:
父完全限定名:
字符串
父模块:
火炬.
神经网络.
模块
父调用模块:
火炬.fx.
节点
完全限定名:
字符串
call_idx: 整型
模块:
火炬.
神经网络.
模块
def _子模块概要(
原始图:
火炬.fx.
图,
根模块:
反扁平化模块):
已见节点:
字典[
字符串,
火炬.fx.
节点] = {}
已见模块:
字典[int,
列表[
_子模块条目]] =
默认字典(
列表)
已见属性:
字典[
字符串,
设置[
字符串]] =
默认字典(
设置)
创建的模块:
字典[
字符串,
火炬.
神经网络.
模块] = {}
模块框架(
原始图,
元组(
原始图.
节点),
已见节点,
已见模块,
已见属性,
创建的模块,
无,
[
输入文本翻译为简体中文为:"",
无, 0
)],
输入文本翻译为简体中文为:"",
{
条目.
完全限定名:
条目.
签名
为
条目
在
根模块.
模块调用图
如果
条目.
签名
},
模块=
根模块,
).运行外部()
返回
已见模块,
已见属性
def 重新排序子模块(
父节点:
火炬.
神经网络.
模块, fqn_order:
字典[
字符串, int
]
前缀:
字符串 =
请提供需要翻译的文本
):
# TODO 可通过提前添加子模块进行优化。
如果
前缀 ==
输入文本翻译为简体中文为:"":
为
完全限定名
在
列表(fqn_order.
键
())1
]
如果
_获取子模块(
父节点,
完全限定名) is
无:
_添加子模块(
父节点,
完全限定名,
火炬.
神经网络.
模块())
子代 =
输入文本为空,请提供需要翻译的文本
为
名称,
儿童
在
列表(
父节点.
模块.
项目()):
如果
儿童 is
无:
continue
完全限定名 =
前缀 +
名称
重新排序子模块(
儿童, fqn_order,
前缀=
完全限定名.
分割(
@
)0] +
“。”)
delattr(父节点,
名称)
儿童.append((fqn_order[
完全限定名
]
名称,
儿童))
儿童.
排序(
键=
操作符.itemgetter(0))
为 _,
名称,
儿童
在
儿童:
父节点.
注册模块(
名称,
儿童)
类 _IVals:
""
在图中收集缓冲区变动的中间值,
以及创建和使用它们的模块调用 FQNs。稍后,
在每个与中间值关联的 FQN 中,我们将安装一个相应的属性,以便它可以被更新和读取。
例如:在下面的图中,假设 buf_in 和 buf_out 是缓冲区的输入和输出值。
示例:在以下图中,假设 buf_in 和 buf_out 是缓冲区的输入和输出值。
示例:在以下图中,假设 buf_in 和 buf_out 是缓冲区的输入和输出值。
buf_in = placeholder()
...
ival1 = f0(buf_in, ...) # inside self.n0(...)
...
ival2 = f1(ival1, ...) # inside self.n1(...)
...
buf_out = f2(ival2, ...) # inside self.n2(...)
返回 buf_out, ...
这里 ival1 和 ival2 是内部创建的中间值
分别调用 n0 和 n1,并在调用内部使用
n1 和 n2 分别。
因此,我们的分析将产生 {ival1: {n0, n1}, ival2: {n1, n2}}。
""
def __init__(self):
# ival 节点名称 -> 创建和使用它的全限定名称集合
self.fqns = 默认字典(
设置)
# ival 节点名称 -> 对应属性的张量存储
self.存储 = {}
def 阅读(self,
完全限定名, graph,
节点):
""
读取与给定中间值对应的属性。
""
# 读取 ival x,获取属性__ival__x
与 graph.
插入前(
无):
ival 节点 = graph.
获取属性("__ival__" +
节点.
名称, type_expr=
节点.
类型)
ival 节点.
元数据 =
复制.
复制(
节点.
元数据)
如果
节点.
名称
不是
在 self.
存储:
创建一个匹配 fake 的空张量,使用缓存
确保每次返回相同的张量,使用 ival_name
fake = 节点.
元数据[
值]
self.存储[
节点.
名称] =
火炬.
空的(
欺诈.
形状,
数据类型=
欺诈.
数据类型)
self.fqns[节点.
名称].
添加(
完全限定名)
返回 ival_node
def 更新(self,
完全限定名, graph,
节点):
""
更新对应给定中间值的属性。
""
self.fqns[节点.
名称].
添加(
完全限定名)
# 要更新 ival x,获取属性 __ival__x 并将 x 复制到 __ival__x
与 graph.
插入之后(
节点):
ival_node = graph.获取属性("__ival__" +
节点.
名称, type_expr=
节点.
类型)
ival 节点.
元数据 =
复制.
复制(
节点.
元数据)
与 graph.
插入之后(
ival 节点):
new_ival 节点 = graph.
创建节点(
调用函数,
火炬.
操作.aten.
复制_, (ival_node,
节点)
)
新 ival_node.
元数据 =
复制.
复制(
节点.
元数据)
def 创建(self,
分区,
根模块):
""
更新读取的中间值对应的属性。
最后,初始化所有读取或更新属性的模块中的属性。
对应的中间值。
""
条目 =
[
输入文本翻译为简体中文为:"",
根模块)]
为
共享子模块
在
分区:
为
条目
在
共享子模块:
条目.append((
条目.
完全限定名,
条目.
模块))
图 =
条目.
模块.
图
为
节点
在 graph.
节点:
如果
节点.
名称
在 self.
存储:
self.更新(
条目.
完全限定名, graph,
节点)
通过它读取或更新的 ival 节点名称列表
ivals = 默认字典(
列表)
为
名称, fqns
在 self.fqns.
项目():
为
完全限定名
在
完全限定名:
集合值[
完全限定名].append(
名称)
为
完全限定名,
修饰
在
条目:
为
名称
在
集合值[
完全限定名
]
ival_name = f__ival__{
名称}"
在模块调用 m 中创建名为 x 的 ival
创建属性 m.__ival__x,初始为空
setattr(模块, ival_name, self.
存储[
名称])
def _copy_graph_attrs(
gm: 火炬.fx.GraphModule,
根模块:
反扁平化模块,
已见属性:
字典[
字符串,
设置[
字符串]],
):
为 child_fqn,
名称
在
已见属性.
项目():
模块 = _get_attr(
根模块, child_fqn)
如果 child_fqn
否则
根模块
为
名称
在 names:
val = getattr(gm, 名称)
setattr(模块,
名称, val)
def _去重模块(
分区):
重定向调用索引 = {}
为
共享子模块
在
分区:
为 i,
条目
在
列举(
共享子模块):
child_fqn = _调用名称(
条目.
完全限定名,
条目.call_idx)
目标 =
_计算访问器(
条目.
父全限定名, child_fqn)
去重 =
假
遍历所有已见模块,并在可能的情况下进行去重
为
看过
在 shared_submodules
[i
]
如果
_检查图等价性(
已见.
模块,
条目.
模块):
父级 =
条目.parent_module
# 由于图是等价的,我们可以去重。
# 有两种情况。
如果
已见.
完全限定名 ==
条目.
完全限定名:
当前模块具有与已见模块相同的 fqn。
在这种情况下,我们已生成一个可以被优化掉的调用名称。
因此,我们从层次结构中删除当前模块,并用父图中已见的调用名称替换当前调用名称。
在父图中,我们将当前调用名称替换为已见的调用名称。
*前缀,
名称 =
目标.
分割(
“。”)
通过属性列表获取属性(
父节点,
前缀).
模块.
流行(
名称)
已见子全称 =
_调用名称(
已见.
完全限定名,
已见.call_idx)
已见目标 =
_计算访问器(
条目.
父全限定名,
已看到的全限定名称
)
条目.
父调用模块.
目标 =
已见目标
重定向调用索引[child_fqn] =
已见子全限定名
断开
如果...否则
不是
去重:
# 情况 2:当前模块的全限定名与已见模块不同。
在此情况下,我们将当前模块替换为已看到的模块。
因此,不再有任何东西指向当前模块,
以便可以回收垃圾。
# 注意:我们 *不* 将当前调用名替换为已看到的调用名
在父图中,因为这会丢失关于 fqn 的信息
# 实际上被称作。然而,当前调用名称
当找到另一个具有相同 fqn 的已见模块时,#将被优化掉
所以我们还没有退出循环。
父节点.
设置子模块(
目标,
已见.
模块)
去重 =
真实
返回
重定向调用索引
def _汇出参数(
模块:
火炬.
神经网络.
模块,
输入到状态:
字典[
字符串,
列表[
字符串]],
范围:
列表[
字符串
]
模块 ID 从输入中移除:
可选[
字典[int,
设置[
字符串]]] =
无,
):
将图输入中的参数、缓冲区和常量沉入 get_attr 节点。
导出的模块是纯函数式的,因此它们将参数和缓冲区作为输入传递给图。
缓冲区导入到图中。
为了复制 eager 的语义,我们需要从模块状态中获取它们,通过 get_attr 来实现。
。
模块:GraphModule,可能包含嵌套子模块。
inputs_to_state:将图输入名称映射到状态字典中的相应键。
范围:跟踪我们在模块层次结构中的位置,以便我们可以发出正确的
`getattr(self, "foo.bar")` 等调用
module_id_to_inputs_removed:记录由子模块移除的输入,将模块对象 ID 映射到子模块中占位符节点名称的列表
的模块对象 ID 到子模块中占位符节点名称列表的映射
被删除的。
""
如果
模块 ID 到输入被删除 is
无:
模块 ID 到输入被删除 =
默认字典(
设置)
如果 id(
模块)
在
模块 ID 到输入被删除:
返回 {id(
模块):
模块 ID 到输入已移除[id(
模块)]}
# 我们在这里需要使用 _modules 而不是 named_children(),因为
# 我们明确希望重复的模块在遍历中显示出来。
为
名称,
子模块
在
模块.
模块.
项目():
submod_id_to_inputs_removed = _汇出参数(
角色(
火炬.
神经网络.
模块,
子模块),
输入到状态,
范围 + [
名称
]
模块 ID 输入已移除,
)
为 k, v
在
子模块 ID 输入已移除.
项目():
模块 ID 输入已移除[k].
更新(v)
图 = getattr(
模块,
"图",
无)
如果
图 is
无
或者
长度(graph.
节点) == 0:
不是所有模块都定义了图,如果它们是空模块且没有操作(如 ParameterList)
返回
模块 ID 到输入已移除
断言 isinstance(graph,
火炬.fx.
图)
输入 =
列表(
过滤(lambda n: n.
操作符 ==
占位符, graph.
节点))
最后的输入 =
输入[-1]
# 也从 call_module 节点中删除
call_module 节点 =
过滤(lambda n: n.
操作符 ==
调用模块, graph.
节点)
为
节点
在
调用模块节点:
子模块 = _get_attr(
模块,
节点.
目标)
# 从调用模块节点的参数中移除占位符,仅在我们已经
# 删除对应 _sink_params() 调用的占位符节点后
如果
子模块 is
不是
无
和 id(
子模块)
在
移除到模块_id 的输入:
节点.args =
元组(
过滤(
lambda n: n.名称
不是
在
模块 ID 移除输入[id(
子模块
)],
节点.
参数,
)
)
# 过滤掉当前作用域对应的 inputs_to_state 输入
作用域的 inputs_to_state 输入:
字典[
火炬.fx.
节点,
列表[
字符串]] = {}
为
节点
在
输入:
如果
节点.
名称
不是
在
输入到状态:
continue
状态名称 =
无
为 sn
在
输入到状态[
节点.
名称
]
sn_split = sn.分割(
“。”)
如果 sn_split
[
长度(
范围)] == [x.
分割(
@
)0]
为 x
在
范围
]
状态名称 = sn_split
断开
# 如果作用域名称与状态名称不匹配,那么
# 必须存在多个作用域指向相同的状态名称,
意味着一些模块是共享的。在这种情况下,我们可以简单地跳过
更新当前节点,因为稍后迭代中会有另一个更新
当作用域之间有唯一匹配时,请注意处理此输入节点
确保总是发生这种情况,我们应该
强制执行不变量,即未展开的图中的所有占位符节点都不出现在 inputs_to_state 字典中,这意味着所有额外的输入节点都已处理。
图中的所有占位符节点都不出现在 inputs_to_state 字典中,这意味着所有额外的输入节点都已处理。
输入节点都已处理。
如果
州名 is
无:
continue
输入到作用域状态[
节点] =
状态名称
# 记录移除输入的名称以备返回之用。
已移除的输入:
设置[
字符串] =
设置()
为
节点,
状态名称
在
范围状态输入.
项目():
如果
长度(
节点.
用户) > 0:
属性路径 =
状态名称[
长度(
范围)
]
状态属性 =
通过属性列表获取属性(
模块,
属性路径)
断言 isinstance(
状态属性, (
火炬.
张量,
火炬.
脚本对象))
确保新创建的 get_attr 节点放置在最后一个占位符节点之后
与 graph.
插入之后(the_last_input):
新节点 = graph.
创建节点(
获取属性,
“。”.
连接(
属性路径))
节点.
替换所有引用(
新节点,
传播元数据=
是)
graph.删除节点(
节点)
移除输入.
添加(
节点.
名称)
如果 isinstance(
模块,
解释器模块):
模块.
完成()
返回 {id(
模块):
输入移除}