快捷键

torch.distributed.pipelining._IR 的源代码

# mypy: 允许未类型化定义
版权所有(C)Meta Platforms,Inc. 及其关联公司
导入 复制
导入 记录日志
导入 操作符
来自 集合 导入 defaultdict
来自 枚举 导入 枚举
来自 检查 导入 参数, 签名, 签名
来自 类型 导入 方法类型
来自 打字 导入 任何, 可调用, 可选, 联合

导入 火炬
导入 torch.fx  fx
来自 torch.distributed 导入 进程组
来自 torch.export 导入 导出程序
来自 torch.export.unflatten 导入 (
    _分配属性,
    属性类型,
    _汇出参数,
    解释器模块,
)
来自 torch.fx 节点 导入 地图聚合
来自 torch.fx.passes.split_module 导入 split_module

来自 ._backward 导入 _null_coalesce_accumulate, stage_backward
来自 _unflatten 导入 _outline_submodules
来自 _utils 导入 管道信息
来自 .阶段 导入 _PipelineStage


记录器 = 记录.获取日志记录器(__name__)

# TODO:
# 1. 调查共享参数的梯度同步。DDP 是如何做到的?
# 2. 将参数移动添加到 split_module


定义 _从输出和规范中查找损失(输出值, spec_val):
    如果 spec_val  错误:
        返回 
    如果 spec_val  True:
        如果  isinstance(输出值, fx.节点):
            raise 运行时错误(
                f"损失 spec 必须指定一个动态值,但得到了"{输出值}"
            )
        返回 输出值

    如果 isinstance(spec_val, (元组, 列表)):
        如果  isinstance(输出值, (元组, 列表)):
            raise 运行时错误(
                f输出值{输出值}必须与损失指定类型匹配
                f"{spec_val}"
            )
        如果 len(输出值) != len(spec_val):
            raise 运行时错误(
                f输出值{输出值}必须与损失规格长度匹配
                f"{spec_val}"
            )
         输出, 规范  zip(输出值, spec_val):
            损失值 = 从输出和规范中查找损失(输出, 规格)
            如果 损失值   :
                返回 损失值
        raise 运行时错误(f"在规范中未找到损失值"{spec_val}")

    如果 isinstance(spec_val, 字典):
        如果  isinstance(输出值, 字典):
            raise 运行时错误(
                f"输出值"{输出值}必须匹配损失指定的类型
                f"{spec_val}"
            )
        如果 集合(输出值.()) != 集合(spec_val.()):
            raise 运行时错误(
                f"输出值"{输出值}必须匹配损失规范的关键字 "
                f"{spec_val}"
            )
         k  spec_val:
            loss_val = 从输出和规范中找到损失(输出值[k], 规范值[k]\)
            如果 损失值   :
                返回 损失值
        raise 运行时错误(f"在规范中未找到损失值"{spec_val}")

    raise 运行时错误(f"不支持类型"{类型(spec_val)}在损失规范中")


定义 _查找损失输出(模块: 火炬.神经网络.模块, g: fx., 输出损失值规范):
    输出节点 = [n  n  g.节点 如果 n.操作符 == 输出]
    断言 len(输出节点) == 1
    输出节点 = 输出节点[0]
    输出值 = 输出节点.参数[0]
    生成规范: 任何 = 

    如果 isinstance(模块, 简单损失包装器):
        # TrivialLossWrapper 是由 PiPPy 预定义的。
        # 它只有一个输出是损失,因此我们可以安全地假设第一个输出参数是损失。
        断言 len(输出节点.参数) == 1
        损失节点 = 输出值
        生成的规范 = 平凡损失包装器.损失规范
    elif 输出损失值规范  :
        使用默认规范,即搜索输出值中的“损失”
        如果 isinstance(输出值, 字典)  "损失"  输出值.():
            损失节点 = 输出值[损失]
            生成的规范 = {k: k == 损失  k  输出值}
        else:
            损失节点 = 
            生成规范 = 
    else:
        损失节点 = 从输出和规范中查找损失(输出值, 输出损失值规范)
        生成规范 = 输出损失值规范

    返回 损失节点, 输出节点, 生成规范


定义 _插入阶段符号反向(
    g: fx.,
    损失节点: fx.节点,
    输出节点: fx.节点,
):
    # 收集关于元组输出值的元数据。TODO:将其移动到 split_module 或 FX IR
    元组: 字典[fx.节点, 元组] = {}
     节点  反转(g.节点):
        如果 node.操作符 == 调用函数:
            # 在正向传播中,仅发出占位符、模块调用和
            # 获取项调用。如果在这个(仅正向)代码中我们有除获取项之外的目标,则存在一个错误。
            # (正向传播)代码中,存在一个错误。
            断言 node.目标 == 操作符.获取项, (
                "在正向传播中发现了非 getitem 调用。请向 PiPPy 报告一个错误"
            )
            断言 len(node.参数) == 2, (
                "发现了格式错误的 getitem 调用。请向 PiPPy 报告一个错误"
            )
            索引值, 节点索引 = 元组(node.参数)

            # 索引值是一个我们正在索引的集合。它可能
            存在于元组映射中,如果我们已经处理了另一个 `getitem`
            已经存在。
            existing_list_size = (
                len(tuples[索引值]\) 如果 索引值  元组 否则 -1
            )
            新列表大小 = 最大值(节点索引 + 1, 现有列表大小)

            重建后的列表 = [  _  范围(新列表大小)]

            如果存在则复制现有元素
            如果 索引值  元组:
                 i, val  列举(元组[索引值)]
                    重建列表[i] = val

            由此节点表示的值填充
            重建列表[node_idx] = 节点

            元组[索引值] = 元组(重建列表)

    # 记录支配损失节点的节点。
    # 我们只为可以贡献的节点发出反向操作。
    降至指定的损失值。
    live_nodes = {loss_node: }
    val_to_grad: 字典[fx.节点, 可选[fx.节点]] = {损失节点: }

    定义 分配或累积梯度(前向节点, 梯度值):
        如果 前向节点  估值梯度  前向节点.操作符 != 占位符:
            梯度值 = g.调用函数(
                空值合并累加,
                (值到梯度[前向节点], 梯度值),
            )
        梯度值[前向节点] = 梯度

     g.插入之前(输出节点):
         节点  反转(g.节点):
            如果 节点   活跃节点:
                continue

            定义 添加到实时节点(n):
                实时节点.setdefault(n, )

            fx.node.map_arg(node.参数, 添加到实时节点)
            fx.node.map_arg(node.kwargs, 添加到实时节点)
            如果 node.操作符 == 调用模块:
                输出梯度: 联盟[元组[可选[fx.节点], ...], 可选[fx.节点]]
                如果 节点  元组:
                    阶段输出 = 元组[node]
                    输出梯度 = 元组(val_to_grad.获取(n, )  n  元组[node]\)
                    带梯度的输出索引 = [
                        i  i, n  列举(元组[node]\) 如果 n  实时节点
                    ]
                else:
                    阶段输出 = (node,)
                    输出梯度 = 验证到梯度[node]
                    带梯度的输出索引 = [0]

                输出梯度 = (
                    (输出梯度,)
                    如果  isinstance(输出梯度, 元组)
                    否则 输出梯度
                )

                梯度调用 = g.调用函数(
                    反向阶段,
                    kwargs={
                        阶段输出: 阶段输出,
                        输出梯度: 输出梯度,
                        输入值: 列表(node.所有输入节点),
                        outputs_with_grads_idxs: outputs_with_grads_idxs,
                    },
                )
                插入反向阶段调试信息
                kwargs_copy = 字典(毕业调用.kwargs)
                毕业调用.kwargs = 关键字参数复制

                grad_call_proxy = fx.代理(grad_call)
                梯度 = grad_call_proxy.节点

                input_nodes = 列表(node.所有输入节点)
                grads 代理 = fx.代理(梯度)
                 i, 输入节点  列举(输入节点):
                    分配或累积梯度(输入节点, 梯度代理[i].node)  忽略索引

    返回 g


 管道顺序(火炬.神经网络.顺序的):
    @staticmethod
    定义 从顺序中(序列实例: 火炬.神经网络.顺序的):
        返回 管道顺序(*[复制.复制(m)  m  顺序实例]\)

    定义 前向(, 输入):
         i, 模块  列举():
            输入 = 模块(输入)
            如果 i != len() - 1:
                管道分割()
        返回 输入


 损失包装器(火炬.神经网络.模块):
    ""
LossWrapper 是一个方便的抽象类,允许您封装两者
您的模型以及其损失函数,并指定其连接性
输入、模型、损失函数和输出值。示例:

        class MyModelWrapper(LossWrapper):
            def forward(self, x, targets):
                model_out = self.module(x)
                loss_value = self.loss_fn(model_out, targets)
返回损失值

上述示例定义了一种连接性,我们期望正向/损失/反向
训练过程接受两个参数(x 和目标),将 x 传递到模块
以获取前向计算的输出,传递模型输出和
将目标值输入到损失函数中,获取并返回损失值,这将
该类将被 PiPPy 反向传播。上述类将实例化如下:

模型 = ...  # 实例化模型
loss_fn = torch.nn.MSELoss()  # 用于演示

        wrapper = MyModelWrapper(model, loss_fn)
        pipe = Pipe.from_tracing(wrapper, ...)

"文档"

    定义 初始化(, 模块, 损失函数):
        超级().初始化()
        .模块 = 模块
        .损失函数 = 损失函数

    定义 前向(, *参数, **kwargs):
        raise 不支持的操作异常(
            "此实例的 LossWrapper 没有重写"
            "forward()。请实现 forward()以指定参数,"
            模块与损失之间的连接,以及损失输出
            值。
        )


 简单损失包装器(损失包装器):
    定义 前向(, x, targets):
        模型输出 = .模块(x)
        返回 .损失函数(模型输出, targets)

    损失规范 = 真实


管道模型表示
#
可以将管道视为 `nn.Sequential++`。也就是说:它指定了
管道“阶段”的单个拓扑排序,当按顺序运行时,
构成了程序的所有操作。然而,与 `nn.Sequential` 不同,
管道允许非本地使用值,只要这些使用仍然尊重
拓扑排序。特别是:
#
# 1. 非局部激活。此类用法可能出现在,例如,跳过
连接。这些值将直接从“def”阶段传输
跳过中间阶段,对所有使用它们的阶段进行跳过。在自动微分过程中,
梯度将通过这个跳过连接反向传播,
直到激活在正向传播中传播的方式。
2. 非局部参数/模块调用。当使用参数时发生这种情况,
在它所在位置的下游阶段。这些值可以携带
#    类似于(1)进行前向,但此外可能还需要复制
#    在多个阶段上的值。这些共享参数的梯度将为
每个阶段分别累计,但将会有额外的
在优化器步骤之前进行梯度累积。


将 `_pipe_split()` 注册为 ATen 操作符。这是导出所必需的。
在图中保留此标记。
火炬.图书馆.定义(pippy::_pipe_split, () -> ())


@torch.图书馆.实现("pippy::管道分割", 后端选择)
定义 管道分割():
    返回 


@torch.图书馆.注册伪造("pippy::_pipe_split")  # 类型:忽略[重新定义]
定义 _pipe_split():  # noqa: F811
    返回 


# 为方便添加别名
aten_pipe_split_alias = 火炬.操作.pippy._pipe_split.默认

# 请导出时保留 `_pipe_split` 操作。
# 请参阅 pytorch/torch/fx/node.py 中的示例
fx.node._副作用函数.添加(aten_pipe_split_alias)


# 用户界面 API
[文档]def pipe_split(): """ 管道分割是一个特殊运算符,用于标记边界 模块中的阶段。它用于将模块拆分为阶段。它是一个 如果您的注释模块被急切地运行,则不执行任何操作。 示例: >>> # xdoctest: +SKIP >>> 定义 forward 方法 >>> x = torch.mm(x, self.mm_param) >>> x = torch.relu(x) >>> pipe_split() >>> x = self.lin(x) >>> return x 上述示例将被分为两个阶段。 "``" 返回 torch.ops.pippy._pipe_split()
多用途参数配置(枚举): 传输 = 1 复制 = 2 多用途参数规范 = 联盟[多用途参数配置, 字典[字符串, 多用途参数配置]] 断开执行器(fx.解释器): "" 特殊解释器以测试模式运行 split_gm,将所有输入分离 模块调用。这是必要的,以便边界处的值 自动微分执行中的叶子模块。 "文档" 定义 初始化(, 模块, 垃圾回收值=True): 垃圾回收值 = 超级().初始化(模块, 垃圾回收值) .值重映射 = {} 定义 run(, *参数, 初始环境=): # 类型:忽略[覆盖] .值重映射 = {} 返回 超级().run(*参数, 初始环境=初始环境) 定义 调用模块(, 目标, 参数, kwargs): 定义 断开张量(a): 如果 isinstance(a, 火炬.张量) a.需要梯度: 如果 a .值重映射: 新值 = a.detach().需要梯度_(True) .值重映射[a] = 新值 返回 .值重映射[a] else: 返回 a "" def dont_traverse_size(a): return type(a) != torch.Size "文档" args = map_aggregate( 参数, detach_tensors, # dont_traverse_size ) kwargs = map_aggregate( kwargs, 断开张量, # 不遍历大小 ) 返回 超级().调用模块(目标, 参数, kwargs) 定义 调用函数(, 目标, 参数, kwargs): # 将保存的输入张量重定向到 detach()后的版本 如果 目标 == 阶段反向: kwargs = 字典(kwargs) kwargs["输入值"] = [ .值重映射.获取(v, v) v kwargs[输入值] ] 返回 超级().调用函数(目标, 参数, kwargs) 节点引用: 定义 初始化(, 名称): .名称 = 名称 名称: 字符串 线性节点列表: 定义 初始化(, 节点列表): .序列化节点列表 = 输入文本为空,请提供需要翻译的文本 节点 节点列表: node_args 节点参数 = fx.node.map_arg(node.参数, lambda n: 节点引用(n.名称)) # type: ignore[arg-type,return-value] 节点参数 = fx.node.map_arg(node.kwargs, lambda n: _节点引用(n.名称)) # type: ignore[arg-type,return-value] 序列化节点 = fx.节点( =, # type: ignore[arg-type] 名称=node.名称, 操作=node.操作, 目标=node.目标, 参数=节点参数, # type: ignore[arg-type] kwargs=节点关键字参数, # type: ignore[arg-type] 返回类型=node.类型, ) 序列化节点.元数据 = 复制.复制(node.元数据) .序列化节点列表.追加(序列化节点) 定义 转换为图形(): = fx.() 引用字符串到节点: 字典[字符串, fx.节点] = {} 定义 引用到节点(arg): 如果 isinstance(arg, _节点引用): 返回 引用字符串到节点[arg.名称] else: 返回 参数 节点 .序列化节点列表: 节点参数 = map_aggregate(node.参数, 节点引用) 节点关键字参数 = map_aggregate(node.kwargs, 节点引用) 删除节点 = .创建节点( 操作=node.操作, 目标=node.目标, 参数=节点参数, # type: ignore[arg-type] kwargs=节点关键字参数, # type: ignore[arg-type] 名称=node.名称, type_expr=node.类型, ) 引用字符串到节点[node.名称] = 反序列化节点 返回 定义 _直接序列化反序列化(主体, 节点): "" 自定义 `__reduce__` 方法进行序列化。 照我说的做——别照我做的做。这违反了以下原则: GraphModules 通过代码导出和重新跟踪进行序列化。我们允许 因为管道阶段不应持久化 磁盘 -- 仅通过 RPC 传输**。持久化 这些实例写入磁盘将暴露内部实现 `fx.Graph`的详细信息及相关数据结构 不建议。 "文档" DummyModule(火炬.神经网络.模块): 定义 初始化(, 主体): 超级().初始化() .字典.更新(主体) 模拟 = 模拟模块(主体) 返回 fx.图模块(虚拟, 节点.to_graph()) 定义 _direct_serialization_reduce(): serialization_dict = 字典(.字典) serialization_dict.流行(_图) 返回 ( 直接序列化反序列化, (序列化字典, _线性节点列表(..节点)), ) 定义 _修改图操作设备( gm: 火炬.fx.图模块, 新设备: 火炬.设备, ): "" 修改图中所有 "call_function" 节点的设备参数。这 适用于将图移动到不同的设备。特别是对于 生成器操作,如 torch.ones。 "文档" 修改的 = 节点 gm..节点: 如果 node.操作符 == 调用函数: 如果 "设备" node.kwargs node.kwargs["设备"] != 新设备: 日志记录器.调试( f"更改节点设备"{node.名称}来自{node.kwargs[设备]}{新设备}" # 无需注意:G004 ) node.更新关键字参数("设备", 新设备) 修改 = 真实 elif node.操作符 == 调用模块: 递归修改子模块中的 "device" 子模块 = gm.获取子模块(node.目标) 如果 isinstance(子模块, 火炬.fx.图模块): _修改图操作设备(子模块, 新设备) elif isinstance(子模块, 解释器模块): 如果已经执行了展开,我们需要通过 `.graph_module` 访问其图模块 _修改图操作设备(子模块.图模块, 新设备) # type: ignore[arg-type] else: 日志记录器.警告( f跳过子模块的设备修改{node.目标}因为它是一个{类型(子模块)}" # noqa: G004 ) 如果 修改的: gm.重新编译()
[文档] 管道(火炬.神经网络.模块): 定义 初始化( , split_gm: fx.图模块, num_stages 阶段数: 整数, 有损失和反向传播: 布尔, 损失规范, ): # TODO: 有没有不硬编码初始化的方法? 火炬.神经网络.模块.初始化() .分割 GM: fx.图模块 = 分割_gm .执行器: 分离执行器 = 分离执行器(.分割_gm) .num_stages 阶段数: 整型 = 阶段数量 .有损失和反向传播 = 有损失和反向传播 .损失规范 = 损失规范 节点 split_gm 拆分 gm..节点: 断言 ( node.操作符 {调用模块, 占位符, 输出} 或者 (node.操作, node.目标) == (调用函数, 操作符.获取项) 或者 (node.操作, node.目标) == ("调用方法", "向后") 或者 (node.操作, node.目标) == (调用函数, 后退阶段) 或者 (node.操作, node.目标) == (调用函数, 空值合并累加) ), 节点 # 检测重复的参数,以便我们知道需要进行额外的 allreduce 操作 # 在应用优化器之前 # # 处理单个模块从不同阶段的调用情况,无论该模块调用是否由上述逻辑处理。 # 将参数值映射到字典,该字典将用户管道模块 # 映射到字典,该字典将用户管道模块 # 将参数值映射到字典,该字典将用户管道模块 # 到该模块内的本地 qualname 用户参数: 字典[火炬.神经网络.参数, 字典[字符串, 字符串]] = {} m_类名, 修饰 .分割_gm.命名子项(): p_类名, 参数 模块.命名参数。(): 参数到用户映射.setdefault(参数, {}) params_to_users 参数传递给用户[参数]]翻译m_qualname 翻译] = p_qualname .复制的参数: 列表[字典[字符串, 字符串]] = [ 使用映射 _, 使用映射 将参数传递给用户.项目() 如果 len(使用映射) > 1 ] 我们必须断开复制参数之间的别名关系,以确保在参考运行中数值的正确性。如果不这样做,分阶段执行的自动微分带将引用相同的张量值,并错误地多次应用梯度更新。因此,对于每个复制的参数集,我们为每个复制的参数集执行 deepcopy。 如果我们不这样做,分阶段执行的自动微分带将引用相同的张量值,并错误地多次应用梯度更新。 因此,对于每个复制的参数集,我们为每个复制的参数集执行 deepcopy。 因此,对于每个复制的参数集,我们为每个复制的参数集执行 deepcopy。 # values so that we have separate instances. 参数映射 .复制参数: 子模块名称, 参数全名 参数映射.项目(): 子模块 = getattr(.分割 GM, 子模块名称) 原子 = 参数全名.分割(“点”) 原子 原子[-1] 子模块 = getattr(子模块, 原子) setattr(子模块, 原子[-1], 复制.深拷贝(getattr(子模块, 原子[-1))))) 定义 (, *参数, **kwargs): raise 运行时错误( 要在本地运行管道,请直接调用 Pipe 对象,而不是`split_gm` ) .split_gm.前向传播 = 使子模块使用自定义直接序列化的 GraphModule i = 0 True: try: 名称 = fsubmod_{i}" 子模块 = getattr(.split_gm, 名称) 子模块..__reduce__ = _direct_serialization_reduce i += 1 除了 属性错误: 断开 定义 前向(, *参数, **kwargs): 执行器参数 = args 如果 len(kwargs) > 0: 参数 = 输入文本为空,请提供需要翻译的文本 节点 .split_gm 拆分 gm..节点: 如果 node.操作符 == 占位符: 如果 node.args len(node.参数) > 0: 参数.追加( 参数( node.目标, 参数.位置或关键字, 默认=node.参数[0], ) ) else: 参数类型 = 参数.位置或关键字 参数名称 = node.目标 如果 node.目标.以...开头("**"): 参数类型 = 参数.变量关键字 # 类型:忽略[赋值] 参数名称 = ` 的类型为 List[torch.Tensor][2] elif node.目标.以...开头("*"): 参数类型 = 参数.变量位置参数 # 类型:忽略[赋值] 参数名称 = ` 的类型为 List[torch.Tensor][1] 参数.追加(参数(` 的类型为 List[torch.Tensor], 参数类型)) 签名 = 签名(参数) ba = 签名.绑定(*参数, **kwargs) ba.应用默认值() 执行器参数 = ba.参数.() # 类型:忽略[赋值] res = .执行器.run(*执行器参数) 返回 res 定义 获取阶段模块(, 阶段索引: 整数) -> 火炬.神经网络.模块: "" 返回对应 `pipe` 中 `stage_idx` 的阶段模块。 "文档" 如果 stage_idx < 0 或者 stage_idx >= .num_stages 阶段数: raise ValueError(f"无效的阶段索引{stage_idx}) 返回 getattr(.split_gm, f"submod_"{阶段索引}") @staticmethod 定义 _向前阶段的数量和计数(gm: fx.图模块): 阶段数量 = 0 找到索引: 字典[整数, ] = {} 节点 gm..节点: 如果 node.操作符 == "调用模块" node.目标.以...开头(submod_): node.元数据[stage_idx] = 整数(node.目标[len(submod_) ]) found_idxs.setdefault(node.元数据["stage_idx"]\) 阶段数量 += 1 # 这条断言会在插入到第一层之前的位置插入分割点时失败,这会创建一个空的第一子模块 # 更新:以下断言可能对某些 torch 版本 >= # 2.2.0,因为: # 子模块_0, 子模块_1, 子模块_2, ... # 可能命名为 # 子模块_0, 子模块_2, 子模块_4, ... # TODO: 调查 断言所有(i in found_idxs for i in range(num_stages)) 返回 阶段数量 @staticmethod 定义 _from_traced( 模块: 火炬.神经网络.模块, 导出的程序: 导出程序, 多用途参数规范: 可选[多用途参数规范] = , 输出损失值规范=, 分割策略: 可选[ 可调用[[火炬.fx.图模块], 火炬.fx.图模块] ] = , ): "" 此外,可以指定 `output_loss_value_spec` 值以消除歧义 输出 `forward` 中的哪个值是 PiPPy 应用的损失值 反向传播。例如,如果你的 `forward` 返回一个元组 `(loss, model_out)`, 你可以指定 `output_loss_value_spec=(True, False)`。或者,如果你的 `forward` 返回 一个字典 `{'loss': loss_value, 'model_out': model_out}`,你可以指定 `output_loss_value_spec={'loss': True, 'model_out': False}` "文档" 追踪 = 导出的程序.模块() 如果 分割策略 : 日志记录器.信息(自动拆分模型) 追踪 = 分割策略(跟踪的) # type: ignore[arg-type] 日志记录器.调试(跟踪的.打印可读(打印输出=错误)) # type: ignore[operator] # 去重 `get_attr` 节点,这些节点引用相同的参数。移动参数的下游代码依赖于参数访问只发生一次的假设。这并不一定是情况(特别是自定义跟踪器),所以在这里修复它。 # 参数访问只发生一次的假设。这并不一定是情况(特别是自定义跟踪器),所以在这里修复它。 # 参数访问只发生一次的假设。这并不一定是情况(特别是自定义跟踪器),所以在这里修复它。 获取属性节点: 字典[字符串, fx.节点] = {} 节点 跟踪的..节点: # 类型:忽略[联合属性] 如果 node.操作符 == 获取属性: 获取属性节点.setdefault(node.目标, node) 如果 获取属性节点[node.目标] != node: node.替换所有引用(获取属性节点[node.目标]\) 跟踪的..删除节点(node) # 类型:忽略[运算符,联合属性] 避免查看下一个节点,通过跟踪上一个管道分割 prev_pipe_split_idx = -1 pipe_split_nodes_to_erase = 集合() i, 节点 列举(跟踪的..节点): # 类型:忽略[arg-type, union-attr] 如果 (node.操作, node.目标) == (调用函数, 管道分割): 如果 prev_pipe_split_idx == i - 1: pipe_split_nodes_to_erase.添加(node) prev_pipe_split_idx = i 节点 pipe_split_nodes_to_erase: 跟踪的..删除节点(node) # 类型:忽略[运算符,联合属性] 跟踪的.重新编译() # type: ignore[operator] part_idx = 0 定义 split_callback(n: fx.节点): 非局部 part_idx 如果 (n.操作, n.目标) == ( 调用函数, aten_pipe_split_alias, ): 日志记录器.调试(f"Found pipe_split"{part_idx}") # 无需注意:G004 部分索引 += 1 返回 部分索引 # TODO:split 在模块调用中做什么?它是否会移动模块? 放入子模块中? 分割 = 分割模块(跟踪的, 模块, 分割回调) # type: ignore[arg-type] # 一个(自定义)的跟踪器可能会产生死代码,例如孤立的 get_attr 节点 分割..删除死代码() # 优化孔以移除 pipe_split 子模块 分割.模块(): 如果 isinstance(子模块, fx.图模块): 节点 子模块..节点: 如果 (node.操作, node.目标) == ( 调用函数, aten\_pipe\_split\_alias, ): 子模块..删除节点(node) 子模块.重新编译() 名称, 子模块 分割.命名子项(): 如果 isinstance(子模块, fx.图模块): 新子模块 = _子模块概要(子模块.) # 替换旧子模块 分割.注册模块(名称, 新子模块) # TODO: 将此回滚到 split_module 定义 删除用户引用(node, 用户): "" 删除 `node` 从 `user` 的参数列表中的引用。 参数: - node: 根目录处的 `get_attr` 节点。 - 用户:使用 `node` 的子模块节点。 "文档" 断言 len(用户.kwargs) == 0 使用 idxs = [i i, 参数 列举(用户.参数) 如果 参数 == node] 断言 len(使用索引) == 1 参数复制 = 列表(用户.参数) 参数复制.流行(使用索引[0]\) 用户.args = 元组(args_copy) 日志记录器.调试( f"已删除"{node}来自用户{用户},参数索引 = {使用 idxs[0]}" # noqa: G004 ) # 延迟删除的参数引用列表。 # 将累积到 `move_param_to_callee` 中。 要删除 = [] 定义 _递归获取父级属性(模块, 完全限定名): # 返回给定嵌套完全限定名(FQN)的 getattr 调用及其最后一个父级 原子 = 完全限定名.分割(“点”) 原子 原子[-1] 如果 有属性(模块, 原子): 返回 , 修饰 = getattr(模块, 原子) 如果 有属性(模块, 原子[-1)] 返回 模块, 属性 = getattr(模块, 原子[-1]\) 返回 模块, 属性 定义 将参数移动到调用者( , 被叫方名称, 参数全限定名, ): "" 将参数从根模块移动到子模块。 参数: root:根模块。 被调用者名称:要将参数移动到的子模块名称。 param_fqn:要移动的参数的完全限定名称。 "文档" `atoms` 是一个表示原始模型中参数路径的字符串列表。 # parameter in the original model 原子 = 参数全称.分割(“点”) 模块迭代, 参数值 = _递归获取父级属性(分割, 参数全称) # 检查参数是否为缓冲区或参数 是缓冲区 = 原子[-1] mod_itr._缓冲区 检查参数是否为张量 断言 isinstance(param_val, 火炬.张量), ( f"预期 '{全局参数名}{火炬.张量}但是得到了{类型(参数值)} + ( f"可能发生,如果模块 '"{param_fqn}' 被传递给某个 '叶子函数' f"(请参阅 https://maskerprc.github.io/docs/stable/fx.html#fx.wrap)。请检查 " f"用法示例"{param_fqn}在追踪图中使用。 如果 isinstance(param_val, 火炬.神经网络.模块) 否则 请提供需要翻译的文本 ) ) 获取子模块 被叫方 = .获取子模块(被叫方名称) 断言 有属性(被叫方, 全局参数名称), ( f模块{被调用者名称}已经有一个名为{param_fqn}" ) # 将参数分配给子模块 如果 是缓冲区: _分配属性( param_val, 被调用者, 全局参数名称, 属性类型=属性类型.缓冲区, 持久性=True, # TODO: 处理非持久化缓冲区 ) else: _分配属性( 参数值, 被调用者, param_fqn, 属性类型=属性类型.参数, ) 日志记录器.调试(f"移动参数"{param_fqn}{调用者名称}") # noqa: G004 下一步是将子模块的占位符替换为 get_attr。 这些占位符是由每个子模块内的 split_module 创建的。 子模块。 更新:此步骤现在已移动到_sink_params。 `_sink_params` 可以递归地完成(例如,对于模块内的子模块) 子模块) 删除.追加((模块迭代器, 原子[-1])) 获取根模块中所有参数的列表 属性节点 = 列表(过滤(lambda n: n.操作符 == 获取属性, 分割..节点)) 节点 属性节点: 检查参数是否仅在子模块中使用 如果 len(node.用户们) > 1: 日志记录器.信息( f"参数"{node.目标}在多个阶段使用:{node.用户们} # 无需注意:G004 ) 用户 node.用户们: 断言 用户.操作符 == "调用模块" 将参数移动到子模块中 移动参数到调用者( 分割, 用户.目标, node.目标, ) # [别名] 存储张量 ID -> 由状态字典构建的全限定名称列表 也分配非持久缓冲区 id_to_fqns: 字典[整数, 集合[字符串]] = 默认字典(集合) 完全限定名, 张量 模块.state_dict(保留变量=True).项目(): id_to_fqns[id(张量)].添加(完全限定名) 完全限定名, 张量 模块.命名缓冲区(): id_to_fqns[id(张量)].添加(完全限定名) # 在将参数移动到它们对应的层级后,我们还需要 # 将 `get_attr` 节点从图根移动到那些 # 层级。 # [别名] 使用 id -> fqn 映射列出所有有效的 FQN 输入到状态: 字典[字符串, 列表[字符串]] = {} 属性 属性节点: _, 张量 = _带父节点递归获取属性(模块, 属性.目标) fqns = 列表(id 转完全限定名[id(张量)]]) 如果 fqns: 输入到状态[属性.名称] = fqns elif 属性.目标 导出的程序.常量: # 提升的常量 输入到状态[属性.名称] = [属性.目标] # 每个子模块分割时,为 FQN 全限定名分配可能使用的属性。 # 我们根据 FQN 属性父元素是否存在来确定这一点。 # 例如,如果最后一个子模块存在,则分配该属性。 添加的属性: 字典[字符串, 列表[字符串]] = 默认字典(列表) 完全限定名, 张量 模块.state_dict(保留变量=True).项目(): 名称, 子模块 分割.命名子项(): 如果 isinstance(子模块, fx.图模块): 父节点, 儿童 = 递归获取父级属性(子模块, 完全限定名) 如果 ( 父级 儿童 ): # 父级存在,属性不存在 -> 分配 添加属性[名称].追加(完全限定名) setattr(父节点, 完全限定名.分割(“点”)-1], 张量) # 延迟删除:从原始属性(到 params)中移除 # 根 GraphModule 模块迭代, 最后的原子 删除: try: delattr(模块迭代, 最后的原子) 除了 属性错误: 如果参数在多个阶段使用,则预期如此 通过 这是由每个子模块的 `_sink_params` (1) 完成的 名称, 子模块 分割.命名子项(): 如果 isinstance(子模块, fx.图模块): _汇出参数(子模块, 输入到状态, [] 子模块..检查() 子模块.重新编译() # [别名] 这一步并非特别必要,但有助于减少参数使用/内存。 # 在运行完 _sink_params() 程序后,清理我们之前添加的未使用属性。 # 根据 get_attr 节点确定这一点 - 如果未使用,则删除。 名称, 属性 添加的属性.项目(): 子模块 = getattr(分割, 名称) 未使用的属性 = 集合(属性) # 在子模块中跟踪使用属性,对子图层次结构运行 DFS = [输入文本翻译为简体中文为:"", 子模块)] # (作用域,子模块) : 范围, _mod = .流行() 如果 isinstance(_mod, (fx.图模块, 解释器模块)): 节点 _mod..节点: 如果 node.操作符 == 获取属性: # 获取_attr 可能获取更深层次的属性 完全限定名 = 范围 + "." + node.目标 如果 范围 否则 node.目标 未使用属性.丢弃(完全限定名) 名称, _子模块 _模块.命名子项(): .追加((范围 + "." + 名称 如果 范围 否则 名称, _子模块)) # 删除未使用的属性 属性 未使用属性: 模拟迭代, 原子 = 子模块, 属性.分割(“点”) 原子 原子[-1] mod_itr = getattr(mod_itr, 原子) delattr(mod_itr, 原子[-1]\) 节点 属性节点: # 并且(2):从子模块的参数列表中移除 `get_attr` 节点 用户 复制.复制(node.用户们): 断言 用户.操作符 == "调用模块" 删除用户引用(node, 用户) # 并且(3):从根图中移除 `get_attr` 节点 分割..删除节点(node) 分割.删除所有未使用的子模块() 分割..检查() 分割.重新编译() 阶段数量 = 管道._阶段数量及正向阶段(分割) 具有损失和反向 = 生成损失规范 = 输出损失值规范 如果 输出损失值规范 : 损失节点, 输出节点, 生成损失规范 = _查找损失输出( 模块, 分割., 输出损失值规范 ) 如果 损失节点 : _插入阶段符号反向( 分割., 损失节点, 输出节点, ) 分割.重新编译() 具有损失和反向 = 真实 日志记录器.调试("管道处于训练模式,生成了反向传播") else: raise 运行时错误( f"根据输出未找到任何损失值"{output_loss_value_spec=}" ) else: 日志记录器.调试("管道处于推理模式,未生成反向传播") 日志记录器.调试(f全管道模型:输入文本翻译为简体中文为:\n{分割}") # noqa: G004 返回 管道( 分割, num_stages 阶段数, 有损失和反向传播, 生成的损失规范, ) 定义 打印可读(): "" 以人类可读的格式打印管道。 这将打印根管道以及每个阶段模块。 "文档" .split_gm.打印可读() @staticmethod 定义 _trace_with_export( 模块: 火炬.神经网络.模块, 示例参数: 元组[任何, ...], 示例关键字参数: 可选[字典[字符串, 任何]] = , ) -> 导出程序: 日志记录器.信息("追踪模型……") try: ep = 火炬.导出.导出用于训练( 模块, 示例参数, 示例关键字参数, ) 除了 异常 e: raise 运行时错误( "似乎我们无法将您的模型作为一个完整的图来捕获。" "常见原因包括图断裂、数据/形状相关。" "控制流程,或者缺少自定义算子的元内核。" "您可以使用我们的手动管道接口,或者尝试修复。" "图断开,请参阅 https://maskerprc.github.io/docs/stable/export.html" ) 来自 e 返回 ep @staticmethod 定义 from_tracing( 模块: 火炬.神经网络.模块, 示例参数: 元组[任何, ...], 示例关键字参数: 可选[字典[字符串, 任何]] = , 分区策略: 可选[可调用[[fx.图模块], fx.图模块]] = , ): 如果一个参数将在多个管道阶段中使用,我们默认将策略设置为在各个阶段之间 REPLICATE'ing(复制)该参数,而不是 TRANSMIT'ing(传输)它 stages(阶段)而不是 TRANSMIT'ing(传输)它 多用途参数规范 = 多用途参数配置.REPLICATE # 确定哪个输出来自 output_chunk_spec 的输出是损失 输出损失值规范: 任何 = # 已废弃 "" 如果 output_chunk_spec 不为 None: output_loss_value_spec = map_aggregate( output_chunk_spec, lambda v: isinstance(v, _LossReducer) ) "文档" # 跟踪导出 导出程序 = 管道._trace_with_export( 模块, 示例参数, 示例关键字参数, ) 管道 = 管道._from_traced( 模块, 导出的程序, 多用途参数规范, 输出损失值规范=输出损失值规范, 分割策略=分割策略, ) # 用户希望第一个管道阶段接受 kwargs,如果原始程序 # 支持。这由图中的 `_codegen` 字段控制, # 这里我们进行复制。注意:我们只想获取输入规范,因为输出规范是最后阶段的。 # 可能是 TODO?还没有确定。 # TODO? 分割 = 管道.分割_gm 追踪 = 导出的程序.模块() 子模块 0 = 下一(迭代(分割.儿童())) submod0_sign = 签名(submod0.前向) model_sign = 签名(跟踪的.前向) 如果 len(model_sign.参数) != len(submod0_sign.参数): # 我们不改变第一阶段如果它需要 # 与原始模型不同的参数数量 日志记录器.信息( f"原始模型需要"{len(模型签名.参数)}参数,但是 " # noqa: G004 f"第一个管道阶段是"{len(submod0_sign.参数)}. 请为各个管道阶段提供参数。 ) else: # 支持第一阶段的 kwargs submod0..代码生成器 = 复制.深拷贝(跟踪的..代码生成器) # 类型:忽略[联合属性] `_replace`实际上并不是“私有”或“内部”的。根据此文档: 为了防止与字段名称冲突,方法和属性名称 应以下划线开头 submod0..代码生成器.Py 树信息 = ( # 类型:忽略[联合属性] 子模块 0..代码生成器.pytree 信息.替换(输出规范=) # 类型:忽略[运算符,联合属性] ) 子模块 0.重新编译() 返回 管道 定义 __str__(): 返回 .分割_gm.__str__() 定义 __repr__(): 返回 .分割_gm.__repr__() 定义 信息() -> 管道信息: "" 获取管道信息。 返回 ------- 管道信息 包含关于管道信息的数据类。 "文档" 返回 管道信息( =.split_gm., num_stages 阶段数=.num_stages 阶段数, 具有损失和反向传播=.有损失和反向传播, ) 定义 构建阶段( , 阶段索引: 整数, 设备: 火炬.设备, 群组: 可选[流程组] = , ) -> _管道阶段: "" 根据阶段索引和分布式组创建一个 `管道阶段`。 `PipelineStage` 可以与 `PipelineSchedule`s 一起运行。 "文档" # 查找阶段模块 阶段模块 = .获取阶段模块(阶段索引) # 将操作参数移动到设备 # 今天 PT2 跟踪器不将 `x.device` 视为符号设备; # 相反,跟踪时间的设备被烧录到了生成的代码中 # 代码。这里我们为用户提供了一个手动修改“device”参数的解决方案。 # 操作的“device”参数可能包括:`torch.ones`、`torch.zeros`、`torch.rand`等。 # `torch.ones`、`torch.zeros`、`torch.rand`等。 如果 isinstance(stage_module, 火炬.fx.图模块): 修改图操作设备(阶段模块, 设备) else: 日志记录器.警告( f预期得到一个 `torch.fx.GraphModule`,但得到了{类型(阶段模块)}" 注释 ) 断开管道信息 注意:请小心包含在 `pipe_info` 中的内容。我们不希望保留 `Pipe` 或 `Pipe.split_gm` 的引用,这会阻止 Python 回收它们。当 Python 回收它们时,其他阶段模块(它们) #(与当前排名无关的)可以自动释放。 管道信息 = .信息() 返回 _管道阶段(阶段模块, 阶段索引, 管道信息, 设备, 群组)
[文档]class 分割点(Enum): """ 枚举表示在子模块执行过程中可以发生拆分的点。 属性: BEGINNING:表示在`forward`函数执行某个子模块之前添加拆分点。 END:表示在`forward`函数执行某个子模块之后添加一个分割点。 """ BEGINNING = 1 END = 2
为了向后兼容,我们保留了 PipeSplitWrapper 类,因为`SplitPoint`曾经定义在这个类中。 `SplitPoint`曾经定义在这个类中。
管道分割包装器: 为 BC 创建一个类别名 分割点 = 分割点 定义 _split_before_forward(, *参数, **kwargs): 管道分割() 返回 ._orig_forward(*参数, **kwargs) 定义 _split_after_forward(, *参数, **kwargs): try: 返回 ._orig_forward(*参数, **kwargs) 最后: 管道分割() 定义 标注分割点(模块: 火炬.神经网络.模块, 规格: 字典[字符串, 分隔点)] # TODO: 使此实现成为原地操作? qualname, 分割类型 规格.项目(): 原子 = qualname.分割(“点”) 前置模块 = 修饰 i, 原子 列举(原子[-1)] try: 前置模块 = getattr(前驱模块, 原子) 除了 属性错误 e: raise 属性错误( f"指定的目标"{qualname}引用的 f"不存在的模块"{.连接(原子[ i + 1])}" ) 来自 e 包裹模块 = getattr(前置模块, 原子[-1]\) 包裹模块._原转发 = 包装模块.前向传播 如果 分割类型 == 分割点.开始: 包裹模块.前向传播 = 方法类型(_向前分割前, 包裹模块) elif 分割类型 == 分割点.结束: 包装模块.前向传播 = 方法类型(_在正向后分割, 包装模块) else: raise ValueError("未知分割点类型。")
[文档]定义 管道( 模块: 火炬.神经网络.模块, mb 参数: 元组[任何, ...], mb 参数: 可选[字典[字符串, 任何]] = , 分割规范: 可选[字典[字符串, 分割点]] = , 分割策略: 可选[可调用[[fx.图模块], fx.图模块]] = , ) -> 管道: "" 根据规范拆分模块。 更多详情请参阅 `管道`。 参数 --------- 模块: 要分割的模块。 mb_args: 示例位置参数输入,以微批形式。 mb_kwargs: 示例关键词输入,以微批形式呈现。(默认:`None`) split_spec: 使用子模块名称作为分割标记的字典。(默认:`None`) split_policy: 用于拆分模块的策略。(默认:`None`) 返回 ------- 类 `Pipe` 的管道表示。 "文档" 如果 split_spec split_policy : raise ValueError( "不能同时指定 `split_spec` 和 `split_policy`。请只使用其中一个。" ) 如果 split_spec : # 根据用户指定在模块中标注分割点 annotate_split_points(模块, 分割规范) 返回 管道.跟踪来源( 模块=模块, 示例参数=MB 参数, 示例关键字参数=mb_kwargs, ) else: 使用拆分策略 返回 管道.来自跟踪( 模块=模块, 示例参数=多参数, 示例关键字参数=多关键字参数, 分区策略=分区策略, )

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源