# mypy: 允许未类型化定义
导入
内置函数
导入 contextlib
导入
复制
导入
枚举
导入 functools
导入
检查
导入
关键词
导入
数学
导入
操作系统
导入
正则表达式
导入
打字
导入
警告
from 集合
导入 defaultdict
from collections.abc 导入
迭代器
from contextlib 导入 contextmanager
from dataclasses 导入
数据类
from 打字
导入
任何,
可调用,
直接,
命名元组,
可选,
类型检查
导入
火炬
导入 torch.utils._pytree
是 pytree
from torch._C 导入 _fx_map_arg
是 map_arg, _NodeIter
from . 导入 _pytree
是 fx_pytree
from _兼容性
导入
兼容性
from .不可变集合
导入
不可变字典
from .节点
导入
获取合格名称,
类型表示,
参数,
节点,
目标
全部 = [
Python 代码,
代码生成,
图]
如果
类型检查:
from 符号追踪
导入
跟踪器 # noqa: F401
from .图模块
导入
图模块 # noqa: F401
# 内置函数到其 `typing` 等价映射。
# (PEP585: 请参阅 D68459095 测试计划)
原始类型映射 = {
列表:
输入法.
列表,
# 无需注意:UP006
字典:
输入法.
字典,
# 无需注意:UP006
设置:
输入法.
集合,
# 无需注意:UP006
冻结集合:
输入法.
冻结集合类,
# 无需注意:UP006
元组:
输入法.
元组,
# 无需注意:UP006
}
法律操作 =
字典.fromkeys(
[调用函数,
调用方法,
获取属性,
调用模块,
占位符,
输出]
)
# 函数签名,用于转换生成的代码体(`list[str]`)
# 代码签名
TransformCodeFunc = 可调用[[
列表[
字符串]],
列表[
字符串]]
类
_自定义内建(
命名元组):
“我们添加到每个图的全局中的额外对象。
一些标准库对象的 repr()在未经修改的情况下不是有效的 Python 代码。
这是一个导入。对于这类常见对象,我们将它们打包到全局变量中
每个 FX 图中的全局变量。
""
# 如何从标准库中导入此对象。
导入字符串:
字符串
实际对象,由该导入字符串生成。
对象:
任何
# 禁用变量名组合字典,以便我们可以通过一次查找进行检查
非法名称 = {k:
对象()
为 k
在
关键词.
关键字列表}
_非法名称.
更新(builtins.
字典)
不能覆盖内置名称
_自定义内置函数:
字典[
字符串,
_自定义内置] = {}
def _注册自定义内置(
名称:
字符串,
导入字符串:
字符串,
对象:
任何):
_自定义内置函数[
名称] =
自定义内置(
导入字符串,
对象)
非法名称[
名称] =
对象
注册自定义内置(
"无穷大",
"从 math 导入无穷大",
数学.
无穷)
注册自定义内置函数(
"非数字",
"从 math 导入非数字",
数学.
纳尼)
注册自定义内置函数("NoneType", "NoneType = type(None)",
类型(
无))
_register_custom_builtin(torch, "import torch",
火炬)
注册自定义内置函数(
"设备",
从 torch 导入 device,
火炬.
设备)
注册自定义内置函数(
fx_pytree, "import torch.fx._pytree as fx_pytree", fx_pytree)
_register_custom_builtin("pytree", "import torch.utils._pytree as pytree", py 树)
def _is_magic(x: 字符串)
翻译
布尔:
返回 x.
以...开头("__")
和 x.
以...结尾("__")
def _snake_case(s: 字符串)
翻译
字符串:
""
将给定的字符串 ``s`` 转换为 Python 风格的变量名
示例:
``mod.snake_case`` -> ``mod.snake_case``
``mod.pascalCase``-> ``mod.pascal_case``
``mod.ALL_CAPS`` -> ``mod.all_caps``
""
返回 _snake_case_sub(s).
小写()
替换小写字母后跟大写字母的 occurrences
_snake_case_sub = functools.偏函数(
正则表达式.
编译(r"(?<=[a-z])([A-Z])").
子, r"_\1")
# 找出不能在 Python 标识符中出现的字符
_非法字符正则表达式 =
正则表达式.
编译("[^0-9a-zA-Z_]+")
变量名称的合并检查:
1) 检查名称不为空
2) 检查名称的第一个字符不是数字
3) 检查名称没有非法字符(_illegal_char_regex)
# 3) 移除(如果存在)数字后缀
_name_regex = 正则表达式.
编译(r"^([a-zA-Z_][0-9a-zA-Z_]*?)(?:_(\d+))?$")
# 以 torch 开头但不以 torch._dynamo.或 torch._inductor.开头
_torch 但非 dynamo =
正则表达式.
编译(
r"^torch(?:\.(?!_dynamo\.|_inductor\.)[^.]+)*$"
).fullmatch
def _is_from_torch(对象:
任何)
翻译
布尔:
模块名称 = getattr(
对象, "__module__",
无)
如果
模块名称 is
不是
无:
返回
_torch 但非 dynamo(
模块名称) is
不是
无
名称 = getattr(
对象,
"__名称__",
无)
# 排除 torch 因为 torch.torch.torch.torch 会工作。不知道 mang
如果
名称 is
不是
无
和
名称 !=
torch:
为
猜测
在 [
火炬,
火炬.
神经网络.
功能性
]
如果 getattr(
猜测,
名称,
无) is
对象:
返回
真实
返回
假
类
命名空间:
一个用于将名称唯一关联到对象的上下文。
以下不变性被强制执行:
- 每个对象获得一个唯一的名称。
- 每个名称在给定的命名空间内是唯一的。
- 生成的名称不会覆盖内置函数,除非该对象确实是那个内置函数。
""
def __init__(self):
self._obj_to_name: 字典[
任何,
字符串] = {}
self._used_names: 设置[
字符串] =
设置()
self.基础计数:
字典[
字符串, int] = {}
def 创建名称(self,
候选人:
字符串,
对象:
可选[
任何])
翻译
字符串:
创建一个独特的名称。
参数:
候选人:用作独特名称的基础,与用户相关。
obj:如果非 None,将与独特名称关联的对象。
""
如果
对象 is
不是
无
和
对象
在 self._obj_to_name:
返回 self._obj_to_name[
对象]
# 乐观地检查候选者是否已经是有效的名称
匹配 = _name_regex.
匹配(
候选人)
如果
匹配 is
无:
# 删除所有在 Python 标识符中非法的字符
候选人 =
非法字符正则表达式.
子(
“_”,
候选人)
如果
不是
候选人:
候选人 =
"_未命名"
如果
候选人[0].isdigit():
候选人 = f
"_"{
候选人}"
匹配 =
_名称正则表达式.
匹配(
候选人)
断言
匹配 is
不是
无
基础,
数字 =
匹配.
群组(1, 2)
如果
数字 is
无
或者
候选人
在 self.
已用名称:
数字 = self.
基础计数.
获取(
候选人, 0)
如果
非法名称.
获取(
候选人,
对象) is
不是
对象:
数字 += 1
候选人 = f"{
基础}_{
数字}"
# 假设非法名称不以 _\d 结尾,因此无需再次检查
else:
数字 = int(
数字)
当
候选人
在 self.
已用名称:
数字 += 1
候选人 = f"{
基础}_{
数字}"
self._used_names.添加(
候选人)
self._base_count[基础] =
数字
如果
对象 is
不是
无:
self._obj_to_name[对象] =
候选人
返回
候选人
def 将对象与名称关联(self,
名称:
字符串,
对象:
任何):
将一个唯一的名称与对象关联。
`名称`和`对象`均不应已关联。
""
可能已存在 = self._obj_to_name.setdefault(
对象,
名称)
断言
可能存在 is
名称,
对象已关联
def 重命名对象(self,
对象:
任何,
名称:
字符串):
断言
对象
在 self.
对象名称
self.对象名称[
对象] =
名称
self.已用名称.
添加(
名称)
dtype_abbrs = {
火炬.bfloat16: "bf16",
火炬.float64: "f64",
火炬.float32: "f32",
火炬.float16: "f16",
火炬.float8_e4m3fn: "f8e4m3fn",
火炬.float8_e5m2: "f8e5m2",
火炬.float8_e4m3fnuz: "f8e4m3fnuz",
火炬.float8_e5m2fnuz: "f8e5m2fnuz",
火炬.float8_e8m0fnu: "f8e8m0fnu",
火炬.complex32: "c32",
火炬.complex64: "c64",
火炬.complex128: "c128",
火炬.int8:
i8,
火炬.int16:
i16,
火炬.int32:
i32,
火炬.int64: "i64",
火炬.
布尔: "b8",
火炬.uint8: "u8",
火炬.uint16: "u16",
火炬.uint32:
u32,
火炬.uint64:
u64,
火炬.bits16:
b16,
火炬.
位 1x8:
b1x8,
}
@compatibility(兼容旧版本=
是)
@dataclass
类
Python 代码:
""
表示执行或保存图形为 Python 代码所需的所有信息。
""
前向函数定义的 Python 源代码。
源:
字符串
在执行 `src_def` 期间的全局变量值。
全局变量:
字典[
字符串,
任何]
可选的从前向函数的行号到节点索引的映射。
节点索引。
行号映射:
可选[
字典[int,
可选[int]]]
def 格式目标(
基础:
字符串,
目标:
字符串)
翻译
字符串:
元素 =
目标.
分割(
“。”)
r = 基础
为 e
在
元素:
如果
不是 e.isidentifier():
r = fgetattr({r}, "{e}
')'
else:
r = f"{r}.{e}"
返回 r
类
插入点:
def __init__(self, graph, 新插入):
self.图 =
图
self.原始插入, graph.
插入 _ = graph.
插入 _,
新插入
def __进入__(self):
通过
def __退出__(self,
类型,
值, tb):
self.graph.插入 = self.
原始插入
类
节点列表:
def __init__(self, graph: 图,
方向:
直接[
_前,
_后] =
下一个):
断言
方向
在 (
下一个,
上一个)
self.图 =
图
self.方向 =
方向
def __len__(self):
返回 self.graph._len
def __iter__(self):
返回 _NodeIter(self.graph.
根, self.
方向 ==
"_前一个")
def __倒序__(self):
返回
节点列表(self.graph,
"_下一个"
如果 self.
方向 ==
"_上一个"
否则 "_prev")
类 _PyTreeInfo(
命名元组):
""
包含在当我们使用 Pytrees 时存储的额外信息
""
orig_args: 列表[
字符串]
在规范中:
py 树.
树规范
输出规范:
可选[
py 树.
树规范]
@dataclass(冻结=
是)
类
_解析堆栈跟踪:
""
表示解析堆栈跟踪的最顶层帧
""
文件:
字符串
行号:
字符串
名称:
字符串
代码:
字符串
def 获取摘要字符串(self):
返回 f
"文件:"{self.
文件}:{self.
行号}
在{self.
名称}
,代码:{self.
代码}"
从堆栈跟踪获取文件行号代码
def 解析堆栈跟踪(
堆栈跟踪:
字符串):
如果
堆栈跟踪 is
无:
返回
无
模式 =
正则表达式.
编译(r
文件
"\"
(.+)
输入文本:
"Immersive Translate"
翻译:
沉浸式翻译, 行 (\d+),在 (.+)$")
行 =
堆栈跟踪.strip().
分割("
输入文本翻译为简体中文为:\n")
堆栈跟踪应将最内层帧放在最后,因此我们
逆向迭代以找到第一个以空行开始的行
# 与“文件”
为
索引
在
范围(
长度(
行) - 2, -1, -1):
行 =
行[
索引].strip()
匹配项 =
模式.
匹配(
行)
如果
匹配:
文件 =
匹配.
群组(1)
行号 =
匹配.
群组(2)
名称 =
匹配.
群组(3)
# 下行应为代码
代码 =
行[
索引 + 1].strip()
返回
解析堆栈跟踪(
文件,
行号,
名称,
代码)
返回
无
@compatibility(兼容旧版本=
错误)
类 CodeGen:
def __init__(self):
self._body_transformer: 可选[TransformCodeFunc] =
无
self._func_name: 字符串 =
向前
def 生成函数定义(self,
自由变量:
列表[
字符串
]
可能的返回注解:
字符串)
翻译
字符串:
""
基于自由变量和返回注解,生成 FX 函数的开始部分。
默认情况下,`gen_fn_def(['a', 'b'], '') == 'def {self._func_name}(a, b):'`
""
如果原始函数没有将 self 作为其第一个参数,我们
就会添加它。
如果
长度(free_vars) == 0
或者 free_vars[0] !=
self:
free_vars.插入(0,
self)
返回 (
fdef{self.
函数名}({
“,”.
连接(
自由变量)}){
可能的返回注解}
空字符串
)
def 生成输出(self,
输出参数:
参数)
翻译
字符串:
""
根据输出参数,生成 FX 函数的返回语句。
注意:返回的语句不应缩进。
""
返回 f
return{
表示(
输出参数)}"
def 处理输入(self, *
参数:
任何)
翻译
任何:
""
将输入转换,以便图可以将其作为参数,因为
非默认的代码生成可能导致函数的输入与图的输入不同。
如果图可以直接运行,则此不变量应该成立。
如果图可以直接运行,则此不变量应该成立。
`f.graph.process_outputs(f.graph(*f.graph.process_inputs(*inputs))) == f(*inputs)`
""
返回 args
def process_outputs(self, 输出:
任何)
翻译
任何:
""
将图输出的结果转换为与 codegen 相同。
更多详情请参阅 `process_inputs`。
""
返回
输出
def additional_globals(self) 翻译
列表[
元组[
字符串,
任何
]
""
如果你的代码生成器使用了额外的全局值,请在此处添加(标识符,值的引用)元组。
例如,返回 ['列表', typing.List] 如果您需要在全局范围内使用 `列表`。
""
返回
输入文本为空,请提供需要翻译的文本
def _gen_python_code(
self,
节点,
root_module: 字符串,
命名空间:
_命名空间,
*,
详细模式:
布尔值 =
错误,
包含步长:
布尔值 =
错误,
包含设备:
布尔值 =
错误,
彩色:
布尔值 =
错误,
) 翻译
Python 代码:
自由变量:
列表[
字符串] =
输入文本为空,请提供需要翻译的文本
主体:
列表[
字符串] =
输入文本为空,请提供需要翻译的文本
全局变量:
字典[
字符串,
任何] = {}
包装函数:
字典[
字符串,
无] = {}
将字符串包裹在列表中以通过引用传递
可能返回注释:
列表[
字符串] = [
输入文本翻译为简体中文为:""]
包含步长 =
包含步长
或者 (
os.环境.
获取(
FX 图形显示步长, "0") ==
1
)
包含设备 =
包含设备
或者 (
os.环境.
获取(
显示 FX 图形设备, "0") ==
1
)
def 添加全局(
名称提示:
字符串,
对象:
任何):
添加一个要跟踪的全局对象。
我们对引用图外对象的名称调用此操作,
如函数或类型。
返回:在生成的源代码中引用 'obj' 应使用的全局名称。
""
如果 (
_is_from_torch(对象)
和
对象 !=
火炬.
设备
): 支持注册 torch.device
# HACK:torch 自定义操作注册的解决方案的权宜之计。我们
# 无法像普通模块那样导入,因此它们必须保留其
# 完全限定名称。
返回
获取合格名称(
对象)
将名称提示规范化以获取正确的标识符
全局名称 =
命名空间.
创建名称(
名称提示,
对象)
如果
全局名称
在
全局_:
断言
全局_[
全局名称] is
对象
返回
全局名称
全局_[
全局名称] =
对象
返回 global_name
预填充全局表中的已注册内置函数。
为
名称, (_,
对象)
在 _custom_builtins.
项目():
添加全局(
名称,
对象)
def 类型表示(o:
任何):
如果 o == ():
空元组类型注解 Tuple[()]
返回 "()"
类型名 = _type_repr(o)
如果
原始类型 := getattr(o, "__origin__",
无):
# 列表[...], typing.List[...], TensorType[...]
如果 isinstance(o,
输入法.
_通用别名):
# 类型:忽略[已定义]
# 这是一个通用的预 PEP585 类型,例如 typing.List[torch.Tensor]
原始类型 = _origin_type_map.
获取(origin_type, origin_type)
origin_typename = 添加全局(
_类型表示(origin_type), origin_type)
如果
有属性(o,
"__参数__"):
# 为每个内部类型变量分配全局名称。
args = [类型表示(arg)
为
参数
在 o.
参数]
如果
长度(
参数) == 0:
# 纯类型,例如无下标的 `typing.Tuple`
# 此代码路径用于 Python < 3.9
返回
原始类型名
返回 f'{
原始类型名}[{",".
连接(
参数)}
]
else:
# 纯类型,例如没有下标的 `typing.Tuple`
# 此代码路径用于 Python 3.9+
返回
原始类型名
# 常见情况:这是一个像 'foo.bar.baz' 这样的常规模块名
返回
添加全局(
类型名, o)
如果
彩色:
红色 =
颜色函数[
红色]
暗绿色 =
_颜色函数[
"暗绿色"]
维度 =
_颜色函数[
暗淡]
暗淡蓝色 =
颜色函数[
"暗淡蓝色"]
蓝色 =
颜色函数[
"蓝色"]
else:
红色 =
身份
深绿色 =
身份
维度 =
身份
深蓝色 =
身份
蓝色 =
身份
def _get_repr(arg: 任何)
翻译
字符串:
如果 isinstance(arg,
节点):
# 首先因为常见
返回
表示(arg)
如果...否则 isinstance(arg,
元组)
和
有属性(arg,
"_字段"):
# 如果有 `_fields`,则通过 add_global 处理 NamedTuples。
qualified_name = _get_qualified_name(类型(arg))
全局名称 =
添加全局(
合法名称,
类型(arg))
返回 f"{
全局名称}{
表示(
元组(arg))}"
如果...否则 isinstance(
arg, (火炬.
操作符.
操作符重载,
火炬.
操作符.
高阶运算符)
):
qualified_name = _获取_限定名称(arg)
全局名称 =
添加全局(
合法名称, arg)
返回 f"{
全局名称}"
如果...否则 isinstance(arg,
枚举.
枚举):
类 = arg.
类
类名 =
添加全局(
类.__name__,
类)
返回 f"{clsname}.{arg.
名称}"
如果...否则 isinstance(arg,
火炬.
张量):
大小 =
列表(arg.
尺寸())
dtype = 字符串(arg.
数据类型).
分割(
“。”
)-1]
返回 f
"torch.Tensor(大小={
尺寸}
, 数据类型={
数据类型})"
如果...否则 isinstance(arg,
元组):
如果
长度(arg) == 1:
返回 f
“(”{_get_repr(arg[0])},)"
else:
返回
“(” +
“,”.
连接(_get_repr(a)
为 a
在 arg) +
)"
如果...否则 isinstance(arg,
列表):
返回
[ +
“,”.
连接(_get_repr(a)
为 a
在 arg) +
`]`
如果...否则 isinstance(arg,
切片):
返回 f
"切片("{_get_repr(arg.
开始)}, {_get_repr(arg.
停止)}, {_get_repr(arg.
步长)})"
else:
返回
蓝色(
表示(arg))
def _格式参数(
参数:
元组[
参数, ...
] kwargs:
字典[
字符串,
参数]
) 翻译
字符串:
res = [获取表示(a)
为 a
在
参数]
资源.
扩展
[f"{k} = {
获取表示(v)}"
为 k, v
在 kwargs.
项目()])
返回
“,”.
连接(
资源)
遍历反向节点并记录首次使用实例
给定节点的数量。这代表节点在程序中的*最后*使用。
程序执行顺序,我们将用它来释放未使用的
值
节点到最后一次使用:
字典[
节点,
节点] = {}
用户最后使用:
字典[
节点,
列表[
节点]] = {}
def 注册最后使用(n:
节点,
用户:
节点):
如果 n
不是
在
节点到上次使用:
节点到上次使用[n] =
用户
用户上次使用记录.setdefault(
用户,
空列表.append(n)
为
节点
在
反转(
节点):
为
输入节点
在
节点.
输入节点:
注册最后使用(
输入节点,
节点)
def 删除未使用值(
用户:
节点):
""
删除使用后的值。这确保了代码中未使用的值被释放,代码的内存使用达到最优。
代码剩余部分未使用,从而释放内存,使代码的内存使用达到最优。
代码的内存使用达到最优。
""
如果
用户.
操作符 ==
占位符:
返回
如果
用户.
操作符 ==
输出:
主体.append("
输入文本翻译为简体中文为:\n")
返回
要删除的节点 =
用户最后使用时间.
获取(
用户,
[]
如果
长度(
用户.
用户们.
键()) == 0:
# 此节点未被任何其他节点使用。然而它也没有被使用
# 由于副作用已被 DCE 移除。我们希望释放它的输出
执行后立即保存内存
要删除的节点.append(
用户)
如果
长度(
要删除的节点):
要删除的字符串 =
等于.
连接(
[表示(n)
为 n
在
要删除的节点] + [
无]
)
主体.append(f
;{
暗(
要删除的字符串)}
输入文本翻译为简体中文为:\n")
else:
主体.append("
输入文本翻译为简体中文为:\n")
prev_stacktrace = 无
def append_stacktrace_summary(节点:
节点):
""
将堆栈跟踪的摘要添加到生成的代码中。这
对调试很有用。
""
非局部
前一个堆栈跟踪
如果
节点.
操作符
不是
在 {
占位符,
输出}:
堆栈跟踪 =
节点.
堆栈跟踪
如果
堆栈跟踪:
如果
堆栈跟踪 !=
前一个堆栈跟踪:
前一个堆栈跟踪 =
堆栈跟踪
如果
解析后的堆栈跟踪 :=
解析堆栈跟踪(
堆栈跟踪):
摘要字符串 =
解析后的堆栈跟踪.
获取摘要字符串()
else:
摘要字符串 =
请提供需要翻译的文本
主体.append(f'
输入文本翻译为简体中文为:\n {
暗(f
#{
摘要字符串}")}
输入文本翻译为简体中文为:\n')
如果...否则
前一个堆栈跟踪 !=
输入文本翻译为简体中文为:"":
前一个堆栈跟踪 =
请提供需要翻译的文本
无堆栈跟踪信息 =
"# 未找到以下节点的堆栈跟踪"
主体.append(f"
输入文本翻译为简体中文为:\n{
暗(
无堆栈跟踪信息)}
输入文本翻译为简体中文为:\n")
def 将形状字符串化(
形状:
迭代器)
翻译
字符串:
返回 f"[{
“,”.
连接
[
字符串(x)
为 x
在
形状])}]"
def 发射节点(
节点:
节点):
可能类型注解 = (
请提供需要翻译的文本
如果
节点.
类型 is
无
否则 f
":"{
类型表示(
节点.
类型)}"
)
如果
详细模式:
# 覆盖注释以提供更详细的信息
from torch.fx.experimental.proxy_tensor 导入 py_sym_types
from torch.fx.passes.shape_prop 导入 TensorMetadata
meta_val = 节点.
元数据.
获取(
值,
节点.
元数据.
获取(
张量元,
节点.
元数据.
获取(
示例值,
无)),
)
# 使用字符串作为注释,使其成为有效的 Python 代码
如果 isinstance(
元数据值,
火炬.
张量)
和
元值.
布局
不是
在 (
火炬.
稀疏压缩存储格式,
火炬.
稀疏压缩存储格式,
):
步长标注 = (
f"{将形状字符串化(
元值.
步长())}"
如果
包含步长
否则
请提供需要翻译的文本
)
设备标注 = f"{
元值.
设备}"
如果
包含设备
否则
请提供需要翻译的文本
可能的类型注解 = (
f“: ”{
红色(
数据类型缩写[
元值.
数据类型])}{
蓝色(
字符串化形状(
元值.
形状))}'
f'{深蓝色(
步长标注)}{
深绿色(
设备标注)}
''
)
如果...否则 isinstance(
元值,
Python 符号类型):
可能的类型注解 = f
": \"Sym({
元值}
)"‘
如果...否则 isinstance(
元值,
张量元数据):
可能的类型注解 = f
“: ”{
数据类型缩写[
元值.
数据类型]}{
序列化形状(
元值.
形状)}
''
如果
节点.
操作符 ==
占位符:
断言 isinstance(
节点.
目标,
字符串)
可能的默认参数 = (
请提供需要翻译的文本
如果
不是
节点.args
否则 f
"等于"{
_获取表示(
节点.
参数[0])}"
)
自由变量.append(
f"{节点.
目标}{
可能的类型注解}{
maybe 默认参数}"
)
原始名称 =
节点.
目标.
替换("*",
输入文本翻译为简体中文为:"")
如果
原始名称 !=
表示(
节点):
主体.append(f"{
表示(
节点)} = {
原名}
输入文本翻译为简体中文为:\n")
返回
如果...否则
节点.
操作符 ==
调用方法:
断言 isinstance(
节点.
目标,
字符串)
主体.append(
f"{表示(
节点)}{
可能的类型注解} = {
_格式目标(_get_repr(
节点.
参数[0
)]
节点.
目标)}"
f“(”{_format_args(
节点.
参数[1
,
节点.kwargs)})"
)
返回
如果...否则
节点.
操作符 ==
调用函数:
断言
可调用(
节点.
目标)
# 美化打印运算符
如果 (
getattr(节点.
目标, "__module__",
输入文本翻译为简体中文为:"") ==
操作符
和
节点.
目标.__name__
在
魔法方法
):
断言 isinstance(
节点.
参数,
元组)
主体.append(
f"{表示(
节点)}{
可能的类型注解}
= “
f"{魔法方法[
节点.
目标.__name__].
格式(*(_get_repr(a)
为 a
在
节点.
参数))}"
)
返回
# 美化打印原地操作符;对于 jit.script 正确工作所必需
# 目前在正常 FX 图中不支持,但由 torchdynamo 生成
如果 (
getattr(节点.
目标, "__module__",
输入文本翻译为简体中文为:"") ==
操作符
和
节点.
目标.__name__
在
内置方法
):
主体.append(
f"{内置方法[
节点.
目标.__name__].
格式(*(_get_repr(a)
为 a
在
节点.
参数))}; "
f"{表示(
节点)}{maybe_type_annotation} = {_get_repr(
节点.
参数[0])}"
)
返回
qualified_name = 获取合格名称(
节点.
目标)
全局名称 =
添加全局(
合法名称,
节点.
目标)
# getattr 的特殊情况:node.args 可能是 2 参数或 3 参数
2-参数:属性访问;3-参数:默认值调用属性函数
如果 (
global_name == "getattr"
和 isinstance(
节点.
参数,
元组)
和 isinstance(
节点.
参数[1
]
字符串)
和
节点.
参数[1].isidentifier()
和
长度(
节点.
参数) == 2
):
主体.append(
f"{表示(
节点)}{
可能的类型注解} = {
_格式目标(
_获取表示(
节点.
参数[0
)]
节点.
参数[1])}"
)
返回
主体.append(
f"{表示(
节点)}{
可能的类型注解} = {
全局名称}({
_格式化参数(
节点.
参数,
节点.kwargs)})"
)
如果
节点.
元数据.
获取(
is_wrapped,
错误):
包装函数.setdefault(
全局名称)
返回
如果...否则
节点.
操作符 ==
调用模块:
断言 isinstance(
节点.
目标,
字符串)
主体.append(
f"{表示(
节点)}{
可能的类型注解}
=
f"{_格式目标(
根模块,
节点.
目标)}({
_格式参数(
节点.
参数,
节点.kwargs)})"
)
返回
如果...否则
节点.
操作符 ==
获取属性:
断言 isinstance(
节点.
目标,
字符串)
主体.append(
f"{表示(
节点)}{
可能的类型注解} = {
_格式目标(
根模块,
节点.
目标)}"
)
返回
如果...否则
节点.
操作符 ==
输出:
如果
节点.
类型 is
不是
无:
可能的返回注解[0] = f
" -> "{
类型表示(
节点.
类型)}"
主体.append(self.
生成输出(
节点.
参数[0]))
返回
提升
不支持的操作异常(f
node:{
节点.
操作} {
节点.
目标}")
为 i,
节点
在
列举(
节点):
# NOTE: emit_node 不输出带有换行的字符串。它依赖于
# delete_unused_values 来追加一个
如果
详细模式:
追加堆栈跟踪摘要(
节点)
# 发出计数注释以保持跟踪
# 节点索引,稍后将删除
# 在经过_body_transformer 后
主体.append(f
"# 计数器: "{i}
输入文本翻译为简体中文为:\n")
emit_node(节点)
delete_unused_values(节点)
如果
长度(
主体) == 0:
# 如果图没有非占位符节点,则没有用于主体的线条
# have been emitted. To continue to have valid Python code, emit a
# single pass statement
主体.append("pass
输入文本翻译为简体中文为:\n")
如果
长度(
包装函数) > 0:
wrap_name = 全局添加(
包装,
火炬.fx.
包裹)
包装语句 = "
输入文本翻译为简体中文为:\n".
连接
[f'{
包装名称}
({
名称}
')'
为
名称
在
包装函数])
else:
包裹语句 =
请提供需要翻译的文本
如果 self._body_transformer:
身体 = self._body_transformer(
主体)
为
名称,
值
在 self.additional_globals():
全局添加(
名称,
值)
前言 = self.
生成函数定义(
自由变量,
可能的返回注解[0])
移除计数器并生成行号到节点索引映射
lineno_map: 字典[int,
可选[int]] = {}
前言长度 =
前言.
数量("
输入文本翻译为简体中文为:\n") + 1
新行:
列表[
字符串] =
输入文本为空,请提供需要翻译的文本
当前索引 =
无
为
行
在
输入文本翻译为简体中文为:"".
连接(
主体).
分割("
输入文本翻译为简体中文为:\n"):
计数器 =
_计数器正则表达式.
搜索(
行)
如果
计数器 is
不是
无:
cur_idx = int(counter.群组(1))
else:
lineno_map[长度(
新行) +
前言长度] =
当前索引
新行.append(
行)
代码 = "
输入文本翻译为简体中文为:\n".
连接(
新行).
去除字符串左侧空白字符("
输入文本翻译为简体中文为:\n")
代码 = "
输入文本翻译为简体中文为:\n".
连接(" " +
行
为
行
在
代码.
分割("
输入文本翻译为简体中文为:\n"))
函数代码 = f
""
{包裹语句}
{前言}
{代码}
""
返回
Python 代码(
函数代码,
全局变量,
行号映射=
行号映射)
# 理想情况下,我们希望将所有 pytree 逻辑重构到这段 codegen 代码中
# 类。不幸的是,目前我们在 FX 中有 3 个地方需要额外的逻辑。
# 1. 在初始符号跟踪中,pytree 逻辑与`concrete_args`绑定。
在 FX 图中,我们需要访问 2 个属性 - in_spec 和 out_spec。
由于我们无法在 FX 正向中访问.graph,我们需要将属性复制到模块中。
目前无法使用 `add_global` 注册 pytree 导入 - 不确定原因
类
_PyTree 代码生成器(CodeGen):
def __init__(self, pytree 信息: _PyTreeInfo):
超级().__init__()
self.pytree 信息:
_PyTree 信息 =
pytree 信息
def 处理输入(self, *
输入:
任何)
翻译
任何:
平铺参数 =
py 树.arg_tree_leaves(*
输入)
返回
平铺参数
def 处理输出(self,
输出:
任何)
翻译
任何:
如果 self.
pytree 信息 is
无
或者 self.
pytree 信息.
输出规范 is
无:
返回 out
如果
不是 isinstance(
输出, (
列表,
元组)):
out = [输出]
断言 self.
pytree 信息.
输出规范 is
不是
无
返回
py 树.
树形展开(
输出, self.
pytree 信息.
输出规范)
def 生成函数定义(self,
自由变量,
可能返回注释):
# 给定一个用户函数/模型:
# myargs = [myargs0, myargs1]
# mykwargs = {'mykwargs0': ..., 'mykwargs1': ...}
# def forward(self, mypos, *myargs, mykey=None, **mykwargs):
#
生成的代码将所有关键词展平为 `forward()` 的位置参数
例如:`forward(self, mypos, myargs0, myargs1, mykey, mykwargs0, mykwargs1):`
#
在 `forward` 中,`tree_flatten_spec` 仍然分别解析参数和关键字参数
例如:`tree_flatten_spec(([mypos, myargs0, myargs1],`
# {'mykey':mykey, 'mykwargs0':mykwargs0, 'mykwargs1':mykwargs1}),
# self._in_spec)
#
# 如果用户函数/模型没有关键字,则字典将不会从 tree_flatten_spec 中抑制
tree_flatten_spec([mypos, myargs0, myargs1]), self._in_spec
如果 self.pytree_info is
无:
返回
超级().
生成函数定义(
自由变量,
可能返回注释)
fn_args = self.pytree_info.原始参数
有原始 self = (
函数参数[0] ==
self)
如果
长度(
函数参数) > 0
否则
假
如果
有原 self:
自由变量.
插入(0,
self)
函数定义 =
超级().
生成函数定义(
函数参数
[]
可能返回注释)
如果
长度(
自由变量) > 0:
# pytree 中有占位符
# 当存在 kwargs 时,in_spec 是 tuple(args, kwargs)
has_args_kwargs_tuple = (
self.pytree_info.在规范中.
类型 ==
元组
和 self.pytree_info.
在规范中.
子数量 == 2
和 self.pytree_info.
在规范中.
子规格[0].
类型 ==
元组
和 self.
pytree 信息.
在规范中.
子规格[1].
类型 ==
字典
)
函数参数 = "{}"
函数签名 = f"[{
“,”.
连接(
函数参数)}
],self._in_spec"
如果 has_args_kwargs_tuple:
count_args = self.pytree_info.在规范中.
子规格[0].
子数量
fn_args = self.pytree 信息.orig_args
[
计算参数数量]
函数关键字参数 = (
"{"
+ “,”.
连接(
f''{k}
':'{v}"
为 k, v
在 zip(
self.pytree_info.在规范中.
子规格[1].
上下文,
self.pytree 信息.orig_args[
计算参数个数
,
)
)
+ "}"
)
函数签名 = f
"(["{
“,”.
连接(
函数参数)}
]{
函数关键字参数}), self._in_spec"
在 Python 中,`var1: annotation1, var2: annotation2 = function_call()` 是无效的。
需要拆分为两行:
一行用于注释:`var1: annotation1; var2: annotation2;`(注意分号)
一行用于代码:`var1, var2, = function_call()`
无注释 = [x.
分割(
“:”
)0]
为 x
在
自由变量]
具有注释 = [x +
“;”
为 x
在
自由变量
如果 ":"
在 x]
如果
长度(
具有注释) > 0:
函数定义 += "
输入文本翻译为简体中文为:\n " +
输入文本翻译为简体中文为:"".
连接(
有注解) + "
输入文本翻译为简体中文为:\n"
函数定义 += f
""
{“,”.
连接(
无注释)}, = fx_pytree.tree_flatten_spec({
函数签名})"""
返回
函数定义
def 生成输出(self,
输出参数):
如果 self.
Py 树信息
和 self.
pytree 信息.
输出规范:
返回 f
"返回 pytree.tree_unflatten("{
表示(
输出参数)}, self._out_spec)"
else:
返回
超级().
生成输出(
输出参数)
类
查找节点查找表:
""
侧桌用于快速查询的图表
""
def __init__(self):
self.表格:
字典[
元组[
字符串,
可选[
目标]],
字典[
节点,
无]] =
默认字典(
字典
)
def _键(self,
节点)
翻译
元组[
字符串,
可选[
目标
]]
返回 (
节点.
操作,
节点.
目标
如果
节点.
操作符 ==
调用函数
否则
无)
def 包含(self,
节点)
翻译
布尔:
返回
节点
在 self.
表格[self.
_键(
节点)]
def 插入(self,
节点:
节点)
翻译
无:
self.表格[self.
_键(
节点)][
节点] =
无
def 删除(self,
节点:
节点)
翻译
无:
self.表格[self.
_键(
节点)].
流行(
节点)
def find_nodes(self, *, 操作:
字符串,
目标:
可选[
"目标"] =
无):
如果
操作符 ==
调用函数:
断言
目标 is
不是
无
返回 [*self.
表格
[
操作,
目标)].
键()]
如果
目标 is
无:
返回 [*self.
表格
(
操作,
无
).
键()]
# 操作是调用方法、获取属性、调用模块
返回 [
节点
为
节点
在 self.
表格
[
操作,
无
)).
键()
如果
节点.
目标 ==
目标]
[文档]@compatibility(
兼容旧版本=
是)
类
图:
""
``图``是 FX 中间表示中使用的最主要的数据结构。
它由一系列的``节点``组成,每个节点代表调用点(或其他
语法结构)。这些``节点``的列表合在一起,构成一个
有效的 Python 函数。
例如,以下代码
.. 代码块 :: python
导入 torch
导入 torch.fx
class MyModule(torch.nn.Module):
def __init__(self):
super().__init__()
self.param = torch.nn.Parameter(torch.rand(3, 4))
self.linear = torch.nn.Linear(4, 5)
def forward(self, x):
return torch.topk(
torch.sum(self.linear(x + self.linear.weight).relu(), dim=-1), 3
)
m = MyModule()
gm = torch.fx.symbolic_trace(m)
将产生以下 Graph::
打印(gm.graph)
.. 代码块:: text
graph(x):
%linear_weight : [num_users=1] = self.linear.weight
%add_1 : [num_users=1] = call_function[target=operator.add](args = (%x, %linear_weight), kwargs = {})
%linear_1 : [num_users=1] = call_module[target=linear](args = (%add_1,), kwargs = {})
%relu_1 : [num_users=1] = 调用方法[target=relu](args = (%linear_1,), kwargs = {})
%sum_1 : [num_users=1] = 调用函数[target=torch.sum](args = (%relu_1,), kwargs = {dim: -1})
%topk_1 : [num_users=1] = 调用函数[target=torch.topk](args = (%sum_1, 3), kwargs = {})
返回 topk_1
对于在“图”中表示的操作的语义,请参阅::class:`节点`。
""
[文档] @兼容性(向后兼容=True)
def __init__(
self,
owning_module: Optional["GraphModule"] = None,
tracer_cls: 可选[type["Tracer"]] = None,
tracer_extras: 可选[dict[str, Any]] = None,
):
"""
构建一个空图。
"""
self._root: Node = Node(self, "", "root", "", (), {})
self._used_names: dict[str, int] = {} # 基础名称 -> 数字
self._insert = self._root.prepend
self._len = 0
self._graph_namespace = _Namespace()
self._owning_module = owning_module
self._tracer_cls = tracer_cls
self._tracer_extras = tracer_extras
self._codegen = CodeGen()
self._co_fields: dict[str, Any] = {}
self._find_nodes_lookup_table = _FindNodesLookupTable()
@property
def 拥有模块(self):
返回 self.
_拥有模块
@owning_module.setter
def 拥有模块(self,
模块:
可选[
图模块
)]
self.拥有模块 =
修饰
@property
def 节点(self)
翻译
节点列表:
""
获取构成此图的节点列表。
注意,此 ``节点`` 列表表示形式是双向链表。突变
在迭代过程中(例如删除节点、添加节点)是安全的。
返回:
节点构成的双向链表。注意,可以调用 `reversed` 来切换迭代顺序。
此列表可以通过调用 `reversed` 来切换迭代顺序。
""
返回
节点列表(self)
[文档] @兼容性(is_backward_compatible=False)
def output_node(self) -> Node:
output_node = next(iter(reversed(self.nodes)))
assert output_node.op == "output"
return output_node
[文档] @兼容性(向后不兼容=False)
def find_nodes(
self, *, op: str, target: Optional["目标"] = None, sort: bool = True
):
"注释"
允许快速查询节点
参数:
op (str): 操作名称
目标(可选[目标]):节点的目标。对于调用函数,
目标是必需的。对于其他操作,目标为可选。
排序(布尔值):是否按节点在图上出现的顺序返回节点。
在图上。
返回值:
具有请求操作和目标的节点可迭代对象。
"""
node_list = self._find_nodes_lookup_table.find_nodes(op=op, target=target)
if sort:
return sorted(node_list)
return node_list
[文档] @兼容性(向后兼容=True)
def graph_copy(
self, g: "图", val_map: dict[节点, 节点], return_output_node=False
) -> "Optional[参数]":
"``"
从给定的图中将所有节点复制到 ``self`` 中。
参数:
g (Graph): 要从中复制节点的源图。
val_map (Dict[Node, Node]): 一个将填充映射的字典
从 ``g`` 中的节点到 ``self`` 中的节点。注意 ``val_map`` 可以传递
使用已存在的值来覆盖某些值的复制。
返回:
`self` 中的值现在等同于 `g` 中的输出值,
如果 `g` 有 `output` 节点。否则为 `None`。
"""
对于 `g.nodes` 中的每个节点:
如果节点在 val_map 中:
继续执行
如果节点.op 等于"output":
rv = map_arg(node.args[0], lambda n: val_map[n])
返回 rv,如果未设置 return_output_node,否则返回(rv, node)
val_map[node] = self.node_copy(node, lambda n: val_map[n])
返回 None
def 深拷贝(self,
描述=
无)
翻译
图:
""
显式实现__deepcopy__以防止递归深度过大
从默认实现。这使用 graph_copy 来复制节点
以迭代方式,而不是递归方式。它还填充了
缓存表以防止不必要的复制(例如引用)
节点或其他来自自定义 GraphModule 实现的图部分。
""
备忘录 =
备忘录
如果
备忘录
否则 {}
g = 图(tracer_cls=self._tracer_cls)
output_vals = g.graph_copy(self, 值映射=
描述,
返回输出节点=
是)
g.代码生成器 =
复制.
深拷贝(self.
_代码生成)
如果
输出值 is
不是
无:
断言 isinstance(
输出值,
元组)
输出值,
旧输出节点 =
输出值
新输出节点 = g.
输出(
输出值, type_expr=getattr(
旧输出节点,
类型,
无)
)
新输出节点.
元数据 =
复制.
复制(
旧输出节点.
元数据)
返回 g
[文档] @compatibility(
兼容旧版本=
是)
def 创建节点(
self,
操作:
字符串,
目标:
目标,
参数:
可选[
元组[
论点, ...]] =
无,
kwargs: 可选[
字典[
字符串,
"论点"]] =
无,
名称:
可选[
字符串] =
无,
type_expr: 可选[
任何] =
无,
) 翻译
节点:
""
创建一个 ``节点`` 并将其添加到当前插入点的 ``图`` 中。
注意,当前插入点可以通过 :meth:`Graph.inserting_before` 设置。
以及 :meth:`Graph.inserting_after`。
参数:
op (str): 该节点的操作码。'call_function'、'call_method'、'get_attr'之一。
'调用模块', '占位符', 或 '输出'。这些操作码的语义是
描述在`Graph`文档字符串中。
args (Optional[Tuple[Argument, ...]]): 是此节点的参数元组。
kwargs (Optional[Dict[str, Argument]]): 此节点的 kwargs
name (Optional[str]): 可选的字符串名称,用于 Node
这将影响 Python 生成的代码中分配给值的名称。
Python 生成的代码。
type_expr (Optional[Any]): 可选的类型注解,表示此节点输出的 Python 类型。
此节点输出的 Python 类型。
返回:
新创建并插入的节点。
""
在 Node.__init__中检查`target in _legal_ops`。
如果
不是
参数:
args = ()
else:
断言 isinstance(
参数,
元组),
"参数必须是元组"
如果
不是 kwargs:
kwargs = 不可变字典()
else:
断言 isinstance(kwargs,
字典),
"kwargs 必须是一个字典"
候选人 =
名称
如果
名称 is
不是
无
否则 self.
_目标转字符串(
目标)
名称 = self.
_图命名空间.
创建名称(
候选人,
无)
n = 节点(self,
名称,
操作,
目标,
参数, kwargs, type_expr)
如果 (
self.拥有模块 is
不是
无
和 getattr(self.
拥有模块,
_创建节点钩子,
无) is
不是
无
):
为 f
在 self.
拥有模块.
创建节点钩子:
f(n)
self.图命名空间.
将对象与名称关联(
名称, n)
self.插入 _(n)
self._find_nodes_lookup_table.插入(n)
self._len += 1
返回 n
[文档] @兼容性(向后兼容=False)
def process_inputs(self, *args):
""
处理参数以便将它们传递给 FX 图。
""
返回 self._codegen.process_inputs(*args)
[文档] @兼容性(向后兼容=False)
def process_outputs(self, out):
return self._codegen.process_outputs(out)
[文档] @兼容性(向后兼容=True)
def erase_node(self, to_erase: 节点) -> None:
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
删除“图”中的“节点”。如果发生异常则抛出异常。
该节点在“Graph”中仍有用户。
Args:
要删除的 ``Node`` 对象。
"""
如果 to_erase.users 的长度大于 0:
raise RuntimeError(
尝试删除节点 {to_erase} 但它仍然有 {len(to_erase.users)} 个用户
图中的用户:{to_erase.users}!
)
如果 to_erase.graph 不等于 self
则引发 RuntimeError 异常(f"尝试从错误的图中移除 {to_erase}!")
如果 to_erase._erased
则警告(warnings.warn(f"在已删除的节点上 erase_node({to_erase})"))
返回
if (
self.owning_module 不为 None
and getattr(self.owning_module, "_erase_node_hooks", None) 不为 None
):
for f in self.owning_module._erase_node_hooks:
f(to_erase)
self._find_nodes_lookup_table.remove(to_erase)
删除._remove_from_list()
to_erase._erased = True # 迭代器可能保留对已删除节点的引用
self._len -= 1
将此 Node 的参数节点置为空,以便引用的节点
可以根据需要更新他们的 ``users``。
to_erase._update_args_kwargs(
map_arg(to_erase._args, lambda n: None),
map_arg(to_erase._kwargs, lambda n: None),
)
[文档] @兼容性(向后兼容=True)
def 插入之前(self, n: Optional[Node] = None):
"""设置 create_node 和伴随方法将插入到图中的点。
当在 'with' 语句中使用时,这将临时设置插入点,并在退出 'with' 语句时恢复它
然后当 'with' 语句退出时恢复它::
with g.inserting_before(n):
... # 在节点 n 前插入
... # 恢复到之前的状态
g.inserting_before(n) # 永久设置插入点
Args:
n (Optional[Node]): 要插入的节点之前。如果为 None,则在此处插入
整个图的开始。
返回:
一个将在 `__exit__` 上恢复插入点的资源管理器。
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
如果 n 为 None:
返回 self.inserting_after(self._root)
断言 n.graph == self, "要插入的节点不在图中。"
返回 _InsertPoint(self, n.prepend)
[文档] @兼容性(向后兼容=True)
def inserting_after(self, n: Optional[Node] = None):
"""设置 create_node 和伴随方法将在图中插入的点。
当在 'with' 语句中使用时,这将临时设置插入点并
当 with 语句退出时恢复它::
使用 g.inserting_after(n):
... # 在节点 n 之后插入
... # 插入点恢复到之前的状态
g.inserting_after(n) # 将插入点永久设置
Args:
n (可选[节点]): 要插入的节点之前。如果为 None,则将在整个图的开始处插入。
在整个图的开始处。
返回值:
一个资源管理器,将在 `__exit__` 时恢复插入点。
"""
如果 n 为 None:
返回在根节点之前插入的结果
断言 n.graph 等于 self,"要插入的节点不在图中。"
返回_InsertPoint(self, n.append)
[文档] @兼容性(向后兼容=True)
def 占位符(
self,
name: str,
type_expr: 可选[任何类型] = None,
default_value: 任何类型 = inspect.Signature.empty,
) -> 节点:
"""
在图中插入一个 ``占位符`` 节点。一个 ``占位符`` 代表
一个函数输入。
参数:
name (str): 输入值的名称。这对应于名称
该函数所表示的 ``Graph`` 的位置参数。
type_expr (Optional[Any]): 表示该节点输出将具有的 Python 类型的可选类型注解。
该节点输出将具有的 Python 类型。在某些情况下,这需要用于正确的代码生成(例如,当函数被使用时)。
在某些情况下,这需要用于正确的代码生成(例如,当函数被使用时)。
随后在 TorchScript 编译中)。
默认值(任何类型):此函数参数应采用的默认值
请注意:为了允许`None`作为默认值,`inspect.Signature.empty`
应作为此参数传递以指定该参数不
默认值。
.. 注意::
此方法同样适用相同的插入点和类型表达式规则。
与 `Graph.create_node` 相同。
"````"
args = () if default_value is inspect.Signature.empty else (default_value,)
返回 self.create_node("placeholder", name, args=args, type_expr=type_expr)
[文档] @compatibility(
兼容旧版本=
是)
def 获取属性(self,
合法名称:
字符串, type_expr:
可选[
任何] =
无)
翻译
节点:
""
在 Graph 中插入一个 ``get_attr`` 节点。一个 ``get_attr`` ``Node`` 代表了
从“模块”层次结构中获取属性。
参数:
qualified_name(字符串):要检索的属性的完全限定名称。
例如,如果跟踪的模块有一个名为“foo”的子模块,
该子模块有一个名为“bar”的子模块,该子模块有一个名为“baz”的属性,
`foo.bar.baz` 应该作为 `qualified_name` 传递。
type_expr (Optional[Any]):一个表示此节点输出将具有的 Python 类型的可选类型注解。
该节点输出将具有的 Python 类型。
返回:
新创建并插入的 `get_attr` 节点。
.. 注意::
此方法同样适用相同的插入点和类型表达式规则
与 `Graph.create_node` 相同。
""
def _get_attr_reference_exists(
模块:
火炬.
神经网络.
模块,
合法名称:
字符串
) 翻译
布尔:
模块路径, _,
名称 =
合法名称.
分割(
“。”)
尝试:
子模块:
火炬.
神经网络.
模块 =
模块.
获取子模块(
模块路径)
除了
属性错误:
警告.
警告(f
"获取模块失败"{
模块路径}
!)
返回
假
如果
不是
有属性(
子模块,
名称):
返回
假
res = getattr(子模块,
名称)
如果 (
不是 isinstance(
资源,
火炬.
神经网络.
模块)
和
不是 isinstance(
资源,
火炬.
神经网络.
参数)
和
名称
不是
在
子模块.
缓冲区
):
返回
假
返回
真实
如果 self.
所属模块
和
不是 _get_attr_reference_exists(
self.所属模块, qualified_name
):
警告.
警告(
"尝试插入一个没有“
"底层引用的拥有者“
"GraphModule! 调用“
"GraphModule.add_submodule 来添加“
"必要的子模块,"
"GraphModule.add_parameter 添加的 "
"必要的参数,或 "
"nn.Module.register_buffer 添加的 "
必要的缓冲区,
栈级别=2,
)
返回 self.
创建节点(
获取属性,
合法名称, type_expr=type_expr)
[文档] @兼容性(向后兼容=True)
def 调用模块(
self,
module_name: 字符串,
args: 可选[tuple["Argument", ...]] = None,
kwargs: 可选[dict[str, "Argument"]] = None,
type_expr: 可选[Any] = None,
-> 节点:
"""
在 ``Graph`` 中插入一个 ``call_module`` ``节点``。一个 ``call_module`` 节点
表示对 ``Module`` 中的 forward() 函数的调用
层次结构。
参数:
模块名称 (str):要调用的 ``Module`` 的限定名称。在 ``Module`` 的
层次结构中。例如,如果跟踪的 ``Module`` 有一个
子模块名为 ``foo'',它有一个名为 ``bar'' 的子模块,其限定名称 ``foo.bar'' 应该作为 ``module_name'' 传递给调用该模块。
参数(可选[Argument, ...]):要传递的位置参数。
调用该模块。
args(可选[Tuple[Argument, ...]]):要传递的位置参数。
调用方法时请注意,这不应包含 ``self`` 参数。
kwargs(可选[Dict[str, Argument]]):要传递的关键字参数
调用方法
type_expr(可选[Any]):表示可选类型注解
Python 类型,此节点的输出将具有。
返回:
新创建并插入的 `call_module` 节点。
.. 注意:
此方法同样适用相同的插入点及类型表达式规则
与 :meth:`Graph.create_node` 方法相同
"""
如果 self.owning_module 不为空且 self.owning_module.get_submodule(module_name) 为空:
警告:warnings.warn()
尝试插入带有 "的 call_module 节点
无所属参考的“拥有者”
GraphModule! 调用
"GraphModule.add_submodule 添加子模块的 "
"必要的子模块"
)
返回 self.create_node(
调用模块,module_name,args,kwargs,type_expr=type_expr
)
[文档] @兼容性(向后兼容=True)
def 调用方法(
self,
method_name: str,
args: 可选[tuple["参数", ...]] = None,
kwargs: 可选[dict[str, "参数"]] = None,
type_expr: 可选[Any] = None,
) -> 节点:
“”
将一个 `call_method` `Node` 插入到 `Graph` 中。一个 `call_method` 节点
表示对 `args` 的第 0 个元素调用一个给定方法。
参数:
method_name (str): 要应用于 self 参数的方法名称。
例如,如果 args[0] 是一个表示 ``Tensor`` 的 ``Node``,
那么要对该 ``Tensor`` 调用 ``relu()``,请将 ``relu`` 传递给 ``method_name``。
args (Optional[Tuple[Argument, ...]]): 要传递的位置参数
调用方法时。请注意,这*应该*包括一个`self`参数。
kwargs(可选[Dict[str, Argument]]):要传递的关键字参数
调用方法
type_expr(可选[Any]):一个表示可选类型注解的可选类型
Python 类型,此节点的输出将具有。
返回:
新创建并插入的 ``call_method`` 节点。
.. 注意:
同样的插入点和类型表达式规则适用于此方法
与 :meth:`Graph.create_node` 相同。
"""
return self.create_node(
"调用方法", 方法名, 参数, 关键字参数, 类型表达式=类型表达式
)
[文档] @兼容性(向后兼容=True)
def 调用函数(
self,
the_function: 可调用函数类型,
args: 可选的[“参数”, ...]元组 = None,
kwargs: 可选的[字符串, “参数”]字典 = None,
type_expr: 可选的 [Any] = None,
) -> Node:
"""
在 ``Graph`` 中插入一个 ``call_function`` ``Node``。一个 ``call_function`` 节点
表示调用由 `the_function` 指定的 Python 可调用对象。
Args:
the_function (Callable[..., Any]): 要调用的函数。可以是任何 PyTorch
操作符、Python 函数,或者是 `builtins` 或 `operator` 模块中的成员
命名空间。
args(可选[Tuple[Argument, ...]]):要传递给调用函数的位置参数。
要传递给调用函数的。
kwargs(可选[Dict[str, Argument]]):要传递的关键字参数。
调用函数
type_expr (Optional[Any]): 表示此节点输出将具有的 Python 类型的可选类型注解。
该节点输出将具有的 Python 类型。
返回值:
新创建并插入的 `call_function` 节点。
.. 注意::
此方法同样适用相同的插入点和类型表达式规则。
与 :meth:`Graph.create_node` 相同。
"""
返回 self.create_node(
"调用函数", the_function, args, kwargs, type_expr=type_expr
)
[文档] @兼容性(向后兼容=True)
def node_copy(
self, node: Node, arg_transform: Callable[[Node], "Argument"] = lambda x: x
) -> Node:
""
从一个图中复制一个节点到另一个图中。`arg_transform` 需要将节点的参数从节点所在的图转换到自身的图中。示例::
将 `g` 中的所有节点复制到 `new_graph` 中
# 将 `g` 中的所有节点复制到 `new_graph` 中
g: torch.fx.Graph = ...
new_graph = torch.fx.graph()
value_remap = {}
for node in g.nodes:
value_remap[node] = new_graph.node_copy(node, lambda n: value_remap[n])
Args:
node (Node): 需要复制到 ``self`` 的节点。
arg_transform (Callable[[Node], Argument]): 一个函数,用于转换
将节点中的 ``Node`` 参数 ``args`` 和 ``kwargs`` 转换为 ``self`` 中的等效参数。在 simplest case 下,这应该
从将原始图中的节点映射到 ``self`` 的表中检索一个值。
将节点中的 ``Node`` 参数 ``args`` 和 ``kwargs`` 转换为 ``self`` 中的等效参数。在 simplest case 下,这应该
从将原始图中的节点映射到 ``self`` 的表中检索一个值。
"""
args = map_arg(node.args, arg_transform)
kwargs = map_arg(node.kwargs, arg_transform)
assert isinstance(args, tuple)
assert isinstance(kwargs, dict)
result_node = self.create_node(
node.op, node.target, args, kwargs, node.name, node.type
)
result_node.meta = copy.copy(node.meta)
return result_node
[文档] @兼容性(向后兼容=True)
def output(self, result: "参数", type_expr: Optional[Any] = None):
"""
将一个 ``output`` ``节点`` 插入到 ``Graph`` 中。一个 ``output`` 节点表示
Python 代码中的“return”语句。`result`是应该返回的值。
应返回。
Args:
返回值(参数):要返回的值。
type_expr (Optional[Any]): 可选的类型注解,表示此节点输出的 Python 类型。
Python 类型是该节点输出的类型。
.. 注意::
对于此方法,同样适用相同的插入点和类型表达式规则。
as ``Graph.create_node``.
"""
return self.create_node(
op="output", target="output", args=(result,), type_expr=type_expr
)
def 目标_to_str(self,
目标:
目标)
翻译
字符串:
如果
可调用(
目标):
操作符 =
目标.__name__
else:
断言 isinstance(
目标,
字符串)
操作符 =
目标
如果 _is_magic(
操作):
操作符 =
操作[2:-2]
操作符 =
下划线命名法(
操作)
返回
操作符
[文档] @compatibility(
兼容旧版本=
是)
def Python 代码(
self,
根模块:
字符串,
*,
详细模式:
布尔值 =
错误,
包含步长:
布尔值 =
错误,
包含设备:
布尔值 =
错误,
彩色:
布尔值 =
错误,
) 翻译
Python 代码:
""
将此 ``Graph`` 转换为有效的 Python 代码。
参数:
root_module (str): 要查找的根模块的名称
限定名称目标。这通常为'self'。
返回:
Python 代码对象,包含两个字段:
src:表示对象的 Python 源代码
globals:`src`中的全局名称字典 -> 它们引用的对象。
""
# [图命名空间]
#
# 生成的 Python 源代码中有两种符号:
# locals 和 globals。
# 本地变量是由图中的节点输出局部定义的。
全局是对外部对象的引用,例如函数或类型。
#
当生成 Python 代码时,我们需要确保命名合理。
特别地:
- 所有名称都应该是唯一的,以避免奇怪的阴影错误。
这些名称需要保持一致,例如一个对象应该始终
使用相同的名称进行引用。
#
为了做到这一点,我们为这个源创建一个新的命名空间。所有
打印出来的名称都必须来自这个命名空间。
#
为什么我们不能重用 node.name?因为它是在内部生成的
# 命名空间 `self._graph_namespace`。为了提供唯一性
# 在本地(node.name)和全局范围内都创建一个完全
# 新命名空间,用于放置所有标识符。
命名空间 =
_命名空间()
重写节点的 repr 以在命名空间内生成有效的名称。
# 由于 repr()旨在生成有效的 Python 表达式,因此
# 有意义地重用它是合理的。这样,只需调用 repr()即可打印出
# Tuple[节点, 节点]这样的内容。节点实现了__repr__方法,
# 以便允许这样做。
def 节点表示(n:
节点):
返回
命名空间.
创建名称(n.
名称, n)
@contextmanager
def 覆盖节点表示(graph:
图):
原始表示函数 = {}
为
节点
在 graph.
节点:
原始表示函数[
节点] =
节点._repr_fn
节点._repr_fn =
节点表示
尝试:
产生
无
最后:
# 恢复原始的 repr 函数
为
节点
在 graph.
节点:
节点._repr_fn =
原始 repr 函数[
节点]
与
覆盖节点 repr(self):
返回 self._python_code(
根模块,
命名空间,
详细模式=
详细模式,
包含步长=
包含步长,
包含设备=
包含设备,
彩色=
彩色,
)
def _python 代码(
self,
根模块:
字符串,
命名空间:
_命名空间,
*,
详细模式:
布尔值 =
错误,
包含步长:
布尔值 =
错误,
包含设备:
布尔值 =
错误,
彩色:
布尔值 =
错误,
) 翻译
Python 代码:
返回 self.
_代码生成._gen_python_code(
self.节点,
根模块,
命名空间,
详细模式=
详细模式,
包含步长=
包含步长,
包含设备=
包含设备,
彩色=
彩色,
)
def __str__(self) 翻译
字符串:
""
返回一个可读的(非机器可读)字符串表示
的这个图
""
占位符名称:
列表[
字符串] =
输入文本为空,请提供需要翻译的文本
# 这是一个只有一个元素的数组,以便 ``format_node`` 可以修改已关闭的
超值
可能返回类型名:
列表[
字符串] = [
输入文本翻译为简体中文为:""]
节点字符串 = [
节点.
格式节点(
占位符名称)
为
节点
在 self.
节点]
参数字符串 =
“,”.
连接(
占位符名称)
s = f"图("{
参数字符串}){
可能返回类型名[0]}
输入文本:
:"
翻译:
:
为 node_str
在 node_strs:
如果 node_str:
s += "输入文本翻译为简体中文为:\n " + node_str
返回 s
[文档] @兼容性(向后兼容=True)
def 打印表格(self):
"``"
打印图的中间表示形式为表格格式。请注意,此 API 需要安装 ``tabulate`` 模块。
格式。注意,此 API 需要安装 ``tabulate`` 模块。
格式。注意,此 API 需要安装 ``tabulate`` 模块。
"""
尝试:
从 tabulate 导入 tabulate
except ImportError:
输出文本:
打印(
`print_tabular` 依赖于库 `tabulate`,
该文件在此机器上找不到。运行 `pip "
安装 tabulate` 以安装库。
)
raise
node_specs = [[n.op, n.name, n.target, n.args, n.kwargs] for n in self.nodes]
print(
将节点规格表化,表头为["操作码", "名称", "目标", "参数", "关键字参数"]
)
[文档] @compatibility(
兼容旧版本=
是)
def 检查(self):
""
对此图进行各种检查以确保其结构良好。
特别是:
- 检查节点是否具有正确的所有权(由该图拥有)
- 检查节点是否按拓扑顺序出现
如果此图有拥有它的图模块,则检查目标是否存在
在该图模块中存在
""
检查拓扑顺序
def 检查参数(arg:
节点, n:
可选[
节点] =
无)
翻译
无:
context_str = f节点 '{n}' "
如果 n
否则
输入文本为空,请提供需要翻译的文本
如果 arg.
图 is
不是 self:
提升
运行时错误(
f"论点 '{arg}'{context_str}
不属于此图,"
f"但被用作参数!如果您正在从另一个图中复制节点,请确保在 node_copy() 上使用 ``arg_transform`` 来重置值
f"确保使用 ``arg_transform`` 在 node_copy() 上重置值
输入文本翻译为简体中文为:\n{self}"
)
如果
参数
不是
在
已见值:
提升
运行时错误(
f"参数 '"{arg}'{
上下文字符串}
在它被 " 使用之前已被使用过 "
f"已定义!请检查图中节点是否按拓扑顺序排列"
输入文本翻译为简体中文为:\n{self}"
)
已见名称:
设置[
字符串] =
设置()
已见值:
设置[
节点] =
设置()
为
节点
在 self.
节点:
如果
节点.
操作符
不是
在
法律操作:
提升
运行时错误(f
节点{
节点}
有未知操作码{
节点.
操作}
!)
如果
节点.
图 is
不是 self:
提升
运行时错误(f
节点 '{
节点}
不属于此图!)
如果
节点
不是
在 self.
_查找节点表:
提升
运行时错误(f
"节点 "{
节点}
未添加到侧表)
为
参数
在
节点.
输入节点:
检查参数(arg,
节点)
已见值.
添加(
节点)
如果
节点.
名称
在
已见名称:
提升
运行时错误(f
节点重新定义名称{
节点.
名称}
“”)
已见名称.
添加(
节点.
名称)
# 检查目标是否合法
如果 self.
拥有模块:
警告数量 = 0
最大警告数 = 5
为
节点
在 self.
节点:
如果
节点.
操作符 ==
调用函数:
如果
不是
可调用(
节点.
目标):
提升 ValueError(
f节点{
节点}
目标{
节点.
目标}
类型{
火炬.
类型名(
节点.
目标)}
但是 "
预期一个可调用对象
)
else:
如果
不是 isinstance(
节点.
目标,
字符串):
提升 ValueError(
f节点{
节点}
目标{
节点.
目标}
类型{
火炬.
类型名(
节点.
目标)}
但是 "
预期一个字符串
)
如果
节点.
操作符
在 [
获取属性,
调用模块
]
目标原子 =
节点.
目标.
分割(
“。”)
m_itr = self.拥有模块
为 i,
原子
在
列举(
目标原子):
新_m_itr = getattr(m_itr,
原子,
无)
已见 qualname =
“。”.
连接(
目标原子
[i])
如果
新的 m_itr is
无:
提升
运行时错误(
f节点{
节点}
目标{
节点.
目标}
引用不存在属性 "
f"{原子}
的{
已见的全名}"
)
如果
节点.
操作符 ==
"调用模块"
和
不是 isinstance(
新_m_itr,
火炬.
神经网络.
模块
):
提升
运行时错误(
f"节点"{
节点}
目标{
节点.
目标} {
原子}
的{
已见 qualname}
"不引用 nn.Module"
)
如果...否则 (
节点.
操作符 == "get_attr"
和
不是 isinstance(new_m_itr,
火炬.
神经网络.
模块)
和
不是 isinstance(
新_m_itr,
火炬.
神经网络.
参数)
和
原子
不是
在 m_itr.
_缓冲区
):
如果 num_warnings < MAX_WARNINGS:
# 不要过于频繁地发出此警告,
# 对于非常大的图,这可能会变得非常昂贵
从性能角度来说。
警告.
警告(
f"节点"{
节点}
目标{
节点.
目标} {
原子}
的{
已见 qualname}
"不引用 nn.Module、nn.Parameter 或 buffer,这通常是"
"‘get_attr’节点通常的目标"
)
警告数量 += 1
else:
m 迭代 =
新 m 迭代
如果
警告数量 >
最大警告数:
警告.
警告(
f"额外{
警告数 -
最大警告数}
警告 "
关于 get_attr 引用被抑制
)
[文档] @compatibility(
兼容旧版本=
是)
def 删除死代码(
self, 是不纯节点:
可选[
可调用[[
节点
]
布尔]] =
无
) 翻译
布尔:
""
删除图中所有无效代码,基于每个节点的数量
用户以及节点是否有任何副作用。图必须
在调用前进行拓扑排序。
参数:
is_impure_node (Optional[Callable[[节点], bool]]): 一个返回
判断一个节点是否不纯。如果此值为 None,则默认使用 Node.is_impure。
使用 Node.is_impure。
返回:
bool:是否由于该操作导致图发生了变化。
示例:
在消除死代码之前,`a = x + 1` 中的 `a` 没有使用者。
因此可以从图中删除,而不会产生影响。
.. 代码块 :: python
def forward(self, x):
a = x + 1
返回 x + self.attr_1
在删除死代码后,`a = x + 1` 已被移除,其余的
前向的 `forward` 仍然保留。
.. 代码块 :: python
def forward(self, x):
返回 x + self.attr_1
.. 警告::
死代码消除有一些启发式方法来避免删除
有副作用节点(见 Node.is_impure),但通常情况下覆盖率
非常糟糕,所以你应该假设这种方法是不可靠的
除非你知道你的 FX 图完全由以下内容组成
功能操作或您提供自己的自定义
用于检测副作用节点的函数。
""
首先对图进行 lint 检查,以确保其拓扑排序,否则
以下 DCE 将不会按预期行为。
self.检查()
def 有副作用(
节点):
如果
是不纯节点 is
不是
无:
返回
是不纯节点(
节点)
返回
节点.
是不纯的()
# 逆序迭代,当我们删除一个节点时,任何用作该节点输入的节点
# 的用户计数都会更新,不再反映真实情况
已删除的节点
已更改 =
假
为
节点
在
反转(self.
节点):
如果
不是
有副作用(
节点)
和
长度(
节点.
用户) == 0:
self.删除节点(
节点)
已更改 =
真实
返回
已更改
[文档] @兼容性(向后兼容=False)
def 设置_codegen(self, codegen: CodeGen):
self._codegen = codegen
[文档] @compatibility(
兼容旧版本=
错误)
def 生成代码(
self,
make_transformer: 可调用[[
可选[TransformCodeFunc]], TransformCodeFunc
]
):
"""注册生成 Python 代码时的转换器函数
参数:
make_transformer (Callable[[Optional[TransformCodeFunc]], TransformCodeFunc]):
返回一个要注册的代码转换器的函数。
此函数由`on_generate_code`调用以获取
代码转换器。
此函数也将当前作为其输入
注册的代码转换器(或如果没有注册则为 None)
如果不希望覆盖它,这很有用。
将链式代码转换器连接起来。
返回:
一个上下文管理器,当在`with`语句中使用时,会自动
恢复之前注册的代码转换器。
示例:
.. 代码块 :: python
gm: fx.GraphModule = ...
这是一个我们想要注册的代码转换器。This code
# transformer 在最前面添加了一个 pdb 导入和跟踪语句
# 生成的 torch.fx 代码开始,允许手动
使用 PDB 库进行调试。
定义插入 PDB 函数。
返回 ["import pdb; pdb.set_trace()\\n", *body]
注册`insert_pdb`并覆盖当前已注册的
# 代码转换器(由 `_` 传递给 lambda)
gm.graph.on_generate_code(lambda _: insert_pdb)
# 或者,注册一个代码转换器,它首先
# 将 `body` 通过已注册的转换器,然后
通过 `insert_pdb`:
gm.graph.on_generate_code(
lambda current_trans: (
lambda body: insert_pdb(current_trans(body) if current_trans else body)
)
)
gm.recompile()
gm(*inputs) # drops into pdb
此函数也可以用作上下文管理器,其优点是
自动恢复之前注册的代码转换器:
.. 代码块 :: python
# ...继续从上一个示例
使用 gm.graph.on_generate_code(lambda _: insert_pdb):
# 使用 `gm` 做更多的事情...
gm.recompile()
gm(*inputs) # 跳入 pdb
# 现在之前的代码转换器已恢复(但`gm`的代码与 pdb
# 仍然保留 - 这意味着您也可以在这里使用 pdb 运行`gm`,直到您
# 运行下一个`recompile()`)。
""
生成代码旧 = self.
代码生成器._body_transformer
self._codegen._body_transformer = make_transformer(代码生成旧版本)
@contextlib.contextmanager
def 生成代码上下文管理器():
尝试:
产生
最后:
self._代码生成.
_主体转换器 =
生成代码旧
返回
生成代码上下文管理器()
def _身份(x):
返回 x
def _创建颜色函数(
代码):
def f(s):
重置 = "
退格键[0m"
返回 f"{
代码}{s}{
重置}"
返回 f
_颜色代码 = {
"黄色": "
退格键[33m",
青色: "
退格键[36m",
绿色: "
退格键[32m",
蓝色: "
退格键[34m",
红色: "
退格键[31m",
暗淡: "
退格键[2m",
深蓝色: "
退格键[2m
退格键[34m",
"暗绿色": "
退格键[2m
退格键[32m",
}
_color_fns = {k: _make_color_fn(v) 为 k, v
在 _color_codes.
项目()}
_计数器正则表达式 =
正则表达式.
编译(r
计数器:(\d+))
反射魔法方法 = {
添加: "{} + {}",
减: "{} - {}",
乘: "{} * {}",
"地板除法": "{} // {}",
"真除法": "{} / {}",
"除法": "{} / {}",
"取模": "{} % {}",
"pow": "{} ** {}",
"lshift": "{}左尖括号{}",
"rshift": "{} >> {}",
"和_": "{} & {}",
"或_": "{} | {}",
"异或": "{} ^ {}",
getitem: "{}[{}
]],
矩阵乘法: "{} @ {}",
}
魔法方法 = {
eq: "{} == {}",
ne: "{} != {}",
"lt": "{} < {}",
"gt": "{} > {}",
"le": "{}≤{}",
"ge": "{}≥{}",
"正":
"+"{}",
"负":
“-”{}",
“反转”:
“~”{}",
**“可反射的魔法方法”,
}
内置方法 = {
iadd: "{} += {}",
"iand": "{} &= {}",
"ifloordiv": "{} //= {}",
"ilshift": "{} <<= {}",
"模": "{} %= {}",
"乘": "{} *= {}",
"矩阵乘": "{} @= {}",
"ior": "{}等于{}",
"ipow": "{} **= {}",
"irshift": "{} >>= {}",
"isub": "{} -= {}",
"itruediv": "{} /= {}",
"ixor": "{} ^= {}",
"setitem": "{}[{}] = {}",
}