快捷键

torch.distributed.pipelining.stage 的源代码

# mypy: 允许未类型化定义
版权所有(C)Meta Platforms,Inc. 及其关联公司
导入 记录日志
导入 操作符
来自 abc 导入 ABC, 抽象方法
来自 打字 导入 任何, 可调用, 角色, 可选, 联合

导入 火炬
导入 torch.distributed  dist
导入 torch.fx  fx
导入 torch.nn  神经网络
来自 torch._subclasses.fake_tensor 导入 模拟 Tensor
来自 torch.distributed.fsdp 导入 FSDP 模块, 全部分片
来自 torch.fx 节点 导入 参数, 地图聚合
来自 torch.nn.parallel 导入 分布式数据并行
来自 torch.utils._pytree 导入 树图模式

来自 ._backward 导入 后退阶段, 阶段后向输入, 阶段后退权重
来自 ._调试 导入 map 调试信息
来自 _utils 导入 展平参数, 管道信息, 验证张量元数据


__all__ = [
    管道阶段,
    构建阶段,
]

日志记录器 = 记录日志.获取日志记录器(__name__)


def _normalize_model_output_as_tuple(输出: 任何) -> 元组[任何]
    """[注意:管道模型输出类型]"""

模型传递给管道的输出可以是任何类型,由用户控制。

然而,有两个 API 界面使这个问题变得复杂。
(1) 中间阶段的输出通过 Send/Recv 操作传递给后续阶段。隐含的假设是每个输出元素都是一个张量。否则,Send/Recv 将不被支持。例外情况是模型的最后一层,它可以输出任何东西,而这些东西不会通过 Send/Recv 进行通信。
是的,每个输出元素都是一个张量。否则,Send/Recv 将不被支持。例外情况是模型的最后一层,它可以输出任何东西,而这些东西不会通过 Send/Recv 进行通信。
是的,每个输出元素都是一个张量。否则,Send/Recv 将不被支持。例外情况是模型的最后一层,它可以输出任何东西,而这些东西不会通过 Send/Recv 进行通信。
(2) 模型的最后一层的输出返回给用户,或者传递给损失函数。
损失函数可以以任何方式编写,使其输入与模型的输出相匹配。

如果我们可以严格指定包装模型的管道阶段的输出签名,那就方便了,
但我们不想对用户提供的模型施加不必要的约束。

目前,我们允许用户提供的模型从每个阶段返回张量或张量元组。由于
torch.export 导出跟踪,编译后的模型也可能返回一个列表而不是元组,我们将将其规范化回
元组用于一致性。

待办:我们是否应该更严格地断言阶段模块(中间和输出)只返回 Tensor
值?
"文档"
    if 类型(输出)  列表:
        这是一个为了解决导出创建列表格式输出的临时解决方案
        # output in list format
        输出 = 元组(输出)

    将输出形式统一为元组,以便与 `act_send_info` 容易对应
    # `act_send_info`
    输出元组 = 输出 if 类型(输出)  元组 否则 (输出,)
    返回 输出元组


 根参数占位符:
    ""
模型级别输入的占位符。
"文档"

    定义 初始化(, 张量):
        .元数据 = 张量.(元数据)


 接收信息:
    ""
表示一个阶段输入。
"文档"

    定义 初始化(
        self,
        输入名称: 字符串,
        : 整数,
        缓冲区: 火炬.张量,
    ):
        # 此输入的名称
        self.输入名称 = 输入名称
        # 此输入源的阶段索引
        self. = 
        缓冲区接收输入
        self.缓冲区 = 缓冲区

    定义 __repr__(self):
        返回 f"_RecvInfo(input="{self.输入名称}, 来源={self.}, 形状={self.缓冲区.尺寸()})"


# 输入可以是接收到的激活或模型输入
输入信息 = 联盟[接收信息, 根参数占位符]


定义 从元数据创建张量(
    示例: 联盟[PyTorch.张量, 假 Tensor],
    设备: PyTorch.设备,
) -> PyTorch.张量:
    ""
从一个张量创建一个真实张量。
"文档"
    返回 PyTorch.空的(
        示例.尺寸(),
        数据类型=示例.数据类型,
        布局=示例.布局,
        设备=设备,
    )


 _PipelineStageBase(ABC):
    ""
管道阶段的基类。
定义或实现 `_PipelineStage` 使用的常用方法
手动前端使用的跟踪前端和 `PipelineStage`。
"文档"

    定义 初始化(
        self,
        子模块: PyTorch.神经网络.模块,
        stage_index: 整数,
        阶段数量: 整数,
        设备: PyTorch.设备,
        群组: 可选[距离.流程组] = ,
        dw_builder: 可选[可调用[] 可调用[..., ]]] = ,
    ):
        ""
参数:
子模块(torch.nn.Module):在此阶段要执行模块。
stage_index(int):此阶段的索引。
num_stages(int):此管道中的阶段总数。
device (torch.device):运行此阶段的设备。
group(可选 dist.ProcessGroup):用于通信的进程组。
如果为 `None`,则使用默认进程组。
默认:`None`。
dw_builder(可选[Callable[[], Callable[..., None]]]):如果提供,dw_builder 是一个构建函数
该函数将构建一个新的 dw_runner 函数,用于运行模块反向过程中有意跳过的部分
构建函数必须在每个阶段运行模型反向后调用,并且阶段应保存最新的 dw_runner 以在权重传递(W)期间运行
模型反向,并且阶段应保存最新的 dw_runner 以在权重传递(W)期间运行
如果未提供,将通过遍历 autograd 图自动生成 dw_runner。
当与仅包含 F 和 B 步骤的调度一起使用时,将调用新的 dw_runner 函数作为 I(输入反向)的一部分。
当与 F、I、W 调度一起使用时,dw_runner 函数实现'W'。
"文档"
        超级().初始化()
        如果 阶段索引 >= 阶段数量:
            抛出 ValueError(
                f"阶段索引"{stage_index}超出了范围{阶段数量}"
            )

        self.子模块 = 子模块
        self.阶段索引 = 阶段索引
        self.阶段数量 = 阶段数量
        self.设备 = 设备
        self. = 

        self.dw_builder = dw_builder

        # 向后状态
        self.backward_state: 字典[整数, 元组[任意, ...]] = {}

        # 将 dw_runner 存储在每个 microbatch_id 中
        self.dw_runner: 字典[整数, 可调用[..., ]] = {}

        # `group_rank` 是在进程组 `group` 中的排名。
        self.组排名 = 距离.获取排名(self.群组)
        self.组大小 = 距离.获取世界大小(.群组)
        如果 .组大小 > .阶段数量:
            raise 运行时错误(
                f"管道组大小"{.组大小}不能大于阶段数量{.阶段数量}"
            )

        运行时状态
        ._输出元数据: 可选[元组[火炬.张量, ...]] = 
        # 将微批处理 ID 映射到前向张量参数列表
        .前向缓存: 字典[整数, 元组[任何, 列表[火炬.张量]]] = {}
        将微批处理 ID 映射到反向梯度张量参数列表
        .bwd_cache: 字典[整数, 元组[可选[火炬.张量], ...]] = {}
        # 缓存块输出以进行最终输出合并或缩减
        .输出块: 列表[任何] = []

        # 初始化 has_backward 为 false;如果 loss 函数传递给 pipeline schedule,则将其设置为 true
        # 函数传递给 pipeline schedule
        .has_backward = 
        # 日志前缀
        .日志前缀 = f"[阶段"{.stage_index}]"

        # 前向基础设施
        .args_recv_info: 字典[整数, 元组[输入信息, ...]] = {}
        .发送信息操作: 字典[整数, 列表] = {}

        后向基础设施将懒加载创建
        .grad_recv_info: 字典 = {}
        .毕业发送信息: 可选[列表] = 

        将由调度器稍后填充
        .数据块们: 可选[整数] = 
        .阶段索引到组排名: 字典[整数, 整数] = {
            i: i % .组大小  i  范围(自身.阶段数)
        }

    @property
    def 有后向(自身) -> 布尔:
        ""
该阶段是否有反向传播。
"文档"
        返回 .是否有反向操作

    @has_backward.设置器
    定义 has_backward(, has_backward: 布尔):
        .是否有反向操作 = has_backward

    @property
    定义 is_first():
        ""
返回 true,如果这个阶段是管道中的第一个阶段。
"文档"
        返回 .阶段索引 == 0

    @property
    定义 是最后一个():
        ""
如果这个阶段是管道中的最后一个阶段,则返回 true。
"文档"
        返回 .阶段索引 == .阶段数量 - 1

    定义 检查块 ID(, chunk_id: 整数):
        if .数据块  :
            raise 运行时错误(
                尝试在配置块之前访问 chunk_id。
            )
        if chunk_id >= .数据块们:
            raise 运行时错误(
                f块 ID{chunk_id}超出范围 [0,]{.数据块们})"
            )

    定义 配置输出元数据(self, 输出元数据: 元组[火炬.张量, ...)]
        ""
跟踪此阶段的输出形状/数据类型,因为它们决定了必须匹配的发送操作(们)
下一阶段的接收操作。下一阶段_将_基于其初始状态冻结其接收缓冲区。
配置,因此冻结/验证输出端也很重要,以避免任何发送/接收不匹配
这可能导致挂起、静默损坏或其他错误。
"文档"
        断言 self._outputs_meta  , (
            "尝试重新配置 output_meta,这是不支持的"
        )
        self._outputs_meta = 元组(输出元数据)  # 类型:忽略[赋值]

    定义 获取输出元数据(self) -> 元组[PyTorch.张量, ...]
        获取表示此阶段输出的输出元数据(元张量)
        断言 self._outputs_meta   , (
            尝试在未配置输出元数据的情况下获取_outputs_meta()
        )
        返回 self._outputs_meta

    定义 _create_grad_send_info(
        self,
        args_recv_info: 元组,
    ) -> 列表[可选[整数]]
        ""
创建要发送梯度的阶段索引列表。
"文档"
        毕业发送信息: 列表[可选[整数]] = []

        定义 map_recv_to_send(a):
            # 注意:只要在之前阶段,我们就将梯度发送回前一个阶段
            # forward 它是一个接收到的输入,无论它是否需要
            # grad. 由前一阶段负责丢弃此梯度。
            如果 isinstance(a, 接收信息):
                毕业发送信息.追加(a.)
                返回 a.
            else:
                毕业发送信息.追加()
                返回 

        map_aggregate(args_recv_info, map_recv_to_send)

        记录器.调试("%sGrad 发送信息:%s", self.log_prefix, 毕业发送信息)
        返回 grad_send_info

    @abstractmethod
    定义 准备前向基础设施(
        self,
        num_microbatches: 整数,
        参数: 元组[任意, ...],
        kwargs: 可选[字典[字符串, 任意]] = ,
    ) -> 元组[任意, ...]
        抛出 未实现异常

    定义 _准备反向基础设施(self, num_microbatches: 整数):
        # TODO:这是用于 backward_maybe_with_nosync 所必需的
        self.数据块 = 微批次数

         mb 索引  范围(num_microbatches):
            `grad_recv_info` 是 `act_send_info` 的镜像
            self.grad_recv_info[mb 索引] = self.创建梯度接收信息(
                self.发送信息操作
            )

    @abstractmethod
    定义 创建梯度接收信息(
        self,
        发送信息操作: 字典,
    ) -> 元组[接收信息, ...]
        抛出 未实现异常

    定义 _获取接收操作(
        self,
        接收信息: 元组[输入信息, ...],
    ) -> 列表[距离.P2P 操作]
        ""
`get_fwd_recv_ops` 和 `get_bwd_recv_ops` 共享的辅助函数。
返回与接收信息对应的操作列表。
"文档"
        操作: 列表[距离.P2P 操作] = []
         信息  接收信息:
            如果  isinstance(信息, 接收信息):
                继续

            伙伴排名 = .阶段索引到组排名[信息.]
            伙伴全局排名 = (
                伙伴排名
                如果 .  
                否则 距离.获取全球排名(.群组, 同侪排名)
            )
            操作.追加(
                距离.P2P 流(距离.接收, 信息.缓冲区, 全球排名, .群组)
            )

        返回 ops

    "[注意:V-排期特殊情况]"

V-排期存在一个特殊情况,即相邻的 stage_id 的 2 个阶段位于同一等级。

例如:2 个等级,4 个阶段形成一个简单的 V:
rank0:阶段 0                   阶段 3
rank1:          阶段 1  阶段 2

阶段 0,1 和 2,3 使用 send/recv 正常通信,但阶段 1,2 不需要
使用通信操作。相反,它们应通过函数调用直接传递张量数据。

set_local_fwd_input 和 (get_local_bwd_output + set_local_bwd_input) 有助于这种优化,并且
应在管道调度适当的时间调用(在正向或反向执行之后)。
"文档"

    定义 设置本地前向输入(, 前一阶段输出: 任何, mb 索引: 整数) -> :
        ""
将'prev_stage_outputs'从同一 rank 的另一个阶段移动到本阶段的输入位置。避免
复制张量数据或使用 send/recv 操作。断开原始张量并设置 requires_grad。
张量可以作为 autograd 的叶子节点,在反向传播过程中可以从中收集梯度。
"文档"
        接收信息: 元组[输入信息, ...] = .args_recv_info[mb 索引]

        # 查看 [备注:管道模型输出类型]
        前一阶段输出 = _normalize_model_output_as_tuple(前一阶段输出)

         信息, 张量  zip(接收信息, 前一阶段输出):
            断言 isinstance(张量, 火炬.张量), (
                f预期从前一阶段输出的张量值,得到{类型(张量)}"
            )
            断言 isinstance(信息, 接收信息), (
                "仅在非第一阶段调用 set_local_Fwd_input,第一阶段应始终具有 RecvInfo"
            )

            # 这里我们不需要进行数据复制,因为我们可以直接从
            # 一个阶段传递激活张量引用到下一个阶段。然而,我们需要将激活标记为叶子张量,因为它将
            作为创建全新自动微分图的输入张量,不属于前一阶段的自动微分图的一部分。
            TODO:确认,我们是否将此激活作为前一阶段的反向调用的根?脱离(detach)对此有何影响?
            脱离(detach)对此有何影响?
            信息.缓冲区 = 张量.detach().需要梯度_(True)

    定义 获取局部反向输出(, mb 索引):
        “”
返回该阶段的输入梯度张量,这些张量对应于前向过程中的阶段输入。
"源代码"
        断言 self.有后向, (
            无法在当前阶段没有反向时窃取_bwd_input
        )
        断言  .is_first, "如果这个阶段是第一个,则无法获取 bwd 输出"

        .检查块 ID(mb 索引)
        返回 .bwd_cache.流行(mb 索引)

    定义 设置本地反向输入(
        , 下一个阶段的反向输出: 元组[可选[火炬.张量], ...], mb 索引: 整型
    ) -> :
        ""
将'grad input'张量从下一个阶段移动到当前阶段的'grad_output',避免复制或发送/接收。
不分离或设置 '_requires_grad'。
"文档"
        断言 isinstance(下一个阶段的反向输出, 元组), (
            f预期得到元组,却得到了{类型(下一个阶段的反向输出)}"
        )

        断言 .has_backward, (
            无法设置反向输入,如果此阶段没有反向功能
        )
        断言  .是最后一个, 如果这个阶段是最后一个,则不能设置反向输入
        接收信息 = .grad_recv_info[mb 索引]
         信息, 张量  zip(接收信息, 下一个阶段的反向输出):
            断言 isinstance(张量, 火炬.张量), (
                f预期从前一阶段输出的张量值,得到{类型(张量)}"
            )
            断言 isinstance(信息, 接收信息), (
                f预期接收信息,却得到了{类型(信息)}"
            )
            信息.缓冲区 = 张量

    定义 获取前向接收操作(self, 前向块 ID: 整数) -> 列表[距离.P2P 流]
        ""
返回接收输入参数所需的操作列表

"文档"
        接收信息: 元组[输入信息, ...] = .args_recv_info[前向块 ID]

        返回 ._获取接收操作(接收信息)

    定义 get_bwd_recv_ops(, bwd_chunk_id: 整数) -> 列表[距离.P2P 流]
        ""
返回此阶段所需的接收梯度的操作列表

"文档"
        如果  .has_backward 或者 .是最后一个:
            返回 []

        接收信息 = .grad_recv_info[bwd_chunk_id]
        返回 ._获取接收操作(接收信息)

    定义 获取前向发送操作(, 前向块 ID: 整数) -> 列表[距离.P2P 流]
        ""
获取当前阶段的正向激活发送操作。
"文档"
        输出 = .输出块[前向块 ID]
        将输出形式统一为元组,以便与 `act_send_info` 容易对应
        # `act_send_info`
        输出元组 = 输出 如果 类型(输出)  元组 否则 (输出,)

        操作: 列表[距离.P2P 流] = []

         索引, out  列举(输出元组):
            目标阶段 = .发送信息操作[索引]
             dst  目标阶段:
                如果 dst  :
                    继续
                记录器.调试(
                    "%s将张量发送到阶段%s: %s",
                    .log_prefix,
                    目标,
                    输出.尺寸(),
                )
                伙伴排名 = .阶段索引到组排名[目标]
                伙伴全局排名 = (
                    伙伴排名
                    if .  
                    否则 距离.获取全球排名(.群组, 同侪排名)
                )
                操作.追加(距离.P2P 流(距离.isend, 输出, 全球排名, self.群组))

        返回 ops

    定义 获取反向发送操作(self, bwd_chunk_id: 整数) -> 列表[距离.P2P 操作]
        ""
获取当前阶段的反向梯度发送操作。
"文档"
        self.检查块 ID(bwd_chunk_id)

        如果  self.has_backward 或者 self.is_first:
            返回 []

        懒惰地创建反向发送基础设施
        如果 self.grad_send_info  :
            # 向反向过程中输入梯度发送信息:
            # 对应输入梯度的目的地列表
            # 如果输入没有梯度,则可以是 None
            `grad_send_info` 是 `args_recv_info` 的镜像
            self.grad_send_info = self._create_grad_send_info(.args_recv_info[0])

        操作: 列表[距离.P2P 操作] = []
        梯度输入 = .bwd_cache.弹出(bwd_chunk_id)
         研究生, grad_recv_stage  压缩(输入梯度, .毕业发送信息):
            如果 isinstance(研究生, PyTorch.张量)  grad_recv_stage   :
                记录器.调试(
                    "%s将梯度发送到阶段%s: %s",
                    .log_prefix,
                    grad_recv_stage,
                    研究生.尺寸(),
                )
                伙伴排名 = .阶段索引到组排名[grad_recv_stage]
                伙伴全局排名 = (
                    伙伴排名
                    如果 .  
                    否则 距离.获取全球排名(.群组, 同侪排名)
                )
                操作.追加(距离.P2P 操作(距离.isend, 研究生, 全球排名, .群组))
            否则:
                如果  (梯度    grad_recv_stage  ):
                    抛出 运行时错误(
                        f"[{self.stage_index}] 用于块{bwd_chunk_id}有渐变{研究生} "
                        f并期望将渐变发送到阶段{grad_recv_stage}"
                    )
        返回 ops

    def 清除运行时状态() -> :
        “”
清除舞台的运行状态。
"文档"
        # 将微批处理 ID 映射到前向张量参数列表
        .前向缓存.清晰()
        # 缓存块输出以进行最终输出合并或缩减
        .输出块.清晰()

        清除计划步骤之间的输入缓冲区。这是因为在
        `torch.autograd.backward() 将将梯度累积到叶子节点`
        默认情况下为张量。为了使梯度能够回传到前面的阶段,我们
        不希望有这种积累。
         接收元组  .args_recv_info.():  # 遍历所有块
             a  接收元组:  遍历所有输入参数
                如果 isinstance(a, 接收信息):
                    将其设置为 None 是清除梯度的新方法,比`zero_()`更推荐。
                    查看 https://github.com/pytorch/pytorch/pull/92731
                    a.缓冲区.梯度 = 

    def _map_tensor_from_recv_info(
        ,
        接收信息: 元组[输入信息, ...],
    ):
        “”
将接收信息中的张量映射到一个列表中。
"文档"

        def 获取接收张量(信息):
            如果 isinstance(信息, 接收信息):
                返回 信息.缓冲区
            else:
                提升 断言错误(f预期 _RecvInfo 但得到{类型(信息)}")

        返回 map_aggregate(角色(参数, 接收信息), 获取接收张量)

    def _检索接收到的激活(, 前向块 ID: 整数):
        ""
在前向过程中检索当前阶段的接收到的激活。
"文档"
        接收信息 = self.args_recv_info[前向块 ID]
        激活 = ._map_tensor_from_recv_info(接收信息)
        返回 激活

    def _检索接收梯度(
        ,
        bwd_chunk_id: 整数,
    ):
        “”
在反向传播过程中检索当前阶段的接收到的梯度。
"文档"
        接收信息 = .grad_recv_info[bwd_chunk_id]
        梯度 = ._map_tensor_from_recv_info(接收信息)
        返回 梯度

    def 可能不同步转发(, *参数, **kwargs):
        # 如果子模块被 DDP 包装,我们使用`no_sync`上下文管理器
        # 避免每个微批次的梯度全量归一化
        如果 isinstance(.子模块, 分布式数据并行):
             self.子模块.不同步():  # type: ignore[operator]
                输出值 = .子模块(*参数, **kwargs)
        else:
            输出值 = .子模块(*参数, **kwargs)
        返回 输出值

    def 梯度缩放(, 梯度缩放因子: 整数) -> :
        将模型梯度按`grad_scale_factor`缩放,该因子应与`scale_factor`一起指定
损失函数用于流水线。对于执行'平均'损失减少的损失函数,`grad_scale_factor`
应设置为 num_microbatches。对于使用`sum`缩减的损失函数,`grad_scale_factor`应
设置为 1。

应该在每个管道调度步骤中只调用一次,在所有反向传递完成后。
""

        # 仅对其自身的贡献(微批)进行 PP 缩放,但依赖于 DP 进行进一步缩放。
        # 对于 DP 度。
        如果 梯度缩放因子 != 1:
             p  .子模块.参数():
                如果 p.梯度   :
                    p.研究生.div_(梯度缩放因子)

    def 可能反向,无同步(
        ,
        反向类型,
        bwd 参数: 字典,
        最后一次反向: 布尔类型 = 错误,
    ) -> 元组[元组[可选[PyTorch.张量], ...], 可选[列表[字典[字符串, 任意]]]]:
        ""
无论使用 PP 与 FSDP 还是 DDP,在最后一个反向步骤中都有一些运行时差异
其他步骤。具体来说,我们需要在之前的步骤中累积梯度,并在最后一步中减少它们,
根据所使用的数据并行度,可能还有额外的状态变量和性能考虑因素。
此辅助函数应适应任何管道并行调度,以便与常见的/支持的数据并行库一起工作。
""

        def 执行反向操作(
            反向类型,
        ) -> 可调用[
            []
            元组[元组[可选[PyTorch.张量], ...], 可选[列表[字典[字符串, 任意]]]],
        ]
            如果 反向类型 == 全部:
                返回 lambda: (
                    后退阶段(
                        bwd 参数[阶段输出],
                        bwd 参数[输出梯度],
                        bwd 参数[输入值],
                    ),
                    ,
                )
            elif 反向类型 == 输入:
                返回 lambda: 阶段后向输入(
                    bwd 参数[阶段输出],
                    bwd 参数[输出梯度],
                    bwd 参数[输入值],
                    .子模块.参数(),
                )
            elif 反向类型 == 权重:
                返回 lambda: (
                    阶段向后权重(
                        .子模块.参数(), bwd 参数["参数组"]
                    ),
                    ,
                )
            else:
                抛出 运行时错误(f"未知反向类型:"{反向类型}")

        # 如果子模块被 DDP 包装
        如果 isinstance(.子模块, 分布式数据并行):
            如果 最后一次反向:
                # 最后的块,准备进行梯度缩减
                # HACK:在这里深入到 DDP 实现细节。有更好的方法吗?
                .子模块.简化器.准备反向操作(  # type: ignore[union-attr, operator]
                    列表(
                        PyTorch.神经网络.并行.分布式.查找张量(  # 类型: 忽略[attr-defined]
                            bwd 参数[阶段输出]
                        )
                    )
                )
                结果 = 执行反向操作(反向类型)()
            否则:
                 .子模块.不同步():  # type: ignore[operator]
                    结果 = 执行反向操作(反向类型)()
        如果子模块是 FSDP 模块
        elif isinstance(.子模块, FSDP 模块):
            .子模块.设置为最后一个反向操作(错误)
            .子模块.设置反向操作后重新分片(错误)
            .子模块.设置需要梯度同步(错误)
            结果 = 执行反向操作(反向类型)()
            如果 最后一次反向:
                # 手动调用 FSDP 的 post backward
                def 运行后向回(fsdp 模块: FSDP 模块) -> :
                    fsdp 模块.设置为最后一个反向操作(True)
                    fsdp 模块.设置反向操作后重新分片(True)
                    fsdp 模块.设置需要梯度同步(True)
                    fsdp 状态 = 全分片.状态(fsdp 模块)  # 类型: 忽略[attr-defined]
                     状态  fsdp 状态._状态上下文.所有状态:
                        如果 状态._fsdp 参数组:
                            状态._fsdp 参数组.反向传播后()

                    # 如果反向调用.backward 能够触发自动微分钩子,那就好多了。
                    # 那样 DDP/FSDP 等模块才会按预期工作。目前我们只能暂时解决这个问题,
                    # 我们还需要调用这个,以确保 FSDP 能够将梯度缩减操作同步回默认流。
                    fsdp 状态._root_post_backward_final_callback()

                运行后向回(self.子模块)

        否则:
            非 DP 子模块,常规反向
            结果 = 执行反向操作(反向类型)()

        梯度, 参数组 = 结果
        返回 梯度, 参数组

    def 前进一个块(
        ,
        前向块 ID: 整数,
        参数: 元组[任意, ...],
        kwargs: 可选[字典[字符串, 任意]] = ,
    ):
        ""
在该阶段使用一个微批处理执行前向传递。
`args` 和 `kwargs` 是外部输入到该阶段的参数。
截至 2024 年 9 月:
`- `args` 仅适用于第一阶段,其他阶段接收 args`
通过激活传输。
`- `kwargs` 可以通过各自的 `step` 调用传递给所有阶段。`
""

        如果 .is_first:
            # 第一阶段无需接收任何内容
            复合参数 = args
        否则:
            # 接收此块激活
            # 激活以参数形式传入
            复合参数 = ._检索接收到的激活(前向块 ID)

        composite_kwargs = kwargs  {}

        self._验证正向输入(参数, kwargs)

        计算正向
        try:
            输出 = 自身.可能不同步转发(*复合参数, **组合关键字参数)

        除了 异常 作为 e:
            exc_msg = f""
            {自身.log_prefix}运行前向失败:
参数:{map 调试信息(复合参数)}
kwargs: 参数字典{map 调试信息(组合关键字参数)}
"源代码"
            抛出 运行时错误(exc_msg) 来自 e

        # 查看 [备注:管道模型输出类型]
        输出元组 = _normalize_model_output_as_tuple(输出)

        准备最终输出合并或缩减
        自身.输出块.追加(输出)

        保存激活和输入以供反向传播
        平铺参数 = 展平参数(复合参数)
        平坦参数 = 展平参数(组合关键字参数)
        展平输入张量 = 平铺参数 + 平坦参数
        自身.前向缓存[前向块 ID] = (
            输出元组,  # 阶段输出
            展平输入张量,  输入值
        )

        记录器.调试(
            "%s转发块%s,输出:%s",
            自身.log_prefix,
            前向块 ID,
            map 调试信息(输出),
        )
        自身._验证前向输出(输出元组)

        # 返回原始用户提供的输出,不转换为元组格式。
        # 查看 [备注:管道模型输出类型]
        返回 输出

    def 向后移动一个块(
        self,
        bwd_chunk_id: 整数,
        损失=,
        全部后向: 布尔类型 = True,
        最后一次反向=错误,
    ):
        ""
在模块上执行反向传播。
应该在每个微批次中只调用一次。

如果 full_backward 为 True(默认值),则将运行包括权重和输入梯度的完整反向传播,
调用 `backward_weight_one_chunk` 对于此 bwd_chunk_id 是错误的。

如果 full_backward 为 False,则在 __init__ 时向 PipelineStage 提供的 `dw_runner` 是可选的
需要随后调用 `backward_weight_one_chunk` 以调用 dw_runner 并完成反向操作。

last_backward 由调度和 DP 组间的梯度同步信号控制
在最后向后之后。
"源代码"
        self.检查块 ID(bwd_chunk_id)

        (
            阶段输出,
            输入值,
        ) = self.前向缓存.弹出(bwd_chunk_id)

        # 反向计算
        如果 self.是最后一个:
            # 最后阶段从损失中计算梯度,没有来自下一阶段的梯度
            # 下一阶段
            bwd_kwargs = {
                阶段输出: 损失,
                输出梯度: ,
                输入值: 输入值,
            }
        否则:
            # 否则,接收下一阶段的梯度
            grads_output = self._检索接收梯度(bwd_chunk_id)
            # 如果管道的输入需要梯度,
            `torch.autograd.backward` 将梯度累加到这样的输入的 `.grad` 字段中
            `.grad` 字段中
            bwd_kwargs = {
                阶段输出: 阶段输出,
                输出梯度: 梯度输出,
                输入值: 输入值,
            }

        输入梯度: 元组[可选[PyTorch.张量], ...] = ()

        自定义反向函数
        如果 self.dw_builder:
            # TODO: 我们可能想要改变我们的语义,以便我们可以忽略
            # 'dw_builder' 并在它是 full_backward 操作时直接调用 full_backward。
            输入梯度, _ = self.可能反向,无同步(
                全部,
                bwd 参数,
                最后一次反向=最后一次反向,
            )
            如果 全部后向:
                self.dw_builder()()
            否则:
                self.dw_runner[bwd_chunk_id] = self.dw_builder()
        否则:
            如果 全部后向:
                输入梯度, _ = self.可能反向,无同步(
                    全部, bwd 参数, 最后一次反向=最后一个反向
                )
            否则:
                参数组: 列表[字典[字符串, 任意]] |  = 
                跳过第一阶段的后向传播,因为我们将在 backward_weight_one_chunk 中执行权重更新
                在 backward_weight_one_chunk 中执行 autograd.backward
                如果  self.is_first:
                    如果 isinstance(bwd 参数[阶段输出], PyTorch.张量):
                        bwd 参数[阶段输出] = (bwd 参数[阶段输出],)

                    # 对输入执行自定义的逆向操作
                    # 当 "阶段输出" 是损失时,它是一个张量,否则它是一组张量
                    输入梯度, 参数组 = self.可能反向,无同步(
                        输入, bwd 参数, 最后一次反向=最后一个反向
                    )

                # TODO: 我们不需要保存这个,添加到 dw_runner 吗?
                self.backward_state[bwd_chunk_id] = (
                    bwd 参数[输入值],
                    参数组,
                    bwd 参数[阶段输出],
                    bwd 参数[输出梯度],
                )
                保存 dw_runner 的占位符
                self.dw_runner[bwd_chunk_id] = lambda: 

        self.bwd_cache[bwd_chunk_id] = 梯度输入

        如果 self.是最后一个   self.is_first:
            # 自动微分依赖:
            #    rest_of_autograd_graph -> stage_output -> loss
            # stage_output 在最后阶段不再使用,仅用于回溯
            # 返回给用户以合并输出块
            # 应将其分离以释放 autograd 图上下文并提前释放内存
             t  阶段输出:
                如果  t._is_view():  # 视图不可就地分离
                    t.断开连接()

        记录器.调试("%s退回的块%s", self.log_prefix, bwd_chunk_id)

    def 向后权重一个块(self, bwd_chunk_id: 整数, 最后一次反向=错误):
        断言 bwd_chunk_id  self.dw_runner, (
            f"{self.log_prefix}尝试运行 backward_weight_one_chunk 为 chunk{bwd_chunk_id}"
            "在没有先调用 `backward_one_chunk(full_backward=False)` 的情况下"
        )

        如果 self.dw_builder   :
            self.dw_runner.弹出(bwd_chunk_id)()
        否则:
            (
                输入值,
                参数组,
                阶段输出,
                输出梯度,
            ) = self.backward_state.弹出(bwd_chunk_id)

            如果 self.阶段索引 != 0:
                bwd_kwargs = {
                    阶段输出: 阶段输出,
                    "参数组": 参数组,
                }
                self.可能反向,无同步(
                    权重, bwd 参数, 最后一次反向=最后一个反向
                )
            否则:
                # TODO: 想出一个更好的方法来做这件事:
                # 如果输入不需要梯度,
                # 阶段反向输入期间参数组将无法完全捕获
                # 在这种情况下,我们需要直接在参数上调用 grad
                # 解决方案:让输入函数先进行交集计算,然后在 W 阶段完成
                bwd_kwargs = {
                    阶段输出: 阶段输出,
                    输出梯度: 输出梯度,
                    输入值: 输入值,
                }
                self.可能反向,无同步(
                    全部, bwd 参数, 最后一次反向=最后一个反向
                )

    def _验证正向输入(self, 参数, kwargs):
        如果输入参数/关键字参数的形状与该阶段配置的形状不匹配,则引发 RuntimeError 异常。

        如果 self.is_first:
            # TODO 为什么每个管道块都有一个单独的接收信息?
            避免向此函数传递 `fwd_chunk_id`,我们
            将所有块与 args_recv_info[0] 进行检查
            预期参数 = self.args_recv_info[0]
        否则:
            # 假设它们不接受非 0 阶段,我们不检查输入
            用户输入在标准管道场景中
            返回

        如果 len(kwargs):
            # TODO-需要将 kwarg 映射到 self.args_recv_info 中的位置
            没有它,我们无法 100%确定如何匹配 args 和
            expected_args。
            返回

        # TODO-需要将 kwarg 映射到 self.args_recv_info 中的位置
        # 可能无法判断长度不匹配是因为
        # (a) 用户传递了多余的参数或遗漏了参数
        # (b) 用户未传递具有默认值的 kwarg
        预期张量元数据 = [
            e.元数据 如果 isinstance(e, 根参数占位符) 否则 e.缓冲区
             e  预期参数
        ]
        验证张量元数据(
            f阶段{self.阶段索引}向前输入, 预期张量元数据, args
        )

    def _验证前向输出(self, 输出: 元组[PyTorch.张量, ...)]
        如果这个阶段产生了意外形状/数据类型的输出,将引发 RuntimeError。
这很可能是由于用户错误地指定了输出形状,或者是因为在原始模型上进行了形状推断,但在运行时模型被包装成了混合精度,从而改变了输出数据类型。
例如,形状推断是在原始模型上进行的,但在运行时模型被包装成了混合精度,这会改变输出数据类型。
这可能导致输出数据类型发生变化。
"源代码"
        预期张量元数据 = self.获取输出元数据()
        验证张量元数据(
            f阶段{self.阶段索引}前向输出, 预期张量元数据, 输出
        )


 _PipelineStage(_PipelineStageBase):
    定义 初始化(
        self,
        阶段模块: PyTorch.神经网络.模块,
        阶段索引: 整数,
        管道信息: 管道信息,
        设备: PyTorch.设备,
        群组: 可选[距离.流程组] = ,
    ):
        ""
根据要包装的 stage_module 创建一个管道阶段
并描述管道阶段关系的 `pipe_info`

参数:
stage_module (torch.nn.Module):要被此阶段包装的模块
stage_index (int):此阶段在管道中的索引
pipe_info (PipeInfo):关于管道的信息,可以通过 `pipe.info()` 获取
device (torch.device):此阶段使用的设备
group (Optional[dist.ProcessGroup]):此阶段使用的进程组
"源代码"
        _PipelineStageBase.初始化(
            self,
            阶段模块,
            阶段索引,
            管道信息.阶段数,
            设备,
            群组,
        )
        self.管道信息 = 管道信息

        在图中查找阶段节点
        子模块节点 = [
            节点  节点  管道信息..节点 如果 node.操作符 == "调用模块"
        ]
        如果 len(子模块节点) != self.阶段数:
            提升 断言错误(
                f管道图中子模块的数量{len(子模块节点)}阶段数量不匹配{self.阶段数}"
            )

        在图中查找我的阶段节点
        self.节点 = 子模块节点[self.阶段索引]
        self.名称 = self.node.名称
        记录器.信息(
            "[%s创建管道阶段%s对于%s",
            self.群组排名,
            阶段索引,
            self.名称,
        )

        将舞台名称映射到舞台索引
        self.子模块到阶段索引: 字典[字符串, 整数] = {}
         i, 节点  列举(子模块节点):
            self.子模块到阶段索引.setdefault(node.名称, i)

        # 将子模块转换为设备
        self._move_submod_to_device()

    定义 _move_submod_to_device(self):
        尝试将子模块移动到指定的设备上(如果可能)
        注意:由于元张量不支持 to()方法,因此无法将元模块移动到真实设备上
        在这种情况下,需要就地交换张量
        
        具有元参数 = 任何(
            isinstance(p, 假 Tensor)  p.是否是元数据  p  self.子模块.参数()
        )
        如果 具有元参数:
            记录器.调试("%s发现元参数!, self.log_prefix)
        否则:
            self.子模块.(self.设备)

    定义 准备前向基础设施(
        self,
        num_microbatches: 整数,
        参数: 元组[任意, ...],
        kwargs: 可选[字典[字符串, 任意]] = ,
    ) -> 元组[任意, ...]
        ""
创建激活(在正向过程中)的发送/接收基础设施
"源代码"
        # TODO(whc)
        # 此方法一旦实现懒加载缓冲区分配后应删除
        # 目前,它忽略 args/kwargs,因为它不应需要进行形状推断
         数据块  范围(num_microbatches):
            self.args_recv_info[数据块] = self.创建活动接收信息()

        每次激活时发送信息
        self.发送信息操作 = self.创建活动发送信息()
        返回 元组()

    定义 获取子模块的阶段索引(
        self,
        子模块名称: 字符串,
    ):
        ""
给定子模块名称,返回子模块的阶段索引。
"源代码"
        如果 子模块名称   self.子模块到阶段索引:
            提升 断言错误(f阶段 ID 为{子模块名称}未找到)

        返回 self.子模块到阶段索引[子模块名称]

    定义 创建活动接收信息(
        self,
    ):
        ""
为阶段的输入创建一个 `_RecvInfo` 元组。
"源代码"

        定义 创建接收张量(占位符, 节点参数):
            ""
为占位符创建接收缓冲区。
"源代码"
            示例值 = 占位符.元数据[]
            如果 节点参数.操作符 == 占位符:
                # 这是一个根级占位符,因此是整个模型的输入参数。
                # 我们可能处于 0 阶段,因此不需要创建接收缓冲区。
                返回 根参数占位符(示例值)

            确定此输入的源阶段
             节点参数.目标  操作符.获取项:
                如果输入是 getitem,我们需要进一步深入
                端节点 = 节点参数.参数[0]

            断言 节点参数.操作符 == 调用模块, (
                f预期调用模块,却得到了{节点参数.操作}"
            )
            源阶段 = self.获取子模块的阶段索引(arg_node.名称)

            为此占位符创建接收缓冲区
            记录器.调试(
                "%s创建接收缓冲区为输入 '%s' : %s, %s",
                自身.log_prefix,
                占位符.名称,
                示例值.shape,
                示例值.数据类型,
            )
            缓冲区 = 从元数据创建张量(示例值, 自身.设备)
            如果存在反向传播,则设置接收缓冲区的 requires_grad
            # 在第一个箭头之前
            如果 自身.有后向:
                缓冲区.需要梯度_(True)

            返回 接收信息(
                节点参数.名称,
                源阶段,
                缓冲区,
            )

        args_recv_info: 列表[输入信息] = []
        # 从 `self.submod`(一个 GraphModule)中过滤掉占位符节点
        占位符 = 过滤(  # type: ignore[var-annotated]
            lambda node: node.操作符 == 占位符,  # type: ignore[arg-type]
            self.子模块..节点,  忽略参数类型和属性联合
        )
        占位符是子模块内部的节点。
        `self.node.args`是外部图中的依赖节点。
        这两个是一对一的关系。
         占位符, 端节点  压缩(占位符, 自身.node.参数):
            为此占位符创建接收缓冲区
            recv_info = 创建接收张量(占位符, 节点参数)
            args_recv_info.追加(接收信息)

        记录器.调试(
            "%s激活接收 / 参数信息:%s", self.log_prefix, 参数接收信息
        )
        `args` 是一个元组,因此我们将返回一个 Tuple[InputInfo]
        返回 元组(args_recv_info)

    def 查找目标排名(
        自身,
        用户: fx.节点,
    ) -> 可选[整数]
        ""
找到`用户`节点的目标排名。
如果`用户`不是子模块,则可能返回`None`。
"源代码"
        如果 用户.操作符 == 调用模块:
            用户处于舞台(调用模块)
            返回 自身.获取子模块的阶段索引(用户.名称)
        否则:
            # - 如果用户.op == "output":
            无需发送回 0 号排名
            - 如果用户的目标是 stage_backward:
            - 假设子模块输出已本地存储或
            - 应该在激活检查点的情况下重新计算
            返回 

    def 创建活动发送信息(自身):
        ""
创建激活信息的字典。
该字典的形式为:
        {
            output_index: [dst_rank_0, dst_rank_1, ...],
            ...
        }
其中`dst_rank`列表涵盖了输出值可能的情况。
可被多个阶段消耗。
"源代码"
        # 输出索引:接收方进程列表
        发送信息操作: 字典[整数, 列表] = {}
        out_idx = 0

         用户  自身.node.用户们:
            如果 用户.目标  操作符.获取项:
                递归查找实际目的地
                gi_dsts = 发送信息操作.setdefault(out_idx, []
                 gi_user  用户.用户们:
                    目标排名 = 自身.查找目标排名(gi_user)
                    如果 目标排名   :
                        目标 gi.追加(目标排名)
                下一个`getitem`将指向下一个输出索引
                out_idx += 1
            否则:
                如果只有一个输出值,则`out_idx`不会增加
                dsts = 发送信息操作.setdefault(out_idx, []
                目标排名 = 自身.查找目标排名(用户)
                如果 目标排名   :
                    目标排名列表.追加(目标排名)

        输出节点 = 自身._获取输出节点()
        输出值: 元组[PyTorch.张量] = 元组(
            v.元数据[]  v  展平参数(输出节点.参数)
        )
        自身.配置输出元数据(输出值)

        记录器.调试("%s发送信息:%s", 自身.log_prefix, 发送信息操作)
        返回 发送信息操作

    def _获取输出节点(自身):
        输出节点 = [节点  节点  自身.子模块..节点 如果 node.操作符 == 输出]  # 类型:忽略[联合属性]
        断言 len(输出节点) == 1
        输出节点 = 输出节点[0]
        返回 输出节点

    def 创建梯度接收信息(
        自身,
        发送信息操作: 字典,
    ) -> 元组[接收信息, ...]
        ""
创建一个用于梯度的 `_RecvInfo` 元组。
"源代码"
        # Dict[输出索引, _RecvInfo]
        grad_recv_info: 字典[整数, 接收信息] = {}
        输出节点 = 自身._获取输出节点()

        # 输出节点可能接受多个参数,意味着子模块有多个输出值。
        输出值 = 展平参数(输出节点.参数)

         out_idx, 目标列表  发送信息操作.项目():
            如果  目标列表:
                没有实际接收器用于激活,因此没有返回梯度
                继续

            输出 = 输出值[out_idx]
            示例值 = 输出.元数据[]
            记录器.调试(
                f"{自身.log_prefix}创建输出接收缓冲区{输出.名称} "  # 无需注意:G004
                f":"{示例值.shape}, {示例值.数据类型}"
            )

            # TODO: 否则需要梯度累积
            断言 len(目标列表) == 1, "跳过连接的反向尚未支持"
            梯度源 = 目标列表[0]
            grad_recv_info[out_idx] = 接收信息(
                f"{}",  # 无需注意:G004
                ,
                从元数据创建张量(示例值, 自身.设备),
            )

        将其转换为元组,以便在 get_ops 和检索张量时方便使用
        grad_recv_info_tuple = 元组(grad_recv_info.())
        记录器.调试("%s Grad recv info: %s", 自身.log_prefix, grad_recv_info_tuple)
        返回 grad_recv_info_tuple


基于追踪的管道信息创建管道阶段的辅助函数
[文档]def build_stage( 阶段模块: torch.nn.Module, 阶段索引: int, pipe_info: 管道信息, device: 火炬设备, group: 可选的 dist.ProcessGroup = None, ) -> _PipelineStage: ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 创建一个由该阶段包装的阶段模块的管道阶段 管道信息。 参数: stage_module (torch.nn.Module): 需要被此阶段包装的模块 stage_index (int): 此阶段在流水线中的索引 pipe_info (PipeInfo): 流水线信息,可以通过 `pipe.info()` 获取 device (torch.device): 此阶段使用的设备 group(可选[dist.ProcessGroup]):此阶段所使用的进程组 返回值: _PipelineStage:一个可以与`PipelineSchedules`一起运行的管道阶段 """ return _PipelineStage( stage_module, stage_index, pipe_info, 设备, 组, )
[文档] 管道阶段(_PipelineStageBase): "" 代表管道并行设置中管道阶段的类。 PipelineStage 假设模型进行顺序划分,即模型被分割成块,其中一块的输出馈入下一块的输入,没有跳过连接。 PipelineStage 会自动通过传播 stage0 的输出到后续阶段,执行运行时形状/数据类型推断。 PipelineStage 通过传播 stage0 的输出到后续阶段,自动执行运行时形状/数据类型推断。 阶段 1 以及类似,按线性顺序。要绕过形状推断,请将`input_args`和`output_args`传递给每个 PipelineStage 实例。 参数: 子模块(nn.Module):此阶段包装的 PyTorch 模块。 stage_index(int):此阶段的 ID。 num_stages (int): 阶段总数。 device (torch.device): 此阶段所在设备。 input_args (Union[torch.Tensor, Tuple[torch.tensor]], optional): 子模块的输入参数(可选)。 output_args (Union[torch.Tensor, Tuple[torch.tensor]], optional): 子模块的输出参数。 group (dist.ProcessGroup, optional): 分布式训练的进程组。如果为 None,则使用默认组。 dw_builder (Optional[Callable[[], Callable[..., None]]): 如果提供,dw_builder 将构建一个新的 dw_runner 函数 用于 F、I、W (Fwd、Input、Weight) 零气泡调度计划的 W 操作(输入权重)。 "源代码" def 初始化( 自身, 子模块: 神经网络.模块, 阶段索引: 整数, 阶段数: 整数, 设备: PyTorch.设备, 输入参数: 可选[联盟[PyTorch.张量, 元组[PyTorch.张量, ...]]] = , 输出参数: 可选[联盟[PyTorch.张量, 元组[PyTorch.张量, ...]]] = , 群组: 可选[距离.流程组] = , dw_builder: 可选[可调用[] 可调用[..., ]]] = , ): 超级().初始化(子模块, 阶段索引, 阶段数, 设备, 群组, dw_builder) 自身.输入: 可选[列表[PyTorch.张量]] = 自身.输入元数据: 可选[元组[PyTorch.张量, ...]] = # 注意:输入和子模块理想情况下应在元设备上。我们决定(目前)不强制执行此要求。 # 可能会破坏现有用户。 如果 输入参数 : 断言 output_args , ( "如果指定输出参数,则必须也指定输入参数。" "否则,将在运行时执行形状推断" ) 否则: 自身.输入元数据 = ( (输入参数,) 如果 isinstance(输入参数, PyTorch.张量) 否则 输入参数 ) 如果 output_args : 记录器.警告( "弃用警告:传递 input_args 和执行初始化时形状推断已被弃用。" "PipelineStage 现在支持使用调度步骤()提供的实际输入进行运行时形状推断。" "或者删除 `input_args` 参数以 `PipelineStage` 选择运行时形状推断," "或者额外传递 `output_args` 到 `PipelineStage` 以完全覆盖形状推断。" ) try: PyTorch.不梯度(): output_args = 子模块(*自身.输入元数据) output_args = 仅树映射( PyTorch.张量, lambda x: x.(元数据), output_args ) 除了 异常 作为 e: 提升 运行时错误( 执行管道形状推理失败 - 您的输入是否与模块位于同一设备上? ) 来自 e 断言 output_args , ( 如果传递 input_args,也请传递 output_args 以覆盖形状推断 ) 自身.配置输出元数据( (输出参数,) 如果 isinstance(输出参数, PyTorch.张量) 否则 output_args ) 这些是在反向发送/接收中使用的缓冲区,它们稍后分配 自身.输出梯度: 列表[PyTorch.张量] = [] dbg_str = ( f管道阶段初始化完成,{自身.阶段索引=}, {自身.is_first=}," # 无需注意:G004 f"{自身.是最后一个=}, {自身.阶段数=}," ) 如果 自身.输入元数据 : dbg_str += ( f"inputs: "{[输入.形状 输入 自身.输入元数据]}," f"output: "{[输出.形状 输出 自身.获取输出元数据空括号]}" ) 否则: dbg_str += "运行时进行形状推断" 记录器.调试(dbg_str) def 形状推理( 自身, 参数: 元组[任意, ...] kwargs: 可选[字典[字符串, 任意]] = , ): 如果 kwargs : kwargs = {} 断言 args , "参数可能是一个空元组,但不能是 None" # 如果我们是第一阶段,或者前一个阶段在同一 rank 上,则跳过 recv 通信 # 并且可以将输出形状作为 args 传入,而不是使用 send/recv。 如果 ( 自身.是第一个 如果非第一阶段,则检查前一阶段是否在同一等级 自身.阶段索引到组排名[自身.阶段索引 - 1] == 自身.组排名 ): 记录器.调试( 形状推断:阶段%s跳过接收,因为通过`args`传递了形状信息, 自身.阶段索引, ) args = 仅树映射(PyTorch.张量, lambda x: x.(元数据), 参数) 否则: 断言 len(参数) == 0, ( "无法为非第一阶段提供形状推理输入参数" ) 物体 = [] 记录器.调试( 形状推断:阶段%s接收来自阶段%s", 自身.阶段索引, 自身.阶段索引 - 1, ) 距离.接收对象列表( 对象, =距离.获取全球排名( 自身. 距离.分布式_c10d.获取默认组(), 自身.阶段索引到组排名[自身.阶段索引 - 1] ), 群组=自身.群组, 设备=自身.设备, ) recv_args = 对象[0] 断言 isinstance(recv_args, 元组), 类型(recv_args) args = recv_args # 缓存输入形状,用于 recv 缓冲区分配期间 自身.输入元数据 = args args = 仅树映射( PyTorch.张量, lambda x: PyTorch.与...相同形状的零(x, 设备=自身.设备), args ) 设置所需的属性以进行正向操作 PyTorch.不梯度(): 输出 = 自身.子模块(*参数, **kwargs) 如果是单个张量,则转换使其始终为列表 如果 isinstance(输出, PyTorch.张量): 输出 = [输出] 传达元输出而不是实际输出有两个原因 1 - 它更快(尤其是由于对象收集序列化张量数据!) # 2 - 避免在接收端反序列化时为 src rank 激活 CUDA 上下文! 输出元数据 = 元组( 仅树映射(PyTorch.张量, lambda x: x.(元数据), 输出) ) 记录器.调试( 形状推断:阶段%s输入%s,输出%s", 自身.阶段索引, 自身.输入元数据, 输出元数据, ) 自身.配置输出元数据(输出元数据) # 将输出传递到下一阶段: # 两种情况 # 1. 通常:使用 send/recv 通信传递输出 # 2. 特殊情况:对于 V 调度,2 个相邻的阶段(例如 8 阶段 4 排 V 中的阶段 3、4) # 通过返回值和函数参数传递形状信息,而不是发送/接收。 如果 ( 自身.是最后一个 # 如果不是最后一个阶段,则检查下一个阶段是否在同一排 自身.阶段索引到组排名[自身.阶段索引 + 1] == 自身.组排名 ): # 上述情况(2):通过返回值传递形状信息,调用者将其作为参数传递给下一个阶段的 # _shape_inference 调用 记录器.调试( 形状推断:阶段%s跳过发送到下一阶段, 自身.阶段索引, ) 否则: # 情况(1):通过发送操作发送形状,并确保不返回给调用者 记录器.调试( 形状推断:阶段%s发送到阶段%s", 自身.阶段索引, 自身.阶段索引 + 1, ) 距离.发送对象列表( [输出元数据] 目标=距离.获取全球排名( 自身. 距离.分布式_c10d.获取默认组(), 自身.阶段索引到组排名[自身.阶段索引 + 1] ), 群组=自身.群组, 设备=自身.设备, ) 输出元数据 = 元组() 返回 输出元数据 def 准备前向基础设施( 自身, num_microbatches: int, 参数: 元组[任意, ...] kwargs: 可选[字典[字符串, 任意]] = , ) -> 元组[任意, ...] # TODO 将 self.device 从 step API 的输入张量移动到一个参数中? 断言 微批次数 , "TODO 修复 num_microbatches" 输出: 元组[任意, ...] = 元组() 如果 自身.输入元数据 : 输出 = 自身.形状推理(参数, kwargs) 断言 自身.输入元数据 # 接收前向信息 # TODO: 懒惰地创建 args_recv_info?(PipelineStage 同样需要) chunk_id 范围(num_microbatches): 如果 自身.is_first: # 我们假设总是从阶段 - 1 接收 接收信息 = 元组( [ 接收信息( frecv_for_{自身.阶段索引}_from_{自身.阶段索引 - 1}", 自身.阶段索引 - 1, 从元数据创建张量(输入, 自身.设备), ) 输入 自身.输入元数据 ] ) 如果存在反向传播,则设置接收缓冲区的 requires_grad 如果 自身.有后向: r 接收信息: r.缓冲区.需要梯度_(True) 自身.args_recv_info[chunk_id] = 接收信息 否则: 自身.args_recv_info[chunk_id] = 元组( [根参数占位符(i) i 自身.输入元数据] ) 每次激活时发送信息 # 只需要发送的排名 自身.发送信息操作: 字典[int, 列表] = {} 索引 范围(len(自身.获取输出元数据())): # 假设我们总是发送到下一阶段 + 1 如果 自身.是最后一个: 自身.发送信息操作[索引] = [自身.阶段索引 + 1] 否则: 自身.发送信息操作[索引] = [] 返回 输出 def 创建梯度接收信息( 自身, 发送信息操作: 字典, ) -> 元组[接收信息, ...] grad_recv_info: 元组[接收信息, ...] = () 如果 自身.是最后一个: 不支持从多个来源接收梯度 因此我们只取第一个目的地 grad_recv_info = 元组( [ 接收信息( frecv_grad_for_{自身.阶段索引}_from_{目标列表[0]}", 目标列表[0] 从元数据创建张量( 自身.获取输出元数据()[索引] 自身.设备 ), ) 索引, 目标列表 发送信息操作.项目() ] ) 返回 grad_recv_info

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源