快捷键

torch.distributed.pipelining.schedules 的源代码

# mypy: 允许未类型化定义
© Meta Platforms, Inc. 及其关联公司

导入 复制
导入 csv
导入 itertools
导入 记录日志
导入 正则表达式
来自 abc 导入 ABC, 抽象方法
来自 集合 导入 计数器, defaultdict
来自 枚举 导入 枚举
来自 打字 导入 任何, 可调用, 命名元组, 可选, 类型检查, 联合

导入 火炬
导入 torch.distributed  dist
来自 torch._dynamo 导入 优化模块
来自 torch.distributed.fsdp 导入 FSDP 模块, 解碎片处理
来自 torch.nn.modules.loss 导入 损失
来自 torch.profiler 导入 记录函数

来自 _utils 导入 生成阶段到排名映射
来自 .微批 导入 合并块, 将参数和关键字参数分割成块, 张量块规范
来自 .阶段 导入 _管道阶段基类


if 类型检查:
    来自 torch.distributed 导入 工作

全部 = [
    "获取调度类",
    "单条管道调度",
    "PipelineScheduleMulti",
    "Schedule1F1B",
    "ScheduleGPipe",
    "ScheduleInterleaved1F1B",
    "循环 BFS 调度",
    "交错零泡调度",
    "ZBV 零泡调度",
]

记录器 = 记录.获取日志记录器(__name__)


 计算类型(枚举):
    # TODO(whc) 重命名为 _ActType?
    前进 = 1
    反向输入 = 2
    反向权重 = 3
    未碎片化 = 4
    重置 = 5
    发送_F = 6
    接收_F = 7
    发送_B = 8
    接收_B = 9
    全部倒退 = 10

    定义 __str__(self):
        字符串映射表 = {
            计算类型.前向: F,
            计算类型.反向输入: "我",
            计算类型.反向权重: "W",
            计算类型.未碎片化: 解散,
            计算类型.重新碎片化: 重新碎片化,
            计算类型.发送_F: 发送_F,
            计算类型.接收_F: "接收_F",
            计算类型.发送_B: "发送_B",
            计算类型.接收_B: 接收_B,
            计算类型.全部反向: "B",
        }
        返回 字符串映射[self]

    @staticmethod
    定义 from_str(动作):
        if 行动 == F:
            返回 计算类型.前进
        elif 行动 == "我":
            返回 计算类型.反向输入
        elif 行动 == "W":
            返回 计算类型.反向权重
        elif 行动 == 解散:
            返回 计算类型.未碎片化
        elif 行动 == 重新碎片化:
            返回 计算类型.重置
        elif 行动 == 发送_F:
            返回 计算类型.发送_F
        elif 行动 == "接收_F":
            返回 计算类型.接收_F
        elif 行动 == "发送_B":
            返回 计算类型.发送_B
        elif 行动 == 接收_B:
            返回 计算类型.接收_B
        elif 行动 == "B":
            返回 计算类型.全部倒退
        else:
            raise 运行时错误(f无效的计算类型{动作}")


前进 = 计算类型.前进
反向输入 = 计算类型.反向输入
反向权重 = 计算类型.反向权重
未碎片化 = 计算类型.未碎片化
重置 = 计算类型.重置
发送_F = 计算类型.发送_F
接收_F = 计算类型.接收_F
发送_B = 计算类型.发送_B
接收_B = 计算类型.接收_B
全部倒退 = 计算类型.全部倒退

仅用于'简单调度格式'中的计算动作的便利简写
F = 前进
I = 反向输入
W = 反向权重
B = 全部倒退

解析动作字符串(如 1F0)到(阶段索引,计算类型,微批索引)元组的辅助工具
_action_regex = 正则表达式.编译(
    r"(\d+)(F|I|B|W|UNSHARD|RESHARD|SEND_F|RECV_F|SEND_B|RECV_B)(\d*)"
)


 _动作(命名元组):
    stage_index: 整型
    计算类型: _ComputationType
    微批索引: 可选[整数] = 

    定义 __repr__(self):
        表示 = 字符串(self.stage_index)
        表示 += 字符串(.计算类型)
        如果 .微批索引   :
            表示 += 字符串(.微批索引)
        返回 表示

    @staticmethod
    定义 from_str(action_string: 字符串):
        ""
__repr__ 的反转

字符串应格式化为 [阶段][动作类型][(微批次)]
例如 `2F0`,`1UNSHARD`,`3SEND_F1`
"文档"
        动作字符串 = 动作字符串.strip()
        如果 匹配 := _动作正则表达式.匹配(action_string):
            stage_index, 计算类型, 微批索引 = 匹配.群组()
            返回 _动作(
                整数(stage_index),
                计算类型.from_str(计算类型),
                整数(微批索引) 如果 len(微批索引) 否则 ,
            )
        elif 动作字符串 == 输入文本翻译为简体中文为:"":
            返回 
        raise 运行时错误(
            f"无效的操作字符串:"{action_string},应格式化为[阶段][动作类型][(微批次)]例如 2F0
        )


定义 _format_pipeline_order(
    管道顺序: 字典[整数, 列表[可选[_动作]],
    错误步骤编号: 可选[整数] = ,
) -> 字符串:
    ""
以时间步(行)x 排名(列)网格的形式格式化管道顺序
并返回格式化后的字符串。

如果传入 `error_step_number`,则将添加一个额外的标签以表示它是在哪个步骤出错。
出错的步骤。
"文档"

    不要修改原始内容。
    流水线顺序 = 复制.深拷贝(管道顺序)

    # 替换 None 为 ""
     排名  管道顺序:
         i  范围(len(管道顺序[排名])):
            如果 管道顺序[排名]
[i]  :
                # TODO 将 'None action' 实现为打印空字符串并让 mypy 满意
                管道顺序[排名]
[i] = 请提供需要翻译的文本  # type: ignore[call-overload]

    计算所有层级的最大步数
    步骤数量 = 最大值(len(动作)  动作  管道顺序.())
    步骤标签 = [
        步骤 + 字符串(i).填充(len(字符串(步骤数量 - 1)))  i  范围(步骤数量)
    ]
    # 按键对字典进行排序并按该顺序检索值
    排行操作 = [
        管道顺序.获取(, [输入文本翻译为简体中文为:""] * 步骤数量)  key  排序(管道顺序)
    ]
    将列表的列表(行转列)
    转置动作 = 列表(itertools.zip_longest(*排行操作, 填充值=输入文本翻译为简体中文为:""))
    生成排名的列标签
    排名数量 = len(管道顺序)
    排名标签 = ["排名 " + 字符串(i)  i  范围(排名数量)]
    计算每列的最大长度,考虑标签
    最大长度 = [
        最大值(len(字符串(项目)) 如果 项目    否则 0  项目  )
           压缩(步骤标签, *转置动作)
    ]
    # 格式化标题行,带有排名标签
    标题行 = 输入文本为空,请提供需要翻译的文本 * (长度(步骤标签[0]\) + 2) + 输入文本为空,请提供需要翻译的文本.连接(
        f"{标签:<{最大长度[i]}}"  i, 标签  列举(排名标签)
    )
    # 格式化每行及其对应标签
    格式化行 = [
        f"{标签} 
        + 输入文本为空,请提供需要翻译的文本.连接(f"{字符串(项目):<{最大长度[i]}}"  i, 项目  列举())
        + (
            "错误位置"
            如果 错误步骤号   
             整数(标签.分割()[1]\) == 错误步骤号
            否则 请提供需要翻译的文本
        )
         标签,   zip(步骤标签, 交换动作)
    ]
    # 将行合并为单个字符串
    格式化表格 = 表头行 + "输入文本翻译为简体中文为:\n" + "输入文本翻译为简体中文为:\n".连接(格式化行) + "输入文本翻译为简体中文为:\n"
    返回 格式化表格


 _管道调度(ABC):
    定义 初始化(
        ,
        微批次数量: 整数,
        损失函数: 可选[可调用[..., 火炬.张量]] = ,
        参数块规范: 可选[元组[张量块规范, ...]] = ,
        kwargs_chunk_spec: 可选[字典[字符串, 张量块规范]] = ,
        输出合并规范: 可选[联盟[字典[字符串, 任何], 元组[任何]]] = ,
        梯度缩放: 布尔类型 = True,
    ):
        # From arguments
        ._n_微批次 = n_微批次
        ._loss_fn = 损失函数

        # 请参阅 `PipelineScheduleSingle` / `PipelineScheduleMulti` 中的文档
        .缩放梯度 = 缩放梯度

        # 位置输入的 Chunking 规范(默认:`None`)
        ._args_chunk_spec = args_chunk_spec
        # 关键词输入的分块规范(默认:`None`)
        ._kwargs_chunk_spec = kwargs_chunk_spec
        .输出合并规范 = output_merge_spec
        ""
# args_chunk_spec 和 kwargs_chunk_spec 指定了如何分块输入。
用于在 `step(x)` 中将批处理转换为微批处理。请参阅
`TensorChunkSpec` 以获取创建它们的辅助方法。
"文档"

        派生的
        .是否有反向操作 = ._loss_fn   

        # 每个微批次的损失。
        ._内部损失: 列表[火炬.张量] = []
        日志记录器.信息("使用"%s", ..__name__)

    定义 _可能计算损失(, 阶段, 输出, 目标_mbs, mb 索引):
        如果 阶段.是最后一个  ._has_backward:
            损失 = ._compute_loss(输出, 目标_mbs[mb 索引]\)  忽略索引
            ._内部损失.追加(损失)

    定义 可能会损失(, 阶段, mb 索引):
        有效索引 = 0  mb 索引 < len(._内部损失)
        如果 阶段.是最后一个  .是否有反向操作  有效索引:
            返回 ._内部损失[mb 索引]
        elif len(self._internal_losses) != 0   有效索引:
            raise 运行时错误(
                f"微批次的损失{mb 索引}不可用。 "
                f"微批次的可用损失:"{.内部损失}"
            )
        else:
            返回 

    定义 更新损失(, 阶段, 损失):
        ""
将损失更新为内部状态中的那些
"文档"
        # 如果阶段不是一个列表,则将其转换为列表
        如果  isinstance(阶段, 列表):
            阶段 = [阶段]
        包含最后一个阶段 = 任何(阶段.是最后一个  阶段  阶段)

        如果传入容器,则返回损失
        如果 包含最后一个阶段  损失   :
            如果 len(._内部损失) != .微批次数量:
                raise 运行时错误(
                    f预期{.微批次数量}损失但得到了{len(._内部损失)}"
                )

            首先清洁外部容器
            损失.清晰()
            将内部损失复制到外部容器
            损失.扩展(._内部损失)

        ._内部损失.清晰()

    @abstractmethod
    定义 _微批次步骤(
        ,
        参数_mbs: 可选[列表] = ,
        关键字参数_mbs: 可选[列表] = ,
        目标_mbs: 可选[列表] = ,
        损失: 可选[列表] = ,
    ):
        ""
运行一次包含微批次的流水线调度。
将根据调度遍历所有微批次。
实现。

参数:
微批次:微批次参数列表。
"文档"
        raise 未实现异常

    @abstractmethod
    定义 步长(, *参数, 目标=, 损失: 可选[列表] = , **kwargs):
        ""
运行一次使用 *全批次* 输入的管道调度。
将输入自动分割成微批次,并按照调度实现进行遍历。
根据调度实现遍历微批次。

args:传递给模型的定位参数(如在非管道情况下)。
kwargs:模型的关键字参数(如非管道情况)。
target:损失函数的目标。
losses:存储每个微批次的损失的列表。
"文档"
        raise 未实现异常

    定义 _检查输入(
        ,
        参数_mbs: 可选[列表] = ,
        关键字参数_mbs: 可选[列表] = ,
        目标_mbs: 可选[列表] = ,
        损失: 可选[列表] = ,
    ):
        ""
预处理/检查输入
"文档"

        定义 检查类型和长度(微批次数, 名称: 字符串):
            如果  isinstance(微批次数, 列表):
                raise 类型错误(f"{名称}必须是一个列表,但得到了一个{类型(微批次数)}")
            如果 len(微批次数) != .微批次数量:
                raise ValueError(
                    f预期{self._n_个微批次} {名称}但是得到了{len(mbs)}"
                )

        if arg_mbs   :
            检查类型和长度(参数_mbs, "arg_mbs")
        else:
            arg_mbs = [] * ._n_微批次

        如果 kwarg_mbs   :
            检查类型和长度(关键字参数_mbs, kwarg_mbs)
        else:
            kwarg_mbs = [{}] * ._n_微批次

        如果 目标_mbs   :
            检查类型和长度(目标_mbs, "目标_mbs")

        如果 损失   :
            如果  isinstance(损失, 列表):
                raise 类型错误(f"损失必须是一个列表,但得到了一个"{类型(损失)}")

        返回 参数_mbs, kwarg_mbs

    定义 _compute_loss(, 输出, 目标):
        返回 ._loss_fn(输出, 目标)  # 类型:忽略[杂项]

    定义 _split_inputs(
        ,
        参数: 元组[任何, ...],
        kwargs: 可选[字典[字符串, 任何]] = ,
    ):
        ""
将全批次输入分割成块(即微批次)并返回

"文档"
        如果 args 或者 kwargs:
            参数分割, 关键字参数分割 = 将参数和关键字参数分割成块(
                参数,
                kwargs,
                .微批次数量,
                ._args_chunk_spec,
                ._kwargs_chunk_spec,
            )
            返回 参数分割, 关键字参数分割
        else:
            # 空输入(例如在调用中间阶段时)
            # 返回与块长度匹配的空元组/字典列表
            返回 [] * .微批次数量, [{}] * ._n_微批次

    定义 合并输出(, 输出块: 列表[任意]) -> 任意:
        ""
将输出块合并回批次状态。
如果 output_merge_spec 为 None,则工具将按维度 0(批次维度)合并输出块。
""
        返回 合并块(
            输出块,
            .输出合并规范,
        )


定义 批量 P2P(P2P 操作: 列表[距离.P2P 流], desc: 可选[字符串] = ):
    ""
简单封装了 torch.distributed 中的 batch_isend_irecv,只是在上面添加了描述性日志。
"文档"
    if len(P2P 操作) == 0:
        返回 
    描述字符串 = f"{desc}," if 描述 否则 请提供需要翻译的文本
    日志记录器.调试("批量点对点"%s%s", 描述字符串, P2P 操作)
    返回 距离.批量发送_接收(点对点操作).弹出()


def _sorted_batch_p2p(
    p2p_ops: 列表[距离.P2P 流], desc: 可选[字符串] = 
) -> 字典[整数, 距离.工作]
    ""
按照节点排名对 P2P 操作列表进行排序,然后调用
batch_isend_irecv。返回按节点排名的工作字典。此函数
帮助我们避免跳过连接时的挂起。
"文档"
    # 按照对等节点排名排列 p2p_ops:
    #   int 表示节点排名;
    #   List 表示对节点的操作列表
    ops_by_peer: 字典[整数, 列表[距离.P2P 操作]] = defaultdict(列表)
    work_by_peer: 字典[整数, 距离.工作] = {}
    if len(P2P 操作) == 0:
        返回 对等工作

    将操作按对等排名分类
     操作符  P2P 操作:
        ops_by_peer[操作.同行].追加(操作)

    # 按节点排序调用 batch_isend_irecv,以避免挂起
     同行, ops  排序(ops_by_peer.项目()):
        work_by_peer[同行] = 批量 P2P(操作, desc=desc)

    返回 对等工作


[文档] PipelineScheduleSingle(_管道调度): "" 单阶段调度的基类。 实现了 `step` 方法。 派生类应实现 `_step_microbatches`。 梯度根据 `scale_grads` 参数按 num_microbatches 缩放,默认为 True。此设置 应与您的 loss_fn 配置相匹配,loss_fn 可能会平均损失(scale_grads=True) 或求和损失(scale_grads=False)。 "文档" 定义 初始化( , 阶段: _PipelineStageBase, 微批次数量: 整数, 损失函数: 可选[可调用] = , 参数块规范: 可选[元组[张量块规范, ...]] = , kwargs_chunk_spec: 可选[字典[字符串, 张量块规范]] = , 输出合并规范: 可选[联盟[字典[字符串, 任何], 元组[任何]]] = , 梯度缩放: 布尔类型 = True, ): # 初始化父类 超级().初始化( 微批次数量=微批次数量, 损失函数=损失函数, 参数块规范=参数块规范, kwargs_chunk_spec=kwargs_chunk_spec, 输出合并规范=输出合并规范, 梯度缩放=梯度缩放, ) # 自定义属性 ._阶段 = 阶段 ._num_个阶段 = 阶段.阶段数量 设置阶段对象的 has_backward 标志相同 ._阶段.has_backward = .是否有反向操作 ._阶段已初始化 = if n_微批次 < ._num_stages: raise ValueError( f"微批次数("{微批次数量})必须大于或等于阶段数(\ {._num_stages}). ) 定义 _初始化阶段(, 参数, kwargs): ._阶段.准备前向基础设施(.微批次数量, 参数, kwargs) if ._has_backward: ._阶段._准备反向基础设施(.微批次数量) ._阶段已初始化 = 真实
[文档] def step(self, *args, target=None, losses: Optional[list] = None, **kwargs): """ 运行一次使用 *全批次* 输入的管道调度。 将自动将输入分割成微批次,并按照调度实现进行遍历。 根据调度实现遍历微批次。 args:传递给模型的定位参数(如在非管道情况下)。 kwargs:模型的关键字参数(如在非管道情况下)。 目标:损失函数的目标。 losses:存储每个微批次的损失的列表。 ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 清理每轮数据 self._stage.clear_runtime_states() 将输入拆分为微批次 args_split, kwargs_split = self._split_inputs(args, kwargs) 将目标拆分为微批次 如果目标不为空: targets_split = list(torch.tensor_split(target, self._n_microbatches)) 如果为空: targets_split = None # 运行微批次 self._step_microbatches(args_split, kwargs_split, targets_split, losses) # 返回按原始格式合并的结果 如果 self._stage.is_last 为真: 返回 self._merge_outputs(self._stage.output_chunks) 否则: 返回 None
仅前向调度(PipelineScheduleSingle): "" 仅前向的调度。 将遍历所有微批次并仅执行前向传递。 "文档" 定义 _微批次步骤( , 参数_mbs: 可选[列表] = , 关键字参数_mbs: 可选[列表] = , 目标_mbs: 可选[列表] = , 损失: 可选[列表] = , ): "" 运行一次管道调度迭代 "文档" if 目标_mbs 或者 损失 : raise 运行时错误( "单向进度表不支持损失计算" ) 参数_mbs, kwarg_mbs = ._检查输入(参数_mbs, 关键字参数_mbs, 目标_mbs, 损失) if ._阶段已初始化: ._初始化阶段(参数_mbs[0], 关键字参数_mbs[0]\) # 延迟发送等待 等待的正向发送数量: 列表[距离.工作] = [] 运行微批次 i 范围(.微批次数量): 记录功能(f前进{i}"): ops = self._阶段.获取前向接收操作(i) 工作 = _排序批次点对点(操作, desc=fwd_recv) 工作 工作.(): 工作.等待() self._阶段.前进一个块(i, 参数_mbs[i], 关键字参数_mbs[i]\) 忽略索引 ops = self._阶段.获取前向发送操作(i) 工作 = _排序批次点对点(操作, desc=前端发送_) 等待的正向发送数量.扩展(工作.()) 记录器.调试("[%s] 转发微批次%s", self._阶段.stage_index, i) 等待所有前向发送完成 这不应该有性能影响,因为到第一 向后到达时,所有正向发送应该已经完成。 工作 等待的正向发送数量: 工作.等待()
[文档] 安排 GPipe(PipelineScheduleSingle): "" GPipe 的调度。 将按填充-排空的方式遍历所有微批次。 "文档" 定义 _微批次步骤( self, 参数_mbs: 可选[列表] = , 关键字参数_mbs: 可选[列表] = , 目标_mbs: 可选[列表] = , 损失: 可选[列表] = , ): "" 运行一次包含微批次的流水线调度。 将根据 GPipe 调度遍历所有微批次。 参数: 微批次:微批次参数列表。 "文档" 参数_mbs, kwarg_mbs = self._检查输入(参数_mbs, 关键字参数_mbs, 目标_mbs, 损失) 如果 self._阶段已初始化: self._初始化阶段(参数_mbs[0], 关键字参数_mbs[0]) # 延迟发送等待 等待的正向发送数量: 列表[距离.工作] = [] 运行微批次 i 范围(self._n_个微批次): 记录功能(f前进{i}"): ops = self._阶段.获取前向接收操作(i) 工作 = _排序批次点对点(操作, desc=fwd_recv) 工作 工作.(): 工作.等待() 输出 = self._阶段.前进一个块(i, 参数_mbs[i], 关键字参数_mbs[i]) 忽略索引 ops = self._阶段.获取前向发送操作(i) 工作 = _排序批次点对点(操作, desc=前端发送_) 等待的正向发送数量.扩展(工作.()) 记录器.调试("[%s] 转发微批次%s", self._阶段.stage_index, i) self._可能计算损失(self._阶段, 输出, 目标_mbs, i) 等待所有前向发送完成 这不应该有性能影响,因为到第一 向后到达时,所有正向发送应该已经完成。 工作 等待的正向发送数量: 工作.等待() 无损失函数,无需运行反向传播 如果 self._has_backward: 返回 # 运行反向传播 # 延迟发送等待 bwd_sends_to_wait: 列表[距离.工作] = [] i 范围(self._n_个微批次): 记录功能(f向后{i}"): ops = self._阶段.get_bwd_recv_ops(i) 工作 = _排序批次点对点(操作, desc=后向接收器) 工作 工作.(): 工作.等待() 损失 = self.可能会损失(self._阶段, i) self._阶段.向后移动一个块( i, 损失=损失, 最后一次反向=i == self._n_微批次 - 1, ) ops = self._阶段.获取反向发送操作(i) 工作 = _排序批次点对点(操作, desc=bwd_send) bwd_sends_to_wait.扩展(工作.()) 记录器.调试("[%s微批回退%s", self._阶段.stage_index, i) self._阶段.梯度缩放( 梯度缩放因子=self._n_微批次 如果 self.缩放梯度 否则 1 ) 如果传入容器,则返回损失 self.更新损失(self._阶段, 损失) # 等待所有反向发送完成 工作 bwd_sends_to_wait: 工作.等待()
[文档] 调度 1F1B(PipelineScheduleSingle): "" 1F1B 调度 将在稳态下对微批次进行一次正向和一次反向操作。 "文档" 定义 _微批次步骤( self, 参数_mbs: 可选[列表] = , 关键字参数_mbs: 可选[列表] = , 目标_mbs: 可选[列表] = , 损失: 可选[列表] = , ): "" 运行一次包含微批次的流水线调度。 将根据 1F1B 调度遍历所有微批次。 参数: 微批次:微批次参数列表。 "文档" 参数_mbs, kwarg_mbs = self._检查输入(参数_mbs, 关键字参数_mbs, 目标_mbs, 损失) 如果 self._阶段已初始化: self._初始化阶段(参数_mbs[0], 关键字参数_mbs[0]) # 最后阶段有 1 次预热,倒数第二阶段有 2 次预热,... # 第一阶段 `num_stages` 次预热 预热块 = 最小( self._n_个微批次, self._num_个阶段 - self._阶段.stage_index, ) # 块计数器 fwd_mb_index = 0 bwd_mb_index = 0 # 预热阶段 发送工作 = 前向发送 = [] _ 范围(预热块): # 接收激活 前向接收者 = self._阶段.获取前向接收操作(fwd_mb_index) 如果 接收工作 := 批量 P2P(前向接收者, desc=fwd_recv): 接收工作.等待() 计算机计算 输出 = self._阶段.前进一个块( fwd_mb_index, 参数_mbs[fwd_mb_index], 关键字参数_mbs[fwd_mb_index] ) 忽略索引 清除前一个块的前向发送(希望它们已经完成,否则,我们将严重受限于通信,在这种情况下,它不会为计算下一个块带来很多好处) # finished, otherwise, we are heavily communication bound, in which # case it doesn't create a lot of benefit to compute next chunk # eagerly either) 如果 发送工作: 发送工作.等待() 发送激活 前向发送 = self._阶段.获取前向发送操作(fwd_mb_index) 如果 fwd_mb_index != 预热块 - 1: # 安全发射 发送工作 = 批量 P2P(前端发送, desc=前端发送_) 否则: 最后一个前向发送保留与下面 1B1F 中的第一个 1B 融合 计算损失 self._可能计算损失(self._阶段, 输出, 目标_mbs, fwd_mb_index) fwd_mb_index += 1 # 现在我们应该还有剩余的发送操作,将与下面的 1B1F 阶段的第一 1B 融合。 # 1B1F 阶段 True: # 别担心,我们中间有休息时间 # 我们实际上先做 1B,正如`1B1F`的名字所指示的,所以准备它的接收操作 bwd_recvs = self._阶段.get_bwd_recv_ops(bwd_mb_index) 现在,我们需要一起触发 fwd_sends 和 bwd_recvs 如果 融合工作 := 批量 P2P(前向发送 + 后向接收, desc=前向发送后向接收): 融合工作.等待() 向后移动一个块 损失 = self.可能会损失(self._阶段, bwd_mb_index) self._阶段.向后移动一个块( bwd_mb_index, 损失=损失, 最后一次反向=bwd_mb_index == self._n_微批次 - 1, ) # 获取反向发送操作,但不触发,将与下面的 1F 融合 反向发送 = self._阶段.获取反向发送操作(bwd_mb_index) bwd_mb_index += 1 如果 fwd_mb_index == self._n_个微批次: # 我们完成了 1B1F,所以留下一些剩余的 bwd_sends 后中断 断开 # 我们准备`1B1F`的 1F 前向接收者 = self._阶段.获取前向接收操作(fwd_mb_index) 将其与上面的 bwd_sends 合并 如果 融合工作 := 批量 P2P(反向发送 + 前向接收者, desc="后向发送前向接收"): 融合工作.等待() 现在进行前向操作 输出 = self._阶段.前进一个块( fwd_mb_index, 参数_mbs[fwd_mb_index], 关键字参数_mbs[fwd_mb_index] ) 忽略索引 计算损失 self._可能计算损失(self._阶段, 输出, 目标_mbs, fwd_mb_index) # 获取前向发送操作,但不触发,留待下一迭代(循环) 前向发送 = self._阶段.获取前向发送操作(fwd_mb_index) fwd_mb_index += 1 记得我们在休息后还有一些 bwd_sends 没发出去吗?现在该发射了 发送工作 = 批量 P2P(bwd_sends, desc=bwd_send) # 冷却时间 bwd_mb_index < self._n_个微批次: # 准备反向接收操作 bwd_recvs = self._阶段.get_bwd_recv_ops(bwd_mb_index) 如果 接收工作 := 批量 P2P(后向接收, desc=后向接收器): 接收工作.等待() 向后移动一个块 损失 = self.可能会损失(self._阶段, bwd_mb_index) self._阶段.向后移动一个块( bwd_mb_index, 损失=损失, 最后一次反向=bwd_mb_index == self._n_微批次 - 1, ) # 清除前一个块的反向发送(希望它们已经完成得很好) 如果 发送工作: 发送工作.等待() # 获取反向发送操作,执行它 反向发送 = self._阶段.获取反向发送操作(bwd_mb_index) 发送工作 = 批量 P2P(bwd_sends, desc=bwd_send) bwd_mb_index += 1 self._阶段.梯度缩放( 梯度缩放因子=self._n_微批次 如果 self.缩放梯度 否则 1 ) 等待最后一次反向发送完成 如果 发送工作: 发送工作.等待() 如果传入容器,则返回损失 self.更新损失(self._阶段, 损失)
定义 添加未分片重分片( 计算动作: 列表[可选[_动作]], 最大活跃阶段数: 整型 = 3, ) -> 列表[_动作] 给定仅涉及计算动作(F,B,W)的基本调度,为 FSDP 添加 UNSHARD/RESHARD 动作。 UNSHARD 指的是获取 FSDP 分片层的全部内容,需要执行全聚合操作。 RESHARD 执行相反操作,释放内存(但不进行通信) 在降低过程中,我们放弃了“时间步锁定” max_active_stages 控制我们允许的预取数量。它应以 MB 为单位进行测量,并且可调整,但在实践中 3 个阶段可能是我们想要的吗? (考虑到有一个 f 和一个 b 处于激活状态,还有其他内容正在预取?) "文档" 定义 next_stage_indices( 数量: 整数, 下一个动作: 列表[可选[_动作]] ) -> 列表[整数] "删除重复项(相同阶段,不同微批次),找到下一个将要进行计算的 'count' 个阶段。" 已见: 集合[整数] = 集合() 返回: 列表[整数] = [] a 下一个动作: 如果 a a.阶段索引 已见: 已见.添加(a.stage_index) 返回.追加(a.阶段索引) 如果 len(返回) == 数量: 断开 返回 返回 active_stages: 集合[整数] = 集合() fsdp 感知动作: 列表[_动作] = [] 定义 _unshard(stage_index: 整数): active_stages.添加(stage_index) fsdp 感知动作.追加(_动作(stage_index, 未碎片化, )) 定义 重分片(stage_index: 整数): active_stages.删除(stage_index) fsdp 感知动作.追加(_动作(stage_index, 重新碎片化, )) i, 行动 列举(计算动作): 如果 行动 : 继续 # 我们预取下一个 N 个将要看到的阶段,以腾出空间,丢弃现有阶段 next_n = next_stage_indices(最大活跃阶段数, 计算动作[i]) # 获取操作需要正确排序,因此不要使用集合 获取 = 列表(过滤(lambda s: s 活跃阶段, next_n)) # 对于驱逐的最佳策略尚不明确,但我们可以维持秩序,所以我们可以 驱逐 = 列表(过滤(lambda s: s next_n, active_stages)) # logger.debug( # "_add_unshard_reshard 步 %d 活动:%s 获取:%s,驱逐:%s", # i, # active_stages, # fetch, # evict, # ) 阶段 驱逐: 重塑(阶段) 阶段 获取: _unshard(阶段) fsdp 感知动作.追加(动作) 返回 fsdp 感知动作 def 合并两边( 计算动作: 列表[可选[_动作]], ) -> 列表[_动作] 给定只涉及计算操作(F,I,W)的基本调度,将相邻的 I 和 W 操作合并为 B 操作。 (注意:I = BACKWARD_INPUT,W = BACKWARD_WEIGHT,B = FULL_BACKWARD) B 表示运行整个反向操作(不分离 grad_input 和 grad_weight),在某些情况下可能更高效。 在某些情况下可能更高效。 "" 合并操作 = [] 计算动作: 行动 = 计算动作.弹出(0) 如果 行动 : 继续 len(计算动作) (下一个动作 := 计算动作[0]) : # 删除 '动作' 和 '下一个动作' 之间的任何 None 动作 计算动作.弹出(0) 如果 ( 动作.计算类型 == 反向输入 下一个动作 下一个动作.计算类型 == 反向权重 动作.阶段索引 == 下一个动作.阶段索引 动作.微批索引 == 下一个动作.微批索引 ): 合并动作.追加( _动作(动作.stage_index, 全部反向, 动作.微批索引) ) 计算动作.弹出(0) else: 合并动作.追加(动作) 返回 合并操作 def _添加发送接收( 计算动作: 字典[整数, 列表[_动作]], 阶段到排名: 可调用[[整数], 整数], 阶段数量: 整数, ) -> 字典[整数, 列表[_动作]] 通信操作: 字典[整数, 列表[_动作]] = {排名: [] 排名 计算动作} 前置操作: 字典[整数, 集合[_动作]] = {排名: 集合() 排名 计算动作} def _通讯(动作: _动作) -> 布尔: 如果 行动.计算类型 == F: 返回 动作.阶段索引 != 阶段数量 - 1 阶段到排名( 动作.阶段索引 + 1 ) != 阶段到排名(动作.stage_index) elif 动作.计算类型 (反向输入, 全部反向): 返回 动作.阶段索引 != 0 阶段到排名( 动作.阶段索引 - 1 ) != 阶段到排名(动作.stage_index) 返回 def _get_comms(动作: _动作) -> 元组[动作, 动作] 断言 _通讯(动作), f"{动作}不是一个有效的通讯动作 stage_idx = 动作.阶段索引 类型 = 动作.计算类型 mb 索引 = 动作.微批索引 发送 = _动作(stage_idx, 发送_F 如果 类型 == F 否则 发送_B, mb 索引) 接收阶段索引 = stage_idx + 1 如果 类型 == F 否则 stage_idx - 1 接收 = _动作(接收阶段索引, 接收_F 如果 类型 == F 否则 接收_B, mb_idx) 返回 发送, 接收 def _准备调度( 动作: 可选[_动作], 前置操作: 集合[_动作] ) -> 布尔: 我们不把我们的接收操作放入调度中,我们让另一个 rank 上的发送者来放置我们的接收操作。 这有助于确保发送和接收的正常(非挂起)顺序。 但这也意味着我们可能还不能安排下一次计算操作。 "文档" 如果 行动 : 返回 真实 elif 动作.计算类型 == F 动作.阶段索引 == 0: 如果 ( _动作(动作.stage_index, 接收_F, 动作.微批索引) 前动作 ): 返回 真实 elif ( _动作(动作.阶段索引 - 1, F, 动作.微批索引) 前动作 ): 返回 真实 返回 elif ( 动作.计算类型 (反向输入, 全部反向) 动作.阶段索引 == 阶段数量 - 1 ): 如果 ( _动作(动作.stage_index, 接收_B, 动作.微批索引) 前动作 ): 返回 真实 elif ( _动作(动作.阶段索引 + 1, 反向输入, 动作.微批索引) 前动作 ): 返回 真实 elif ( _动作(动作.阶段索引 + 1, 全部反向, 动作.微批索引) 前动作 ): 返回 真实 返回 否则: 返回 真实 计算动作: 进度 = # 按排名顺序进行,即使字典键未排序 排名 排序(计算动作): 断言 len(计算动作[排名]) > 0, ( f"{排名=}, {len(计算动作[排名])=}" ) 行动 = 计算动作[排名] [0] 如果 _准备调度(动作, 前置操作[排名)] 继续 如果 行动 : 通信操作[排名].追加(动作) 前置操作[排名].添加(动作) 如果 _通讯(动作): 发送, 接收 = _get_comms(动作) # TODO 我们可以避免在两个阶段位于同一 rank 时发送/接收。 # 我们应该在运行时还是在这里避免这种情况? 通信操作[排名].追加(发送) 前置操作[排名].添加(发送) 通信操作[阶段到排名(接收.stage_index)].追加(接收) 前置操作[阶段到排名(接收.stage_index)].添加(接收) 计算动作[排名].弹出(0) 如果 len(计算动作[排名]) == 0: 删除 计算动作[排名] 进度 = 真实 断言 进度, "计算调度格式错误,无法安排发送/接收" 返回 通信动作 def _验证调度( 动作: 字典[整数, 列表[可选[_动作]], 算法组大小: 整数, 阶段数量: 整数, num_microbatches: 整数, ) -> 字典[整数, 整数] 断言 len(动作) == 算法组大小, ( f"调度中排名数量不正确 - 预期"{算法组大小}实际{len(动作)}" ) 排名 范围(算法组大小): 断言 排名 动作, f"缺少为 rank 指定的动作"{排名}" # 我们将统计每个阶段的动作总数,并确保它们按有效顺序发生 # (例如,F 在(B, I)之前 W 对于给定的微批) 阶段动作: 字典[整数, 字典[计算类型, 集合]] = { 阶段 ID: { F: 集合(), B: 集合(), I: 集合(), W: 集合(), } stage_id 范围(阶段数量) } stage_index_to_rank_mapping = {} 排名 动作: 行动 动作[排名] 如果 行动 : 继续 断言 isinstance(动作, _动作), ( f"获取了一个无效的动作:{动作}预期的实例为 _Action ) s_id = 动作.阶段索引 类型 = 动作.计算类型 mb_id = 动作.微批索引 如果 类型 == F: 阶段动作[s_id] [F].添加(mb_id) elif 类型 == B: 断言 mb_id 阶段动作[s_id] [F], ( f为阶段运行完整反向{s_id},微批{mb_id}先运行“正向” ) 阶段动作[s_id] [B].添加(mb_id) elif 类型 == I: 断言 mb_id 阶段动作[s_id] [F], ( f"向后输入阶段"{s_id},微批{mb_id}先运行“正向” ) 阶段动作[s_id] [I].添加(mb_id) elif 类型 == W: 断言 mb_id 阶段动作[s_id] [I], ( f阶段向后运行权重{s_id},微批{mb_id}未运行反向输入 ) 阶段动作[s_id] [W].添加(mb_id) 如果 s_id 阶段索引到排名映射: 阶段索引到排名映射[s_id] = 排名 否则: 已存在排名 = 阶段索引到排名映射[s_id] 断言 排名 == 现有排名, ( f阶段{s_id}被分配给两个等级{排名}和等级{现有排名}" ) s_id 阶段动作: f_mb = len(阶段动作[s_id] [F]) b_mb = len(阶段动作[s_id] [B]) i_mb = len(阶段动作[s_id] [I]) w_mb = len(阶段动作[s_id] [W]) 断言 f_mb == num_microbatches, ( f"已获取"{f_mb} {F}阶段微批{s_id},预期{num_microbatches}" ) 断言 b_mb + (i_mb + w_mb) // 2 == num_microbatches, ( f"无效的向后微批处理阶段"{s_id}预期{num_microbatches}完全相反,\ 但得到了 B={b_mb}, I={i_mb}, W={w_mb}" ) 返回 stage_index_to_rank_mapping
[文档] 管道调度多(_管道调度): "" 多阶段调度的基类。 实现了 `step` 方法。 梯度根据 `scale_grads` 参数按 num_microbatches 缩放,默认为 True。此设置 应与您的 loss_fn 配置相匹配,loss_fn 可能会平均损失(scale_grads=True) 或求和损失(scale_grads=False)。 "" def 初始化( , 阶段: 列表[_PipelineStageBase], 微批次数量: 整数, 损失函数: 可选[可调用] = , 参数块规范: 可选[元组[张量块规范, ...]] = , kwargs_chunk_spec: 可选[字典[字符串, 张量块规范]] = , 输出合并规范: 可选[联盟[字典[字符串, 任意], 元组[任意]]] = , 使用完整反向: 可选[布尔] = , 梯度缩放: 布尔类型 = True, ): # 初始化父类 超级().初始化( 微批次数量=微批次数量, 损失函数=损失函数, 参数块规范=参数块规范, kwargs_chunk_spec=kwargs_chunk_spec, 输出合并规范=输出合并规范, 梯度缩放=梯度缩放, ) # 自定义属性 ._阶段 = 阶段 ._num_个阶段 = 阶段[0].阶段数量 .pp_group_size = 阶段[0].组大小 .排名 = 阶段[0].组排名 设置管道阶段状态 .阶段索引到组排名 = 生成阶段到排名映射( .算法组大小, ._num_个阶段 ) 阶段 .阶段: 阶段.阶段索引到组排名 = .阶段索引到组排名 设置阶段对象的 has_backward 标志相同 阶段 .阶段: 阶段.has_backward = .是否有反向操作 .阶段初始化 = 避免在 lambda 中放置对'self'的引用,它会导致循环引用 has_loss: 布尔类型 = self._loss_fn self.应该计算损失 = lambda 阶段: 阶段.是最后一个 是否有损失 这将在派生计划初始化时设置 self.管道顺序: 字典[整数, 列表[可选[_动作]]] = {} 如果 使用完整反向 : 记录器.警告( "弃用警告:'use_full_backward' 已不再受支持。" "只需停止传递它,一切应该仍然正常工作。" ) def 初始化阶段(自身, 参数: 元组[任意, ...], kwargs): # 可能是 'none' 值(如果此阶段通过 P2P 将其输出形状发送到下一阶段) # 或真实值(如果此阶段和下一阶段在同一设备上) 下一个阶段参数: 元组[任意, ...] = 元组() 阶段 .阶段: 如果 阶段.is_first: 下一个阶段的参数 = 阶段.准备前向基础设施( self._n_个微批次, 参数, kwargs ) 否则: 下一个阶段的参数 = 阶段.准备前向基础设施( self._n_个微批次, 下一个阶段参数, kwargs ) 如果 self._has_backward: 阶段._准备反向基础设施(自身._n_个微批次) 自身.阶段初始化 = 真实 def _validate_and_set_stage_mapping( 自身, 动作: 字典[整数, 列表[可选[_动作]]] ) -> : "" 分配阶段索引到排名映射,这是通信所需的 "源代码" 自身.阶段索引到组排名 = _验证调度( 动作, .算法组大小, ._num_stages, self._n_个微批次, ) 阶段 self.阶段: 阶段.阶段索引到组排名 = self.阶段索引到组排名 def _导出 CSV(self, 文件名): 将排程的 CSV 表示形式输出到提供的文件名文件中。 打开(文件名, w, 换行符=输入文本翻译为简体中文为:"") 作为 CSV 文件: 作者 = csv.作者(CSV 文件) 排名 self.管道顺序: 作者.写入行(self.管道顺序[排名]) def 加载 CSV(self, 文件名, 格式="仅计算"): "从提供的文件名中加载调度计划的 CSV 表示形式。" 此 API 可能会被重命名/重构,因此目前标记为内部使用。 对于 PipelineScheduleMulti,格式必须是"仅计算"。 "源代码" 断言 格式 == 仅计算 打开(文件名, 换行符=输入文本翻译为简体中文为:"") 作为 CSV 文件: 读者 = csv.读取器(CSV 文件) 排名, 列举(读取器): self.管道顺序[排名] = [_动作.from_str(s) s ] # 验证管道操作的顺序并推断阶段到排名映射。 # 这将覆盖构造函数中创建的默认阶段到排名映射。 自身._validate_and_set_stage_mapping(自身.管道顺序)
[文档] def step(self, *args, target=None, losses: Optional[list] = None, **kwargs): """ 使用*全批次*输入运行一次管道调度迭代。 将自动将输入分块为微批次,并依次通过 根据计划实施微批次。 参数:模型的位置参数(如在非管道情况下)。 kwargs:模型的关键字参数(如在非管道情况下)。 目标:损失函数的目标。 losses: 存储每个微批次的损失列表。 """ # 清理每迭代一次 for stage in self._stages: stage.clear_runtime_states() # 将输入分割成微批次 args_split, kwargs_split = self._split_inputs(args, kwargs) # 将目标分割成微批次 如果目标不是 None: targets_split = list(torch.tensor_split(target, self._n_microbatches)) 否则: targets_split = None # 运行微批次 # 对分拆的参数、关键字和目标以及损失进行合并 # 返回按原始格式合并的结果 # 遍历 self._stages 中的每个阶段 if stage.is_last: return self._merge_outputs(stage.output_chunks) # 不包含最后一个阶段 return None
def _微批次步骤( 自身, 参数_mbs
: 可选[列表] = , 关键字参数_mbs: 可选[列表] = , 目标_mbs: 可选[列表] = , 损失: 可选[列表] = , ): "" 在循环调度上对微批次进行操作(每个 rank 有多个阶段)。 TODO:未使用 sorted_batch_isend_irecv()。因此,此调度不支持具有跳转连接的模型。 不支持具有跳转连接的模型。 "源代码" 参数_mbs, kwarg_mbs = 自身._检查输入(参数_mbs, 关键字参数_mbs, 目标_mbs, 损失) 如果 自身._stages_initialized: 自身.初始化阶段(参数_mbs[0], 关键字参数_mbs[0]) # 根据在 __init__ 中创建的第 1 步计划: # 2. 根据 pipeline_order 执行通信 阶段索引到阶段: 字典[整数, _PipelineStageBase] = { 阶段.stage_index: 阶段 阶段 自身._阶段 } # 根据哪些排名相邻确定 prev_rank 和 next_rank # 基于 pipeline_order 中的阶段 所有前名次: 集合[整数] = 集合() all_next_ranks: 集合[整数] = 集合() 阶段索引 阶段索引到阶段.(): # TODO: 假设阶段之间只从+1/-1 的距离通信(没有跳转连接) 如果 阶段索引 > 0: 所有前名次.添加(自身.阶段索引到组排名[阶段索引 - 1]) 如果 阶段索引 < 自身._num_个阶段 - 1: all_next_ranks.添加(自身.阶段索引到组排名[阶段索引 + 1]) # 将 full_backward 或 backward_weight 之一计数,以确定何时同步 DP 梯度 backward_counter: 计数器[整数] = 计数器() time_step, 行动 列举(自身.管道顺序[自身.排名)] try: 操作: 列表[距离.P2P 操作] = [] 如果 行动 : 计算类型 = 动作.计算类型 mb 索引 = 动作.微批索引 阶段索引 = 动作.阶段索引 断言 mb 索引 , ( 所有当前支持的操作类型都需要有效的微批处理索引 ) 如果 计算类型 == 计算类型.前向: 执行前向计算 阶段 = 阶段索引到阶段[stage_index] 输出 = 阶段.前进一个块( mb 索引, 参数_mbs[mb 索引], 关键字参数_mbs[mb 索引] ) 自身._可能计算损失(阶段, 输出, 目标_mbs, mb 索引) 操作.扩展(阶段.获取前向发送操作(mb 索引)) elif 计算类型 == 计算类型.全部反向: 执行反向计算 阶段 = 阶段索引到阶段[stage_index] 损失 = self.可能会损失(阶段, mb 索引) backward_counter[stage_index] += 1 最后一个反向 = ( backward_counter[stage_index] == 自身._n_微批次 ) 梯度缩放因子 = ( 自身._n_微批次 如果 自身.缩放梯度 否则 1 ) 阶段.向后移动一个块( mb 索引, 损失=损失, 全部后向=True, 最后一次反向=最后一次反向, ) 如果 最后一次反向: 阶段.梯度缩放(梯度缩放因子) 操作.扩展(阶段.获取反向发送操作(mb 索引)) elif 计算类型 == 计算类型.反向输入: 执行反向计算 阶段 = 阶段索引到阶段[stage_index] 损失 = self.可能会损失(阶段, mb 索引) 阶段.向后移动一个块( mb 索引, 损失=损失, 全部后向=错误, 最后一次反向=错误, ) 操作.扩展(阶段.获取反向发送操作(mb 索引)) elif 计算类型 == 计算类型.反向权重: 执行权重更新 阶段 = 阶段索引到阶段[stage_index] backward_counter[stage_index] += 1 最后一个反向 = ( backward_counter[stage_index] == self._n_微批次 ) 梯度缩放因子 = ( self._n_微批次 如果 self.缩放梯度 否则 1 ) 阶段.向后权重一个块( mb 索引, 最后一次反向=最后一次反向, ) 如果 最后一次反向: 阶段.梯度缩放(梯度缩放因子) 否则: 抛出 ValueError(f"未知计算类型"{计算类型}") 查看当前时间步的相邻排名,并确定当前排名是否需要进行任何接收通信 # this current rank needs to do any recv communication 前一个排名 所有前名次: 前一排名操作 = self.管道顺序[前一排名] 前一排名操作 = 如果 时间步长 < len(前一排名操作): 前一排名操作 = 前一排名操作[time_step] 如果 前一排名操作 : 计算类型 = 前一排名操作.计算类型 mb 索引 = 前一排名操作.微批索引 阶段索引 = 前一排名操作.阶段索引 断言 mb 索引 , ( 所有当前支持的操作类型都需要有效的微批处理索引 ) 仅处理来自前一排名的前向发送 如果 计算类型 == 计算类型.前向: 如果不是最后一个阶段,则接收前向激活 如果 阶段索引 + 1 阶段索引到阶段: # TODO: 我们假设阶段将从阶段-1 接收 # 然而,这并不一定适用于 get_fwd_recv_ops 阶段 = 阶段索引到阶段[阶段索引 + 1] 操作.扩展(阶段.获取前向接收操作(mb 索引)) elif 计算类型 ( 全部反向, 反向输入, 反向权重, ): # 前一排名进行反向操作对当前排名正向接收无影响 通过 否则: 抛出 ValueError( f"未知计算类型"{计算类型}" ) 下一个等级 all_next_ranks: 下一个等级操作 = self.管道顺序[下一个等级] 下一个等级动作 = 如果 时间步长 < len(下一个等级操作): 下一个等级动作 = 下一个等级操作[time_step] 如果 下一个等级动作 : 计算类型 = 下一个等级动作.计算类型 mb 索引 = 下一个等级动作.微批索引 阶段索引 = 下一个等级动作.阶段索引 断言 mb 索引 , ( 所有当前支持的操作类型都需要有效的微批处理索引 ) 仅处理来自下一个等级的后向接收 如果 计算类型 (前向, 反向权重): 下一个等级进行前向或权重更新对当前等级的向后接收没有影响 通过 elif 计算类型 (反向输入, 全部反向): 如果不是第一阶段,则接收反向梯度 如果 阶段索引 - 1 阶段索引到阶段: # TODO: 我们假设阶段总是从阶段+1 接收 # 然而,这不一定适用于 get_bwd_recv_ops 阶段 = 阶段索引到阶段[阶段索引 - 1] 操作.扩展(阶段.get_bwd_recv_ops(mb 索引)) 否则: 抛出 ValueError( f"未知计算类型"{计算类型}" ) # 进行通信 如果 操作: 批量 P2P(操作).等待() 除了 异常 作为 e: 记录器.错误( 排名%s] 管道调度%s捕获到以下异常\ 在时间步%s运行操作时%s", self.排名, self..__name__, time_step, 行动, ) 记录器.错误( "%s", _format_pipeline_order( self.管道顺序, 错误步骤编号=时间步长 ), ) 提升 e 如果传入容器,则返回损失 self.更新损失(self.阶段, 损失)
_PipelineScheduleRuntime(管道调度多): "" 提供一个简单的运行时,该运行时需要一个包括指定通信操作的 'schedule IR'。 可以直接通过创建 _PipelineScheduleRuntime 并调用 load_csv 来实例化,或者可以通过 子类化,子类可以负责创建一个调度 IR。 "源代码" def _load_actions( self, 动作: 字典[整数, 列表[可选[_动作]], 格式: 字符串 = "仅计算", ): "" 对于一个简单的内存表示的计算调度,将其降低为一个包括通信动作的复杂调度 将调度存储在 self 中,必须在运行 step_mo()之前调用 "源代码" 验证提供的操作是否有效并覆盖默认的 stage_index_to_group_rank 阶段 超级()._validate_and_set_stage_mapping(动作) self.通讯管道顺序: 字典[整数, 列表[动作]] = {} 如果 格式 == 计算通信: 排名 动作: self.通讯管道顺序[排名] = [] 行动 动作[排名] 断言 行动 self.通讯管道顺序[排名].追加(动作) TODO 应该为计算+通信计划提供何种级别的验证? elif 格式 == "仅计算": 执行调度降低 排名 动作: self.通讯管道顺序[排名] = 添加未分片重分片( 动作[排名] ) self.带通信的管道顺序 = _添加发送接收( self.通讯管道顺序, 阶段到排名=lambda s: self.阶段索引到组排名[s], 阶段数=self._num_stages, ) 否则: 抛出 不支持的操作异常(f"{格式=}尚未实现) def 加载 CSV(self, 文件名: 字符串, 格式: 字符串 = "仅计算"): "加载简单格式的 csv 文件,并将其转换为小写以包含通信动作" 格式必须是“仅计算”或“计算通信”。如果是“仅计算”,则将自动运行降级操作以生成计算通信计划。 将自动运行降级操作以生成计算通信计划。 "源代码" 如果 格式 == "仅计算": # 将填充 self.pipeline_order 超级().加载 CSV(文件名) # this will populate self.pipeline_order_with_comms self._load_actions(self.管道顺序) elif 格式 == 计算通信: 动作 = {} 打开(文件名, 换行符=输入文本翻译为简体中文为:"") 作为 CSV 文件: 读者 = csv.读取器(CSV 文件) 排名, 列举(读取器): 动作[排名] = [动作.from_str(s) s ] self._load_actions(动作, 格式=格式) 否则: 提升 不支持的操作异常(f"{格式=}尚未实现) def _导出 CSV(self, 文件名: 字符串): 将计算 + 通信计划以 CSV 格式导出到提供的文件名文件中。 # TODO 是否应该有从 PipelineScheduleRuntime 导出仅计算计划的选项?这是可能的 # 如果它是由计算 + 通信计划创建的,则可能不存在。 断言 self.带通信的管道顺序 , ( 必须在导出 csv 之前初始化 compute_comms 调度 ) 打开(文件名, w, 换行符=输入文本翻译为简体中文为:"") 作为 CSV 文件: 作者 = csv.作者(CSV 文件) 排名 self.通讯管道顺序: 作者.写入行(self.通讯管道顺序[排名]) def 模拟(self): 返回 模拟通讯计算( self.通讯管道顺序, lambda s: self.阶段索引到组排名[s], self._num_stages, ) def _微批次步骤( self, 参数_mbs: 可选[列表] = , 关键字参数_mbs: 可选[列表] = , 目标_mbs: 可选[列表] = , 损失: 可选[列表] = , ): "" 在循环调度上对微批次进行操作(每个 rank 有多个阶段)。 TODO:未使用 sorted_batch_isend_irecv()。因此,此调度不支持具有跳转连接的模型。 不支持具有跳转连接的模型。 "源代码" 参数_mbs, kwarg_mbs = self._检查输入(参数_mbs, 关键字参数_mbs, 目标_mbs, 损失) 如果 self._stages_initialized: self.初始化阶段(参数_mbs[0], 关键字参数_mbs[0]) # 根据在 __init__ 中创建的第 1 步计划: # 2. 根据 pipeline_order 执行通信 阶段索引到阶段: 字典[整数, _PipelineStageBase] = { 阶段.阶段索引: 阶段 阶段 self._阶段 } 断言 self.带通信的管道顺序 , ( "必须在调用 _step_microbatches() 之前调用 _load_actions()" ) # 接收操作按 (阶段索引, 微批次索引) 索引,在使用前需要等待 反向接收操作: 字典[元组[整数, 整数], 工作] = {} 前向接收操作: 字典[元组[整数, 整数], 工作] = {} # 发送操作应在 step()执行之前等待,主要是为了卫生 send_ops: 列表[工作] = [] # 当与 FSDP 一起使用时,我们跟踪哪些阶段是“活跃”的,并在对阶段进行计算之前等待未分片操作 解碎片操作: 字典[整数, UnshardHandle] = {} unsharded_stages = 集合() 定义 断言非分片(stage_idx: 整数): 如果`stage_idx`处于活动状态,则等待它并标记`stage_idx`为未共享。 如果 stage_idx 解碎片操作: 解碎片操作[stage_idx].等待() 删除 解碎片操作[stage_idx] 未分片阶段.添加(stage_idx) 断言 stage_idx 未分片阶段, ( f尝试在分片上计算{stage_idx=}" ) # 将 full_backward 或 backward_weight 之一计数,以确定何时同步 DP 梯度 backward_counter: 计数器[整数] = 计数器() time_step, 行动 列举(self.通讯管道顺序[self.排名)] try: comp_type = 行动.计算类型 mb 索引: 整型 = ( 行动.微批索引 如果 行动.微批索引 否则 -1 ) 断言 mb 索引 >= 0 comp_type ( 未碎片化, 重新碎片化, ), f"{行动=}缺少 mb_index" stage_idx = 行动.阶段索引 阶段 = 阶段索引到阶段[stage_idx] stage_uses_fsdp = isinstance(阶段.子模块, FSDP 模块) 参见[注意:V 调度特殊情况] 是否在此 rank 上开启下一阶段 = stage_idx + 1 阶段索引到阶段 是否前一个阶段在此 rank 上 = stage_idx - 1 阶段索引到阶段 记录器.调试( "_PipelineScheduleRuntime 运行时间步长"%d,动作"%s", time_step, 行动, ) # TODO(whc) 在模型有跳过连接的不常见情况下,在这里使用 _batch_p2p 实际上并不安全 我们不希望在超过一对排名之间批量操作。_sorted_batch_p2p 将 # 安全使用替代。 然而,我在想,如果存在批处理操作的情况,我是否应该完全避免调用批处理操作 每批只处理一个操作员。我可以逐个遍历 'fwd_send_ops' 并运行它们。 如果 comp_type == 发送_F: send_ops.追加(批量 P2P(阶段.获取前向发送操作(mb 索引))) elif comp_type == 发送_B: send_ops.追加(批量 P2P(阶段.获取反向发送操作(mb 索引))) elif comp_type == 接收_F: 断言 ( stage_idx, mb 索引, ) 前向接收操作, ( "接收两次 {阶段索引=}{块索引=} 而不执行前向" ) 前向接收操作[stage_idx, mb 索引] = 批量 P2P( 阶段.获取前向接收操作(mb 索引) ) elif comp_type == 接收_B: 断言 ( stage_idx, mb 索引, ) 反向接收操作, ( "两次接收 {stage_idx=} {mb_index=} 而不执行反向操作" ) 反向接收操作[stage_idx, mb 索引] = 批量 P2P( 阶段.get_bwd_recv_ops(mb 索引) ) elif comp_type == 未碎片化: 如果 阶段使用 fsdp: 断言 ( stage_idx unsharded_stages stage_idx unshard_ops ), f"Unsharding the same"{stage_idx=}两次" 解碎片操作[stage_idx] = 阶段.子模块.解碎片(async_op=True) # type: ignore[operator] elif comp_type == 重新碎片化: 如果 阶段使用 fsdp: 断言 stage_idx 未分片阶段, ( f重新分片{stage_idx=}不进行解分片 ) 断言 stage_idx 解碎片操作, ( f重新分片{stage_idx=}在完成 unshard 之前" ) 阶段.子模块.重新分片() # type: ignore[operator] elif comp_type == 前向: 如果 阶段使用 fsdp: 断言非分片(stage_idx) 如果 ( 阶段.是第一个 对于 V-schedule 特殊案例(参见[注:V-schedule 特殊案例])不期望接收操作 是否前一个阶段在此 rank 上 ): 断言 ( stage_idx, mb 索引, ) 前向接收操作, f计算{行动=}在接收输入之前" 前向接收操作.弹出((stage_idx, mb 索引)).等待() 输出 = 阶段.前进一个块( mb 索引, 参数_mbs[mb 索引], 关键字参数_mbs[mb 索引] ) self._可能计算损失(阶段, 输出, 目标_mbs, mb 索引) 避免在相同 rank 上的相邻阶段进行特殊情况的发送/接收操作 参见[注意:V 调度特殊情况] 如果 当前排名是否开启下一阶段: 阶段索引到阶段[stage_idx + 1].设置本地前向输入( 输出, mb 索引 ) elif comp_type == 全部反向: 如果 阶段使用 fsdp: 断言非分片(stage_idx) 如果 ( 阶段.是最后一个 对于 V-schedule 特殊案例(参见[注:V-schedule 特殊案例])不期望接收操作 是否在此 rank 上开启下一阶段 ): 断言 ( stage_idx, mb 索引, ) 反向接收操作, ( f尝试运行计算{行动=}在接收输入之前" ) 反向接收操作.弹出((stage_idx, mb 索引)).等待() 损失 = self.可能会损失(阶段, mb 索引) backward_counter[stage_idx] += 1 最后一个反向 = backward_counter[stage_idx] == self._n_微批次 梯度缩放因子 = self._n_微批次 如果 self.缩放梯度 否则 1 阶段.向后移动一个块( mb 索引, 损失=损失, 全部后向=True, 最后一次反向=最后一次反向, ) 如果 最后一次反向: 阶段.梯度缩放(梯度缩放因子) 避免在相同 rank 上的相邻阶段进行特殊情况的发送/接收操作 参见[注意:V 调度特殊情况] 如果 is_prev_stage_on_this_rank: 阶段索引到阶段[stage_idx - 1].设置本地反向输入( 阶段.获取局部反向输出(mb 索引), mb 索引 ) elif comp_type == 反向输入: 如果 阶段使用 fsdp: 断言非分片(stage_idx) 如果 阶段.是最后一个 当前排名是否开启下一阶段: 断言 ( stage_idx, mb 索引, ) 反向接收操作, ( f尝试运行计算{行动=}在接收输入之前" ) 反向接收操作.弹出((stage_idx, mb 索引)).等待() 损失 = self.可能会损失(阶段, mb 索引) 阶段.向后移动一个块( mb 索引, 损失=损失, 全部后向=错误, 最后一次反向=错误, ) 避免在相同 rank 上的相邻阶段进行特殊情况的发送/接收操作 参见[注意:V 调度特殊情况] 如果 is_prev_stage_on_this_rank: 阶段索引到阶段[stage_idx - 1].设置本地反向输入( 阶段.获取局部反向输出(mb 索引), mb 索引 ) elif comp_type == 反向权重: 如果 阶段使用 fsdp: 断言非分片(stage_idx) backward_counter[stage_idx] += 1 阶段.向后权重一个块( mb 索引, 最后一次反向=backward_counter[stage_idx] == self._n_个微批次, ) 否则: 提升 ValueError(f"{行动=}未知或不受支持) 除了 异常 作为 e: 记录器.错误( "_PipelineScheduleRuntime 在步骤中捕获异常"%s运行操作时%s。完整计划:, time_step, 行动, ) # TODO(whc) 对于打印多行日志的最佳实践是什么? # 日志记录器会将其拆分为多行日志,但这使得阅读变得困难(太宽) 打印( _format_pipeline_order( self.通讯管道顺序, # type: ignore[arg-type] 错误步骤编号=time_step, ) ) 提升 e # 大多数这些操作应该早就完成了,但并没有一个明显等待它们的时间 len(send_ops): send_ops.弹出().等待() 断言 len(解碎片操作) == 0, "未使用的 unshard 操作" 如果传入容器,则返回损失 self.更新损失(self.阶段, 损失)
[文档] 调度循环 BFS(管道调度多): "" 广度优先管道并行 详细内容请见 https://arxiv.org/abs/2211.05953 与交错 1F1B 相似,循环 BFS 支持每个 rank 多个阶段 不同之处在于,当微批准备就绪时,多个本地 阶段,BFS 循环将优先处理早期阶段,一次性运行所有可用的微批。 微批。 "源代码" 定义 初始化( self, 阶段: 列表[_PipelineStageBase], 微批次数量: 整数, 损失函数: 可选[联盟[可调用, _Loss]] = , 输出合并规范: 可选[联盟[字典[字符串, 任意], 元组[任意]]] = , 梯度缩放: 布尔类型 = True, ): 超级().初始化( 阶段=阶段, 微批次数量=微批次数量, 损失函数=损失函数, 输出合并规范=输出合并规范, 梯度缩放=梯度缩放, ) # 1. 创建 pipeline_order(所有排名都进行此计算) # 用于跟踪整个管道的当前状态 # pipeline_order[rank] = [Action(计算类型, 微批处理索引, 阶段索引), ...] self.管道顺序: 字典[整数, 列表[可选[动作]]] = {} # ======================================================================== 排名 范围(self.算法组大小): 排序操作 = self._计算单个排名操作(排名) self.管道顺序[排名] = 排序操作 定义 _计算单个排名操作(self, 排名): 本地阶段数 = len(self.阶段) 阶段索引 = 范围( 排名, self.pp_group_size * 本地阶段数, self.pp_group_size ) 存储用于该等级的操作列表 预填充,等级从基于预热的无操作开始。 排名操作: 列表[可选[动作]] = [ _ 范围(排名] 阶段索引 阶段索引: 排名操作.扩展( 动作(阶段索引, 计算类型.前向, mb 索引) mb 索引 范围(self._n_个微批次) ) 等待第一次反向传播慢慢上升 每次跳跃 2 预热操作 = 2 * (self.pp_group_size - 1 - 排名) 排名操作.扩展([] * 预热操作) 阶段索引 反转(阶段索引): 排名操作.扩展( 动作(阶段索引, 计算类型.全部反向, mb 索引) mb 索引 反转(范围(self._n_个微批次)) ) 返回 排名操作
定义 获取_1f1b_rank_ops( 本地阶段数, 算法组大小, 预热操作, 前后操作, 冷却操作, 排名, 前进阶段索引, 向后阶段索引, num_1f1b_微批次=0, 启用零气泡=错误, ): 所有阶段均从处理微批 0 开始 前向阶段 MB 索引: 字典[整数, 整数] = defaultdict(整数) 后向阶段 MB 索引: 字典[整数, 整数] = defaultdict(整数) 重量阶段 MB 索引: 字典[整数, 整数] = defaultdict(整数) 存储用于该等级的操作列表 预填充,等级从基于预热的无操作开始。 排名操作: 列表[可选[动作]] = [ _ 范围(排名] 用于计算填充无操作槽位的数量,以补偿预热延迟 当我们想要等待反向传播慢慢返回并开始 1f1b 以对齐所有排名时 公式: 预填充 + 预热操作 + 预热后操作 = 首次反向传播的最早时间步 # post_warmup_ops = [第一个反向操作的最早时间步] - (warmup_ops + 预填充) # 第一个反向操作的最早时间步 = [local_stages * group_size + 2 * (group_size - 1 - rank)] # warmup_ops = 上文计算得出 预热操作 = ( 本地阶段数 * pp_group_size + 2 * (pp_group_size - 1 - 排名) ) - (预热操作 + 排名) 如果 启用零气泡: 预热操作 = pp_group_size - 排名 - 1 总操作数 = 预热操作 + 前向和后向操作 + 冷却操作 后向操作 ID 列表 = [] 权重操作次数 = 0 全部向后或向后输入 = ( 反向输入 如果 启用零气泡 否则 全部倒退 ) 操作符 范围(总操作数): # 预热阶段 如果 操作符 < 预热操作: 前向阶段索引 = 前进阶段索引(操作) 这将分配当前微批处理索引并更新它 前向阶段 MB 索引[前向阶段索引] = ( mb 索引 := 前向阶段 MB 索引[前向阶段索引] ) + 1 排名操作.追加( 动作(前向阶段索引, 计算类型.前向, mb 索引) ) 如果 操作符 == 预热操作 - 1: 这是预热阶段的最后一步,因此我们需要等待反向传播慢慢返回 排名操作.扩展([] * 预热操作) 1F1B 阶段(正向和反向) elif 预热操作 操作符 < 预热操作 + 前后操作: 前向阶段索引 = 前进阶段索引(操作) 前向阶段 MB 索引[前向阶段索引] = ( fwd_mb_index := 前向阶段 MB 索引[前向阶段索引] ) + 1 排名操作.追加( 动作(前向阶段索引, 计算类型.前向, fwd_mb_index) ) 反向阶段索引 = 向后阶段索引(操作) 后向阶段 MB 索引[逆向阶段索引] = ( bwd_mb_index := 后向阶段 MB 索引[逆向阶段索引] ) + 1 排名操作.追加( 动作(逆向阶段索引, 全部逆向或逆向输入, bwd_mb_index) ) 向后操作 ID.追加(操作) 如果 启用零气泡 操作符 - 预热操作 >= num_1f1b_微批次: 权重阶段索引 = 向后阶段索引( 向后操作 ID[权重操作数量] ) 重量阶段 MB 索引[重量阶段索引] = ( 重量 MB 索引 := 重量阶段 MB 索引[重量阶段索引] ) + 1 排名操作.追加( 动作( 重量阶段索引, 计算类型.反向权重, 重量 MB 索引, ) ) 权重操作次数 += 1 # 冷却阶段 否则: # 在冷却阶段,我们需要步骤与 1f1b 在其他等级发生的情况保持一致 # TODO: 完成后 1f1b,我们可以停止追加 None 如果 启用零气泡: 排名操作.追加() 反向阶段索引 = 向后阶段索引(操作) 后向阶段 MB 索引[逆向阶段索引] = ( bwd_mb_index := 后向阶段 MB 索引[bwd 阶段索引] ) + 1 排名操作.追加( 动作(逆向阶段索引, 全部逆向或逆向输入, bwd_mb_index) ) 向后操作 ID.追加(操作) 如果 启用零气泡 操作符 - 预热操作 >= num_1f1b_微批次: 权重阶段索引 = 向后阶段索引( 向后操作 ID[权重操作数量] ) 重量阶段 MB 索引[重量阶段索引] = ( 重量 MB 索引 := 重量阶段 MB 索引[重量阶段索引] ) + 1 排名操作.追加( 动作( 重量阶段索引, 计算类型.反向权重, 重量 MB 索引, ) ) 权重操作次数 += 1 while 启用零气泡 权重操作次数 < len(向后操作 ID): 权重阶段索引 = 向后阶段索引(向后操作 ID[权重操作次数]) 重量阶段 MB 索引[重量阶段索引] = ( 重量 MB 索引 := 重量阶段 MB 索引[重量阶段索引] ) + 1 排名操作.追加( 动作( 重量阶段索引, 计算类型.反向权重, 重量 MB 索引 ) ) 权重操作次数 += 1 返回 排序操作
[文档] 调度交错 1F1B(管道调度多): "" 交错 1F1B 调度方案。 详细内容请见 https://arxiv.org/pdf/2104.04473。 将在微批次上执行一次正向和一次反向操作。 状态支持每个等级的多个阶段。当微批次准备就绪时, 多个本地阶段时,交错 1F1B 优先考虑较早的微批次 (也称为“深度优先”)。 此调度与原始论文基本相似。 它的不同之处在于放宽了 num_microbatch % pp_size == 0 的要求。 使用 flex_pp 调度,我们将有 num_rounds = max(1, n_microbatches // pp_group_size),并且 只要 n_microbatches % num_rounds 是 0,它就能正常工作。以下是一些示例,支持 1. pp_group_size = 4,n_microbatches = 10。我们将有 num_rounds = 2,并且 n_microbatches % 2 是 0。 2. pp_group_size = 4, n_microbatches = 3。我们将有 num_rounds = 1,且 n_microbatches % 1 等于 0。 "源代码" def 初始化( 自身, 阶段: 列表[_PipelineStageBase], 微批次数量: 整数, 损失函数: 可选[可调用] = , 参数块规范: 可选[元组[张量块规范, ...]] = , kwargs_chunk_spec: 可选[字典[字符串, 张量块规范]] = , 输出合并规范: 可选[联盟[字典[字符串, 任意], 元组[任意]]] = , 梯度缩放: 布尔类型 = True, ): 自身.pp_group_size = 阶段[0].组大小 超级().初始化( 阶段=阶段, 微批次数量=微批次数量, 损失函数=损失函数, 参数块规范=参数块规范, kwargs_chunk_spec=kwargs_chunk_spec, 输出合并规范=输出合并规范, 梯度缩放=梯度缩放, ) 自身.本地阶段数 = len(阶段) 自身.排名 = 阶段[0].组排名 自身.轮数 = 最大值(1, n_微批次 // 自身.算法组大小) 自身.每轮的微批次数 = n_微批次 // 自身.轮数 如果 n_微批次 % 自身.轮数 != 0: 提升 ValueError( "交错 1F1B 需要微批次的数量是轮数的倍数" f轮数的倍数({自身.轮数}) f"但得到了"{微批次数量} ) # 1. 创建 pipeline_order(所有排名都进行此计算) # 用于跟踪整个管道的当前状态 # pipeline_order[rank] = [Action(计算类型, 微批处理索引, 阶段索引), ...] 自身.管道顺序: 字典[整数, 列表[可选[动作]]] = {} 排名 范围(自身.算法组大小): 排序操作 = 自身._计算单个排名操作(排名) 自身.管道顺序[排名] = 排序操作 def _计算单个排名操作(自身, 排名) -> 列表[可选[动作]] def 获取排名预热操作(排名): # 为最后阶段预热操作 预热操作最后阶段 = ( 自身.本地阶段数 - 1 ) * 自身.每轮的微批次数 # 每远离最后阶段一步,增加热身操作 2 个 乘数因子 = 2 预热操作 = 预热操作最后阶段 + 乘数因子 * ( (自身.pp_group_size - 1) - 排名 ) 我们不能有比微批次数更多的预热操作,所以在这里限制它 返回 最小(预热操作, 自身._n_微批次 * 自身.本地阶段数) 预热操作 = 获取排名预热操作(排名) 微批处理操作 = 自身.本地阶段数 * 自身._n_微批次 # fwd_bwd_ops 应包含剩余的前向操作 前向和后向操作 = 微批处理操作 - 预热操作 # 冷却操作应包括剩余的向后操作 冷却操作 = 微批处理操作 - 前向和后向操作 # 总操作包括正向和反向操作 总操作数 = 预热操作 + 前向和后向操作 + 冷却操作 # warmup_ops + fwd_bwd_ops * 2 + cooldown_ops == microbatch_ops * 2 记录器.调试( "排名"%s, warmup_ops %s,1f1b%s,冷却操作%s总操作数%s", 排名, 预热操作, 前后操作, 冷却操作, 总操作数, ) # 根据步数和 pp_group_size 计算阶段索引 def 前进阶段索引(步长): # 获取从 0 到 n_local_stages-1 的本地索引 本地索引 = (步骤 // 自身.每轮微批次数) % 自身.本地阶段数 返回 (本地索引 * 自身.算法组大小) + 排名 def 向后阶段索引(步长): 本地索引 = ( 自身.本地阶段数 - 1 - ((步骤 - 预热操作) // 自身.每轮微批次数) % 自身.本地阶段数 ) 返回 (本地索引 * 自身.算法组大小) + 排名 返回 获取_1f1b_rank_ops( 自身.本地阶段数, 自身.算法组大小, 预热操作, 前后操作, 冷却操作, 排名, 前进阶段索引, 向后阶段索引, )
[文档] 交错零气泡调度(管道调度多): "" 交错零气泡调度。 查看 https://arxiv.org/pdf/2401.10241 获取详细信息。 将执行对微批次的输入进行一次正向和一次反向操作 状态支持每个等级的多个阶段。使用反向填充权重 管道气泡。 特别是,这实现了论文中的 ZB1P 计划。 "源代码" def 初始化( 自身, 阶段: 列表[_PipelineStageBase], 微批次数量: 整数, 损失函数: 可选[可调用] = , 参数块规范: 可选[元组[张量块规范, ...]] = , kwargs_chunk_spec: 可选[字典[字符串, 张量块规范]] = , 输出合并规范: 可选[联盟[字典[字符串, 任意], 元组[任意]]] = , 梯度缩放: 布尔类型 = True, ): # TODO: 我们目前不支持使用 torch.compile 进行零气泡,所以 # 现在应该暂时禁用它 阶段 阶段: 如果 isinstance(阶段.子模块, 优化模块): 提升 运行时错误( "Zero Bubble 调度不支持使用 torch.compile 编译过的模块"\ pp_group_size ) 自身.pp_group_size = 阶段[0].组大小 超级().初始化( 阶段=阶段, 微批次数量=微批次数量, 损失函数=损失函数, 参数块规范=参数块规范, kwargs_chunk_spec=kwargs_chunk_spec, 输出合并规范=输出合并规范, 梯度缩放=梯度缩放, ) 自身.本地阶段数 = len(阶段) 自身.排名 = 阶段[0].组排名 自身.轮数 = 最大值(1, n_微批次 // 自身.算法组大小) 自身.每轮的微批次数 = n_微批次 // 自身.轮数 如果 n_微批次 % 自身.轮数 != 0: 提升 ValueError( 零气泡需要微批次的数量为轮数的倍数 f轮数的倍数({自身.轮数}) f"但得到了"{微批次数量} ) # 1. 创建 pipeline_order(所有排名都进行此计算) # 用于跟踪整个管道的当前状态 # pipeline_order[rank] = [Action(计算类型, 微批处理索引, 阶段索引), ...] 自身.管道顺序: 字典[整数, 列表[可选[动作]]] = {} 排名 范围(自身.算法组大小): 排序操作 = 自身._计算单个排名操作(排名) 自身.管道顺序[排名] = 排序操作 # 此函数根据动作的依赖关系向生成的日程表中添加气泡 注意,ZB1P 调度将不需要手动添加气泡 只有当 n_microbatches <= microbatches_per_round 时才有用 自身.流水线顺序 = 自身._add_bubbles_to_actions( 自身.本地阶段数 * 自身.算法组大小, ) def _计算单个排名操作(自身, 排名) -> 列表[可选[动作]] def 获取排名预热操作(排名): # 为最后阶段预热操作 预热操作最后阶段 = ( 自身.本地阶段数 - 1 ) * 自身.每轮的微批次数 # 每远离最后阶段一步,增加热身操作 2 个 乘数因子 = 1 预热操作 = 预热操作最后阶段 + 乘数因子 * ( (自身.pp_group_size - 1) - 排名 ) 我们不能有比微批次数更多的预热操作,所以在这里限制它 返回 最小(预热操作, 自身._n_微批次 * 自身.本地阶段数) 预热操作 = 获取排名预热操作(排名) 微批处理操作 = 自身.本地阶段数 * 自身._n_微批次 # fwd_bwd_ops 应包含剩余的前向操作 前向和后向操作 = 微批处理操作 - 预热操作 # 冷却操作应包括剩余的向后操作 冷却操作 = 微批处理操作 - 前向和后向操作 # 总操作包括正向和反向操作 总操作数 = 预热操作 + 前向和后向操作 + 冷却操作 # warmup_ops + fwd_bwd_ops * 2 + cooldown_ops == microbatch_ops * 2 记录器.调试( "排名"%s, warmup_ops %s,1f1b%s,冷却操作%s总操作数%s", 排名, 预热操作, 前后操作, 冷却操作, 总操作数, ) # 根据步数和 pp_group_size 计算阶段索引 def 前进阶段索引(步长): # 获取从 0 到 n_local_stages-1 的本地索引 本地索引 = (步骤 // 自身.每轮微批次数) % 自身.本地阶段数 返回 (本地索引 * 自身.算法组大小) + 排名 def 向后阶段索引(步长): 本地索引 = ( 自身.本地阶段数 - 1 - ((步骤 - 预热操作) // 自身.每轮微批次数) % 自身.本地阶段数 ) 返回 (本地索引 * 自身.算法组大小) + 排名 1f1b 微批次数量 = 排名 返回 获取_1f1b_rank_ops( 自身.本地阶段数, 自身.算法组大小, 预热操作, 前后操作, 冷却操作, 排名, 前进阶段索引, 向后阶段索引, num_1f1b_微批次, 启用零气泡=True, ) def _add_bubbles_to_actions(自身, 全局阶段数): 动作 = 自身.流水线顺序 def 需要气泡(阶段, 操作, 微批处理, 全局阶段数, 已见操作): 如果 操作符 == 计算类型.前向: 如果 阶段 != 0 (阶段 - 1, 操作, 微批处理) 已见操作: 返回 真实 elif 操作符 == 计算类型.全部反向: 如果 阶段 == 全局阶段数 - 1: 返回 (阶段, 计算类型.前向, 微批处理) 已见操作 返回 (阶段 + 1, 操作, 微批处理) 已见操作 返回 已见操作: 集合[元组[int, 计算类型, int]] = 集合() 结果: 字典[int, 列表[可选[动作]]] = {} 下一个指针: 字典[整数, 整数] = {} 气泡添加: 字典[整数, 整数] = {} 添加的气泡总数 = 0 排名 范围(自身.算法组大小): 结果[排名] = [] 下一个指针[排名] = 0 气泡添加[排名] = 0 True: 应该停止 = 真实 已见操作: 集合[元组[整数, 计算类型, 整数]] = 集合() 排名 范围(自身.算法组大小): 时间戳 = 下一个指针[排名] 如果 时间戳 >= len(动作[排名)] 继续 应该停止 = 如果 动作[排名] [时间戳] : 临时动作 = 动作[排名] [时间戳] 断言 临时动作 阶段索引, 操作, 微批处理 = 临时动作 如果 需要气泡( 阶段索引, 操作, 微批处理, 全局阶段数, 已见操作 ): 结果[排名].追加(动作[排名] [时间戳]) 如果 微批处理 : 已见操作.添加((阶段索引, 操作, 微批处理)) 下一个指针[排名] += 1 否则: 结果[排名].追加() 气泡添加[排名] += 1 否则: 下一个指针[排名] += 1 结果[排名].追加() 已见操作.更新(已见操作) 如果 应该停止: 断开 如果 添加的气泡总数 > 0: 记录器.警告( "非零气泡添加:总气泡添加数="%s气泡添加=%s", 总气泡添加数, 气泡添加, ) 返回 结果
[文档] ZBV 零气泡调度(管道调度多): "" 零气泡排程(ZBV 变体)。 详细内容请参阅 https://arxiv.org/pdf/2401.10241 第 6 节。 这个调度要求每个等级恰好有两个阶段。 此计划将对稳定微批次的输入执行一次正向和一次反向操作 状态支持每个等级的多个阶段。相对于权重使用反向填充 管道气泡。 这种 ZB-V 计划只有在时间正向等于时间反向、输入等于时间反向权重时才具有“零气泡”属性。 实际上,对于真实模型来说,这不太可能成立,因此可以另辟蹊径。 可以实现一个用于不等时/不平衡时间的贪婪调度器。 "源代码" def 初始化( 自身, 阶段: 列表[_PipelineStageBase], 微批次数量: 整数, 损失函数: 可选[可调用] = , 参数块规范: 可选[元组[张量块规范, ...]] = , kwargs_chunk_spec: 可选[字典[字符串, 张量块规范]] = , 输出合并规范: 可选[联盟[字典[字符串, 任意] 元组[任意]]] = , 梯度缩放: 布尔类型 = True, ): 自身.pp_group_size = 阶段[0].组大小 超级().初始化( 阶段=阶段, 微批次数量=微批次数量, 损失函数=损失函数, 参数块规范=参数块规范, kwargs_chunk_spec=kwargs_chunk_spec, 输出合并规范=输出合并规范, 梯度缩放=梯度缩放, ) 自身.阶段索引到组排名 = 生成阶段到排名映射( 自身.算法组大小, 自身._num_stages, 样式="v" ) 阶段 自身.阶段: 阶段.阶段索引到组排名 = 自身.阶段索引到组排名 自身.本地阶段数 = len(阶段) 如果 自身.本地阶段数 != 2: 提升 ValueError( "ZBV 每个等级需要恰好 2 个阶段,但得到了 " f"{自身.本地阶段数} ) 自身.排名 = 阶段[0].组排名 自身.阶段数量 = 阶段[0].阶段数量 # 1. 创建 pipeline_order(所有排名都进行此计算) # 用于跟踪整个管道的当前状态 # pipeline_order[rank] = [Action(计算类型, 微批处理索引, 阶段索引), ...] 自身.管道顺序: 字典[int, 列表[可选[动作]]] = {} 排名 范围(自身.算法组大小): 排序操作 = 自身._计算单个排名操作(排名) 自身.管道顺序[排名] = 排序操作 def _计算单个排名操作(自身, 排名) -> 列表[可选[动作]] 确保微批次的数量至少为 2 * self.pp_group_size - 1 以尽可能多的微批次数量来充分利用管道 n_micro = 最大值(2 * 自身.pp_group_size - 1, 自身._n_个微批次) 排名操作: 列表[可选[动作]] = [ _ 范围(排名] 阶段块 0 和块 1 的前向和反向动作计数 f0_cnt, f1_cnt, b0 计数, b1 计数 = 0, 0, 0, 0 预热阶段 warmup_n1 = 2 * (自身.pp_group_size - 排名) - 1 阶段 ID 块 0 = 排名 阶段 ID 块 1 = 自身.阶段数量 - 1 - 排名 _ 范围(预热_n1): 排名操作.追加( 动作(阶段 ID_块 0, 计算类型=F, 微批索引=f0_cnt) ) f0_cnt += 1 warmup_n2 = 排名 _ 范围(预热_n2): 排名操作.追加( 动作(阶段 ID 块 1, 计算类型=F, 微批索引=f1_cnt) ) f1_cnt += 1 排名操作.追加( 动作(阶段 ID_块 0, 计算类型=F, 微批索引=f0_cnt) ) f0_cnt += 1 预热_n3 = 自身.pp_group_size - 排名 _ 范围(预热_n3): 排名操作.追加( 动作(阶段 ID 块 1, 计算类型=F, 微批索引=f1_cnt) ) f1_cnt += 1 排名操作.追加( 动作(阶段 ID 块 1, 计算类型=I, 微批索引=b1 计数) ) 排名操作.追加( 动作(阶段 ID 块 1, 计算类型=W, 微批索引=b1 计数) ) b1 计数 += 1 # 稳定阶段 f1_cnt < f0_cnt f0_cnt < n_micro: 如果 f0_cnt < n_micro: 排名操作.追加( 动作( 阶段 ID_块 0, 计算类型=F, 微批索引=f0_cnt ) ) f0_cnt += 1 排名操作.追加( 动作(阶段 ID_块 0, 计算类型=I, 微批索引=b0 计数) ) 排名操作.追加( 动作(阶段 ID_块 0, 计算类型=W, 微批索引=b0 计数) ) b0_cnt += 1 排名操作.追加( 动作(阶段 ID 块 1, 计算类型=F, 微批索引=f1_cnt) ) f1_cnt += 1 排名操作.追加( 动作(阶段 ID 块 1, 计算类型=I, 微批索引=b1 计数) ) 排名操作.追加( 动作(阶段 ID 块 1, 计算类型=W, 微批索引=b1 计数) ) b1 计数 += 1 # 冷却阶段 w0 计数, w1 计数 = b0 计数, b1 计数 冷却_n1 = 排名 _ 范围(冷却_n1): 排名操作.追加( 动作(阶段 ID_块 0, 计算类型=I, 微批索引=b0 计数) ) b0_cnt += 1 排名操作.追加( 动作(阶段 ID 块 1, 计算类型=I, 微批索引=b1 计数) ) b1 计数 += 1 冷却_n2 = 自身.pp_group_size - 排名 _ 范围(冷却_n2): 排名操作.追加( 动作(阶段 ID_块 0, 计算类型=I, 微批索引=b0 计数) ) b0_cnt += 1 排名操作.追加( 动作(阶段 ID_块 0, 计算类型=W, 微批索引=w0 计数) ) w0_cnt += 1 w1 计数 < b1 计数: 排名操作.追加( 动作(阶段 ID 块 1, 计算类型=W, 微批索引=w1_cnt) ) w1 计数 += 1 w0_cnt < b0 计数: 排名操作.追加( 动作(阶段 ID_块 0, 计算类型=W, 微批索引=w0 计数) ) w0_cnt += 1 断言 w0_cnt == b0_cnt b0_cnt == f0_cnt 断言 w1 计数 == b1 计数 b1 计数 == f1_cnt 我们在上面的 n_micro 计算中使用了 max()函数,因此可能需要 移除冗余微批 排序操作 = [ ( 行动 如果 行动 行动.微批索引 行动.微批索引 < 自身._n_微批次 否则 ) 行动 排序操作 ] 返回 排名操作
def 获取课程表(调度名称: 字符串): "" 将课程名称(不区分大小写)映射到相应的类对象。 参数: 课程名称 (str):课程名称。 "源代码" 调度图 = { 1F1B: 调度 1F1B, "交错 1F1B": 调度交错 1F1B, "GPipe": 安排 GPipe, "LoopedBFS": 调度循环 BFS, "交错零气泡": 交错零气泡调度, "单条管道调度": PipelineScheduleSingle, "PipelineScheduleMulti": 管道调度多, ZBV 零气泡: ZBV 零气泡调度, } 小写键 = {k.小写(): k k 调度图.()} 小写调度名称 = 调度名称.小写() 如果 小写调度名称 小写键: 提升 ValueError( f"未知调度名称"{调度名称}'. 有效的选项包括{列表(调度图.())}" ) 返回 调度图[小写键[小写调度名称]] def 模拟通讯计算( 管道顺序, 阶段到排名: 可调用[[int] int] 阶段数: 整型 ): 此函数模拟从所有等级的角度执行计划中的操作,并标记 任何由缺失或顺序错误的信息引起的死锁。它还模拟了时间中的任何气泡,其中排名 无法执行任何操作,因为正在等待未满足的依赖项。模拟器步骤总数可用于 作为涉及 IR 优化传递(如重排序和合并)的单元测试的度量标准 模拟步骤。 模拟不是高保真度,不模拟计算和通信的重叠,或 CUDA 流。 未来工作可能包括增强这一点,并建模计算时间、通信重叠甚至内存。 "源代码" 流水线顺序 = { 排名: [a a 管道顺序[排名] 如果 a ] 排名 排序(管道顺序) } 日程: 字典[int, 列表[动作 | ]] = { 排名: [] 排名 排序(管道顺序) } _prev_ops_rank: 字典[int, 集合[动作]] = {排名: 集合() 排名 日程} def 添加到日程(排名: int, 行动: 可选[动作)] 日程[排名].追加(行动) 如果 行动 : _prev_ops_rank[排名].添加(行动) def _准备调度(行动: 可选[动作]) -> 布尔: 如果 行动 : 返回 真实 stage_idx = 行动.阶段索引 前操作 = _prev_ops_rank[阶段到排名(stage_idx] 如果 行动.计算类型 == F: 如果 行动.阶段索引 == 0: 返回 真实 elif ( 动作(行动.阶段索引, 接收_F, 行动.微批索引) 前操作 ): 返回 真实 elif ( 动作(行动.阶段索引 - 1, F, 行动.微批索引) 前操作 ): 返回 真实 返回 elif 行动.计算类型 (反向输入, 全部反向): 如果 行动.阶段索引 == 阶段数量 - 1: 返回 真实 如果 动作(行动.阶段索引, 接收_B, 行动.微批索引) 前操作: 返回 真实 如果 ( 动作(行动.阶段索引 + 1, 反向输入, 行动.微批索引) 前操作 ): 返回 真实 如果 ( 动作(行动.阶段索引 + 1, 全部反向, 行动.微批索引) 前操作 ): 返回 真实 返回 elif 行动.计算类型 == 反向权重: 返回 真实 elif 行动.计算类型 == 发送_F: 预期_F = 动作(行动.阶段索引, F, 行动.微批索引) 返回 预期_F 前操作 elif 行动.计算类型 == 接收_F: 对等阶段索引 = stage_idx - 1 预期发送 = 动作(peer_stage_idx, 发送_F, 行动.微批索引) 返回 预期发送 _prev_ops_rank[阶段到排名(peer_stage_idx] elif 行动.计算类型 == 发送_B: 预期_b = 动作( 行动.阶段索引, 反向输入, 行动.微批索引 ) 预期带宽 = 动作( 行动.阶段索引, 全部反向, 行动.微批索引 ) 返回 预期_b 前操作 预期带宽 前操作 elif 行动.计算类型 == 接收_B: 对等阶段索引 = stage_idx + 1 预期发送 = 动作(peer_stage_idx, 发送_B, 行动.微批索引) 返回 预期发送 _prev_ops_rank[阶段到排名(peer_stage_idx] 否则: 提升 ValueError(f不支持的操作类型{行动}") 管道顺序: 进度 = 排名 排序(管道顺序): 如果 len(管道顺序[排名]) == 0: 继续 行动 = 管道顺序[排名] [0] 如果 _准备调度(行动): 如果 行动 : 添加到日程(排名, 行动) 管道顺序[排名].弹出(0) 进度 = 真实 否则: 添加到日程(排名, ) i 排序(管道顺序, reverse=True): 如果 len(管道顺序[i]) == 0: 删除 管道顺序[i] # 稍显笨拙,但在此时间步替换任何 'none' 为实际动作,如果它被解除阻塞 后来的等级之一 排名 排序(管道顺序): 如果 len(管道顺序[排名]) == 0: 继续 如果 日程[排名] [-1] : 继续 行动 = 管道顺序[排名] [0] 如果 _准备调度(行动): 如果 行动 : 日程[排名] [-1] = 行动 _prev_ops_rank[排名].添加(行动) 管道顺序[排名].弹出(0) i 排序(管道顺序, reverse=True): 如果 len(管道顺序[i]) == 0: 删除 管道顺序[i] 如果 进度: 打印(WIP 通讯日程:输入文本翻译为简体中文为:\n", _format_pipeline_order(日程)) 排名 管道顺序: 打印(f"{排名=}下一个动作={管道顺序[排名] [0]}") 提升 ValueError(安排未按计划进行) 返回 _安排 def 导出 Chrome 追踪(调度, 文件名): "" 该函数将调度 IR 导出为 chrometrace 格式,以便进行可视化。 目前功能非常基础,仅作为将调度 IR 以文本形式导出的图形化替代方案。 未来工作可能扩展此功能,包括更精确的持续时间启发式算法,或允许用户输入持续时间, 添加“流事件”以让 UI 显示发送和接收之间的连接,并模拟 cuda 流用于通信/计算。 作为 chrometrace 视图上的独立流。 "源代码" 事件 = [] 排名 排序(调度): 时间步长, 行动 列举(调度[排名)] 如果 行动 : 继续 活动.追加( { 名称: 字符串(行动), "猫": ( 计算 如果 行动.计算类型 (F, B, W) 否则 通讯 ), ph: X, "进程 ID": 排名, tid: 排名, "ts": 时间步长, "dur": 1, } ) 导入 json 打开(文件名, w) 作为 f: json.导出({"traceEvents": 活动}, f)

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源