# mypy: 允许未类型化定义
版权所有(C)Meta Platforms,Inc. 及其关联公司
导入
记录日志
导入
操作符
来自 abc
导入 ABC,
抽象方法
来自
打字
导入
任何,
可调用,
角色,
可选,
联合
导入
火炬
导入 torch.distributed
是 dist
导入 torch.fx
是 fx
导入 torch.nn
是
神经网络
来自 torch._subclasses.fake_tensor
导入
模拟 Tensor
来自 torch.distributed.fsdp
导入
FSDP 模块,
全部分片
来自
torch.fx 节点
导入
参数,
地图聚合
来自 torch.nn.parallel
导入
分布式数据并行
来自 torch.utils._pytree
导入
树图模式
来自 ._backward
导入
后退阶段,
阶段后向输入,
阶段后退权重
来自
._调试
导入
map 调试信息
来自
_utils
导入
展平参数,
管道信息,
验证张量元数据
__all__ = [
管道阶段,
构建阶段,
]
日志记录器 =
记录日志.
获取日志记录器(__name__)
def _normalize_model_output_as_tuple(输出:
任何) ->
元组[
任何
]
"""[注意:管道模型输出类型]"""
模型传递给管道的输出可以是任何类型,由用户控制。
然而,有两个 API 界面使这个问题变得复杂。
(1) 中间阶段的输出通过 Send/Recv 操作传递给后续阶段。隐含的假设是每个输出元素都是一个张量。否则,Send/Recv 将不被支持。例外情况是模型的最后一层,它可以输出任何东西,而这些东西不会通过 Send/Recv 进行通信。
是的,每个输出元素都是一个张量。否则,Send/Recv 将不被支持。例外情况是模型的最后一层,它可以输出任何东西,而这些东西不会通过 Send/Recv 进行通信。
是的,每个输出元素都是一个张量。否则,Send/Recv 将不被支持。例外情况是模型的最后一层,它可以输出任何东西,而这些东西不会通过 Send/Recv 进行通信。
(2) 模型的最后一层的输出返回给用户,或者传递给损失函数。
损失函数可以以任何方式编写,使其输入与模型的输出相匹配。
如果我们可以严格指定包装模型的管道阶段的输出签名,那就方便了,
但我们不想对用户提供的模型施加不必要的约束。
目前,我们允许用户提供的模型从每个阶段返回张量或张量元组。由于
torch.export 导出跟踪,编译后的模型也可能返回一个列表而不是元组,我们将将其规范化回
元组用于一致性。
待办:我们是否应该更严格地断言阶段模块(中间和输出)只返回 Tensor
值?
"文档"
if 类型(
输出)
是
列表:
这是一个为了解决导出创建列表格式输出的临时解决方案
# output in list format
输出 =
元组(
输出)
将输出形式统一为元组,以便与 `act_send_info` 容易对应
# `act_send_info`
输出元组 =
输出 if
类型(
输出)
是
元组
否则 (
输出,)
返回
输出元组
类
根参数占位符:
""
模型级别输入的占位符。
"文档"
定义
初始化(
我,
张量):
我.
元数据 =
张量.
到(
元数据)
类
接收信息:
""
表示一个阶段输入。
"文档"
定义
初始化(
self,
输入名称:
字符串,
源:
整数,
缓冲区:
火炬.
张量,
):
# 此输入的名称
self.输入名称 =
输入名称
# 此输入源的阶段索引
self.源 =
源
缓冲区接收输入
self.缓冲区 =
缓冲区
定义 __repr__(self):
返回 f
"_RecvInfo(input="{self.
输入名称}
, 来源={self.
源}
, 形状={self.
缓冲区.
尺寸()})"
# 输入可以是接收到的激活或模型输入
输入信息 =
联盟[
接收信息,
根参数占位符]
定义
从元数据创建张量(
示例:
联盟[
PyTorch.
张量,
假 Tensor
],
设备:
PyTorch.
设备,
) -> PyTorch.
张量:
""
从一个张量创建一个真实张量。
"文档"
返回
PyTorch.
空的(
示例.
尺寸(),
数据类型=
示例.
数据类型,
布局=
示例.
布局,
设备=
设备,
)
类 _PipelineStageBase(ABC):
""
管道阶段的基类。
定义或实现 `_PipelineStage` 使用的常用方法
手动前端使用的跟踪前端和 `PipelineStage`。
"文档"
定义
初始化(
self,
子模块:
PyTorch.
神经网络.
模块,
stage_index: 整数,
阶段数量:
整数,
设备:
PyTorch.
设备,
群组:
可选[
距离.
流程组] =
无,
dw_builder: 可选[
可调用
[]
可调用[...,
无]]] =
无,
):
""
参数:
子模块(torch.nn.Module):在此阶段要执行模块。
stage_index(int):此阶段的索引。
num_stages(int):此管道中的阶段总数。
device (torch.device):运行此阶段的设备。
group(可选 dist.ProcessGroup):用于通信的进程组。
如果为 `None`,则使用默认进程组。
默认:`None`。
dw_builder(可选[Callable[[], Callable[..., None]]]):如果提供,dw_builder 是一个构建函数
该函数将构建一个新的 dw_runner 函数,用于运行模块反向过程中有意跳过的部分
构建函数必须在每个阶段运行模型反向后调用,并且阶段应保存最新的 dw_runner 以在权重传递(W)期间运行
模型反向,并且阶段应保存最新的 dw_runner 以在权重传递(W)期间运行
如果未提供,将通过遍历 autograd 图自动生成 dw_runner。
当与仅包含 F 和 B 步骤的调度一起使用时,将调用新的 dw_runner 函数作为 I(输入反向)的一部分。
当与 F、I、W 调度一起使用时,dw_runner 函数实现'W'。
"文档"
超级().
初始化()
如果
阶段索引 >=
阶段数量:
抛出 ValueError(
f"阶段索引"{stage_index}
超出了范围{
阶段数量}"
)
self.子模块 =
子模块
self.阶段索引 =
阶段索引
self.阶段数量 =
阶段数量
self.设备 =
设备
self.组 =
组
self.dw_builder = dw_builder
# 向后状态
self.backward_state: 字典[
整数,
元组[
任意, ...]] = {}
# 将 dw_runner 存储在每个 microbatch_id 中
self.dw_runner: 字典[
整数,
可调用[...,
无]] = {}
# `group_rank` 是在进程组 `group` 中的排名。
self.组排名 =
距离.
获取排名(self.
群组)
self.组大小 =
距离.
获取世界大小(
我.
群组)
如果
我.
组大小 >
我.
阶段数量:
raise 运行时错误(
f"管道组大小"{
我.
组大小}
不能大于阶段数量{
我.
阶段数量}"
)
运行时状态
我.
_输出元数据:
可选[
元组[
火炬.
张量, ...]] =
无
# 将微批处理 ID 映射到前向张量参数列表
我.
前向缓存:
字典[
整数,
元组[
任何,
列表[
火炬.
张量]]] = {}
将微批处理 ID 映射到反向梯度张量参数列表
我.bwd_cache:
字典[
整数,
元组[
可选[
火炬.
张量
], ...]] = {}
# 缓存块输出以进行最终输出合并或缩减
我.
输出块:
列表[
任何] = []
# 初始化 has_backward 为 false;如果 loss 函数传递给 pipeline schedule,则将其设置为 true
# 函数传递给 pipeline schedule
我.has_backward =
假
# 日志前缀
我.
日志前缀 = f
"[阶段"{
我.stage_index}]"
# 前向基础设施
我.args_recv_info:
字典[
整数,
元组[
输入信息, ...]] = {}
我.
发送信息操作:
字典[
整数,
列表] = {}
后向基础设施将懒加载创建
我.grad_recv_info:
字典 = {}
我.
毕业发送信息:
可选[
列表] =
无
将由调度器稍后填充
我.
数据块们:
可选[
整数] =
无
我.
阶段索引到组排名:
字典[
整数,
整数] = {
i: i % 我.
组大小
为 i
在
范围(
自身.
阶段数)
}
@property
def 有后向(
自身) ->
布尔:
""
该阶段是否有反向传播。
"文档"
返回
我.
是否有反向操作
@has_backward.设置器
定义 has_backward(
我, has_backward:
布尔):
我.
是否有反向操作 = has_backward
@property
定义 is_first(
我):
""
返回 true,如果这个阶段是管道中的第一个阶段。
"文档"
返回
我.
阶段索引 == 0
@property
定义
是最后一个(
我):
""
如果这个阶段是管道中的最后一个阶段,则返回 true。
"文档"
返回
我.
阶段索引 ==
我.
阶段数量 - 1
定义
检查块 ID(
我, chunk_id:
整数):
if 我.
数据块
是
无:
raise 运行时错误(
尝试在配置块之前访问 chunk_id。
)
if chunk_id >= 我.
数据块们:
raise 运行时错误(
f块 ID{chunk_id}
超出范围 [0,]{
我.
数据块们})"
)
定义
配置输出元数据(self,
输出元数据:
元组[
火炬.
张量, ...
)]
""
跟踪此阶段的输出形状/数据类型,因为它们决定了必须匹配的发送操作(们)
下一阶段的接收操作。下一阶段_将_基于其初始状态冻结其接收缓冲区。
配置,因此冻结/验证输出端也很重要,以避免任何发送/接收不匹配
这可能导致挂起、静默损坏或其他错误。
"文档"
断言 self._outputs_meta
是
无, (
"尝试重新配置 output_meta,这是不支持的"
)
self._outputs_meta = 元组(
输出元数据)
# 类型:忽略[赋值]
定义
获取输出元数据(self) ->
元组[
PyTorch.
张量, ...
]
获取表示此阶段输出的输出元数据(元张量)
断言 self._outputs_meta
是
非
无, (
尝试在未配置输出元数据的情况下获取_outputs_meta()
)
返回 self._outputs_meta
定义 _create_grad_send_info(
self,
args_recv_info: 元组,
) -> 列表[
可选[
整数
]]
""
创建要发送梯度的阶段索引列表。
"文档"
毕业发送信息:
列表[
可选[
整数]] = []
定义 map_recv_to_send(a):
# 注意:只要在之前阶段,我们就将梯度发送回前一个阶段
# forward 它是一个接收到的输入,无论它是否需要
# grad. 由前一阶段负责丢弃此梯度。
如果 isinstance(a,
接收信息):
毕业发送信息.
追加(a.
源)
返回 a.
源
else:
毕业发送信息.
追加(
无)
返回
无
map_aggregate(args_recv_info, map_recv_to_send)
记录器.
调试("%s
Grad 发送信息:%s", self.log_prefix,
毕业发送信息)
返回 grad_send_info
@abstractmethod
定义
准备前向基础设施(
self,
num_microbatches: 整数,
参数:
元组[
任意, ...
],
kwargs: 可选[
字典[
字符串,
任意]] =
无,
) -> 元组[
任意, ...
]
抛出
未实现异常
定义
_准备反向基础设施(self, num_microbatches:
整数):
# TODO:这是用于 backward_maybe_with_nosync 所必需的
self.数据块 =
微批次数
为
mb 索引
在
范围(num_microbatches):
`grad_recv_info` 是 `act_send_info` 的镜像
self.grad_recv_info[mb 索引] = self.
创建梯度接收信息(
self.发送信息操作
)
@abstractmethod
定义
创建梯度接收信息(
self,
发送信息操作:
字典,
) -> 元组[
接收信息, ...
]
抛出
未实现异常
定义
_获取接收操作(
self,
接收信息:
元组[
输入信息, ...
],
) -> 列表[
距离.
P2P 操作
]
""
`get_fwd_recv_ops` 和 `get_bwd_recv_ops` 共享的辅助函数。
返回与接收信息对应的操作列表。
"文档"
操作:
列表[
距离.
P2P 操作] = []
为
信息
在
接收信息:
如果
非 isinstance(
信息,
接收信息):
继续
伙伴排名 =
我.
阶段索引到组排名[
信息.
源]
伙伴全局排名 = (
伙伴排名
如果
我.
组
是
无
否则
距离.
获取全球排名(
我.
群组,
同侪排名)
)
操作.
追加(
距离.
P2P 流(
距离.
接收,
信息.
缓冲区,
全球排名,
我.
群组)
)
返回 ops
"[注意:V-排期特殊情况]"
V-排期存在一个特殊情况,即相邻的 stage_id 的 2 个阶段位于同一等级。
例如:2 个等级,4 个阶段形成一个简单的 V:
rank0:阶段 0 阶段 3
rank1: 阶段 1 阶段 2
阶段 0,1 和 2,3 使用 send/recv 正常通信,但阶段 1,2 不需要
使用通信操作。相反,它们应通过函数调用直接传递张量数据。
set_local_fwd_input 和 (get_local_bwd_output + set_local_bwd_input) 有助于这种优化,并且
应在管道调度适当的时间调用(在正向或反向执行之后)。
"文档"
定义
设置本地前向输入(
我,
前一阶段输出:
任何,
mb 索引:
整数) ->
无:
""
将'prev_stage_outputs'从同一 rank 的另一个阶段移动到本阶段的输入位置。避免
复制张量数据或使用 send/recv 操作。断开原始张量并设置 requires_grad。
张量可以作为 autograd 的叶子节点,在反向传播过程中可以从中收集梯度。
"文档"
接收信息:
元组[
输入信息, ...] =
我.args_recv_info[
mb 索引]
# 查看 [备注:管道模型输出类型]
前一阶段输出 = _normalize_model_output_as_tuple(
前一阶段输出)
为
信息,
张量
在 zip(
接收信息,
前一阶段输出):
断言 isinstance(
张量,
火炬.
张量), (
f预期从前一阶段输出的张量值,得到{
类型(
张量)}"
)
断言 isinstance(
信息,
接收信息), (
"仅在非第一阶段调用 set_local_Fwd_input,第一阶段应始终具有 RecvInfo"
)
# 这里我们不需要进行数据复制,因为我们可以直接从
# 一个阶段传递激活张量引用到下一个阶段。然而,我们需要将激活标记为叶子张量,因为它将
作为创建全新自动微分图的输入张量,不属于前一阶段的自动微分图的一部分。
TODO:确认,我们是否将此激活作为前一阶段的反向调用的根?脱离(detach)对此有何影响?
脱离(detach)对此有何影响?
信息.
缓冲区 =
张量.detach().
需要梯度_(True)
定义
获取局部反向输出(
我,
mb 索引):
“”
返回该阶段的输入梯度张量,这些张量对应于前向过程中的阶段输入。
"源代码"
断言 self.
有后向, (
无法在当前阶段没有反向时窃取_bwd_input
)
断言
非
我.is_first,
"如果这个阶段是第一个,则无法获取 bwd 输出"
我.
检查块 ID(
mb 索引)
返回
我.bwd_cache.
流行(
mb 索引)
定义
设置本地反向输入(
我,
下一个阶段的反向输出:
元组[
可选[
火炬.
张量
], ...
],
mb 索引:
整型
) -> 无:
""
将'grad input'张量从下一个阶段移动到当前阶段的'grad_output',避免复制或发送/接收。
不分离或设置 '_requires_grad'。
"文档"
断言 isinstance(
下一个阶段的反向输出,
元组), (
f预期得到元组,却得到了{
类型(
下一个阶段的反向输出)}"
)
断言
我.has_backward, (
无法设置反向输入,如果此阶段没有反向功能
)
断言
非
我.
是最后一个,
如果这个阶段是最后一个,则不能设置反向输入
接收信息 =
我.grad_recv_info[
mb 索引]
为
信息,
张量
在 zip(
接收信息,
下一个阶段的反向输出):
断言 isinstance(
张量,
火炬.
张量), (
f预期从前一阶段输出的张量值,得到{
类型(
张量)}"
)
断言 isinstance(
信息,
接收信息), (
f预期接收信息,却得到了{
类型(
信息)}"
)
信息.
缓冲区 =
张量
定义
获取前向接收操作(self,
前向块 ID:
整数) ->
列表[
距离.
P2P 流
]
""
返回接收输入参数所需的操作列表
。
"文档"
接收信息:
元组[
输入信息, ...] =
我.args_recv_info[
前向块 ID]
返回
我.
_获取接收操作(
接收信息)
定义 get_bwd_recv_ops(
我, bwd_chunk_id:
整数) ->
列表[
距离.
P2P 流
]
""
返回此阶段所需的接收梯度的操作列表
。
"文档"
如果
非
我.has_backward
或者
我.
是最后一个:
返回 []
接收信息 =
我.grad_recv_info[bwd_chunk_id]
返回
我.
_获取接收操作(
接收信息)
定义
获取前向发送操作(
我,
前向块 ID:
整数) ->
列表[
距离.
P2P 流
]
""
获取当前阶段的正向激活发送操作。
"文档"
输出 =
我.
输出块[
前向块 ID]
将输出形式统一为元组,以便与 `act_send_info` 容易对应
# `act_send_info`
输出元组 =
输出
如果
类型(
输出)
是
元组
否则 (
输出,)
操作:
列表[
距离.
P2P 流] = []
为
索引, out
在
列举(
输出元组):
目标阶段 =
我.
发送信息操作[
索引]
为 dst
在
目标阶段:
如果 dst
是
无:
继续
记录器.
调试(
"%s将张量发送到阶段%s: %s",
我.log_prefix,
目标,
输出.
尺寸(),
)
伙伴排名 =
我.
阶段索引到组排名[
目标]
伙伴全局排名 = (
伙伴排名
if 我.
组
是
无
否则
距离.
获取全球排名(
我.
群组,
同侪排名)
)
操作.
追加(
距离.
P2P 流(
距离.isend,
输出,
全球排名, self.
群组))
返回 ops
定义
获取反向发送操作(self, bwd_chunk_id:
整数) ->
列表[
距离.
P2P 操作
]
""
获取当前阶段的反向梯度发送操作。
"文档"
self.检查块 ID(bwd_chunk_id)
如果
非 self.has_backward
或者 self.is_first:
返回 []
懒惰地创建反向发送基础设施
如果 self.grad_send_info
是
无:
# 向反向过程中输入梯度发送信息:
# 对应输入梯度的目的地列表
# 如果输入没有梯度,则可以是 None
`grad_send_info` 是 `args_recv_info` 的镜像
self.grad_send_info = self._create_grad_send_info(我.args_recv_info[0])
操作:
列表[
距离.
P2P 操作] = []
梯度输入 =
我.bwd_cache.
弹出(bwd_chunk_id)
为
研究生, grad_recv_stage
在
压缩(
输入梯度,
我.
毕业发送信息):
如果 isinstance(
研究生,
PyTorch.
张量)
和 grad_recv_stage
是
非
无:
记录器.
调试(
"%s将梯度发送到阶段%s: %s",
我.log_prefix,
grad_recv_stage,
研究生.
尺寸(),
)
伙伴排名 =
我.
阶段索引到组排名[grad_recv_stage]
伙伴全局排名 = (
伙伴排名
如果
我.
组
是
无
否则
距离.
获取全球排名(
我.
群组,
同侪排名)
)
操作.
追加(
距离.
P2P 操作(
距离.isend,
研究生,
全球排名,
我.
群组))
否则:
如果
非 (
梯度
是
无
和 grad_recv_stage
是
无):
抛出
运行时错误(
f"[{self.stage_index}] 用于块{bwd_chunk_id}
有渐变{
研究生} "
f并期望将渐变发送到阶段{grad_recv_stage}"
)
返回 ops
def 清除运行时状态(
我) ->
无:
“”
清除舞台的运行状态。
"文档"
# 将微批处理 ID 映射到前向张量参数列表
我.
前向缓存.
清晰()
# 缓存块输出以进行最终输出合并或缩减
我.
输出块.
清晰()
清除计划步骤之间的输入缓冲区。这是因为在
`torch.autograd.backward() 将将梯度累积到叶子节点`
默认情况下为张量。为了使梯度能够回传到前面的阶段,我们
不希望有这种积累。
为
接收元组
在
我.args_recv_info.
值():
# 遍历所有块
为 a
在
接收元组:
遍历所有输入参数
如果 isinstance(a,
接收信息):
将其设置为 None 是清除梯度的新方法,比`zero_()`更推荐。
查看 https://github.com/pytorch/pytorch/pull/92731
a.缓冲区.
梯度 =
无
def _map_tensor_from_recv_info(
我,
接收信息:
元组[
输入信息, ...
],
):
“”
将接收信息中的张量映射到一个列表中。
"文档"
def 获取接收张量(
信息):
如果 isinstance(
信息,
接收信息):
返回
信息.
缓冲区
else:
提升
断言错误(f
预期 _RecvInfo 但得到{
类型(
信息)}")
返回 map_aggregate(
角色(
参数,
接收信息),
获取接收张量)
def _检索接收到的激活(
我,
前向块 ID:
整数):
""
在前向过程中检索当前阶段的接收到的激活。
"文档"
接收信息 = self.args_recv_info[
前向块 ID]
激活 =
我._map_tensor_from_recv_info(
接收信息)
返回
激活
def _检索接收梯度(
我,
bwd_chunk_id: 整数,
):
“”
在反向传播过程中检索当前阶段的接收到的梯度。
"文档"
接收信息 =
我.grad_recv_info[bwd_chunk_id]
梯度 =
我._map_tensor_from_recv_info(
接收信息)
返回
梯度
def 可能不同步转发(
我, *
参数, **kwargs):
# 如果子模块被 DDP 包装,我们使用`no_sync`上下文管理器
# 避免每个微批次的梯度全量归一化
如果 isinstance(
我.
子模块,
分布式数据并行):
与 self.
子模块.
不同步(): # type: ignore[operator]
输出值 =
我.
子模块(*
参数, **kwargs)
else:
输出值 =
我.
子模块(*
参数, **kwargs)
返回
输出值
def 梯度缩放(
我,
梯度缩放因子:
整数) ->
无:
将模型梯度按`grad_scale_factor`缩放,该因子应与`scale_factor`一起指定
损失函数用于流水线。对于执行'平均'损失减少的损失函数,`grad_scale_factor`
应设置为 num_microbatches。对于使用`sum`缩减的损失函数,`grad_scale_factor`应
设置为 1。
应该在每个管道调度步骤中只调用一次,在所有反向传递完成后。
""
# 仅对其自身的贡献(微批)进行 PP 缩放,但依赖于 DP 进行进一步缩放。
# 对于 DP 度。
如果
梯度缩放因子 != 1:
为 p
在
我.
子模块.
参数():
如果 p.
梯度
是
非
无:
p.研究生.div_(
梯度缩放因子)
def 可能反向,无同步(
我,
反向类型,
bwd 参数:
字典,
最后一次反向:
布尔类型 =
错误,
) -> 元组[
元组[
可选[
PyTorch.
张量
], ...
],
可选[
列表[
字典[
字符串,
任意]]]]:
""
无论使用 PP 与 FSDP 还是 DDP,在最后一个反向步骤中都有一些运行时差异
其他步骤。具体来说,我们需要在之前的步骤中累积梯度,并在最后一步中减少它们,
根据所使用的数据并行度,可能还有额外的状态变量和性能考虑因素。
此辅助函数应适应任何管道并行调度,以便与常见的/支持的数据并行库一起工作。
""
def 执行反向操作(
反向类型,
) -> 可调用[
[]
元组[
元组[
可选[
PyTorch.
张量
], ...
],
可选[
列表[
字典[
字符串,
任意]]]],
]
如果
反向类型 ==
全部:
返回 lambda: (
后退阶段(
bwd 参数[
阶段输出
],
bwd 参数[
输出梯度
],
bwd 参数[
输入值
],
),
无,
)
elif 反向类型 ==
输入:
返回 lambda:
阶段后向输入(
bwd 参数[
阶段输出
],
bwd 参数[
输出梯度
],
bwd 参数[
输入值
],
我.
子模块.
参数(),
)
elif 反向类型 ==
权重:
返回 lambda: (
阶段向后权重(
我.
子模块.
参数(),
bwd 参数[
"参数组"]
),
无,
)
else:
抛出
运行时错误(f
"未知反向类型:"{
反向类型}")
# 如果子模块被 DDP 包装
如果 isinstance(
我.
子模块,
分布式数据并行):
如果
最后一次反向:
# 最后的块,准备进行梯度缩减
# HACK:在这里深入到 DDP 实现细节。有更好的方法吗?
我.
子模块.
简化器.
准备反向操作( # type: ignore[union-attr, operator]
列表(
PyTorch.
神经网络.
并行.
分布式.
查找张量(
# 类型: 忽略[attr-defined]
bwd 参数[
阶段输出]
)
)
)
结果 =
执行反向操作(
反向类型)()
否则:
与
我.
子模块.
不同步(): # type: ignore[operator]
结果 =
执行反向操作(
反向类型)()
如果子模块是 FSDP 模块
elif isinstance(我.
子模块,
FSDP 模块):
我.
子模块.
设置为最后一个反向操作(
错误)
我.
子模块.
设置反向操作后重新分片(
错误)
我.
子模块.
设置需要梯度同步(
错误)
结果 =
执行反向操作(
反向类型)()
如果
最后一次反向:
# 手动调用 FSDP 的 post backward
def 运行后向回(
fsdp 模块:
FSDP 模块) ->
无:
fsdp 模块.
设置为最后一个反向操作(True)
fsdp 模块.
设置反向操作后重新分片(True)
fsdp 模块.
设置需要梯度同步(True)
fsdp 状态 =
全分片.
状态(
fsdp 模块)
# 类型: 忽略[attr-defined]
为
状态
在
fsdp 状态.
_状态上下文.
所有状态:
如果
状态.
_fsdp 参数组:
状态.
_fsdp 参数组.
反向传播后()
# 如果反向调用.backward 能够触发自动微分钩子,那就好多了。
# 那样 DDP/FSDP 等模块才会按预期工作。目前我们只能暂时解决这个问题,
# 我们还需要调用这个,以确保 FSDP 能够将梯度缩减操作同步回默认流。
fsdp 状态._root_post_backward_final_callback()
运行后向回(self.
子模块)
否则:
非 DP 子模块,常规反向
结果 =
执行反向操作(
反向类型)()
梯度,
参数组 =
结果
返回
梯度,
参数组
def 前进一个块(
我,
前向块 ID:
整数,
参数:
元组[
任意, ...
],
kwargs: 可选[
字典[
字符串,
任意]] =
无,
):
""
在该阶段使用一个微批处理执行前向传递。
`args` 和 `kwargs` 是外部输入到该阶段的参数。
截至 2024 年 9 月:
`- `args` 仅适用于第一阶段,其他阶段接收 args`
通过激活传输。
`- `kwargs` 可以通过各自的 `step` 调用传递给所有阶段。`
""
如果
我.is_first:
# 第一阶段无需接收任何内容
复合参数 = args
否则:
# 接收此块激活
# 激活以参数形式传入
复合参数 =
我.
_检索接收到的激活(
前向块 ID)
composite_kwargs = kwargs 或 {}
self._验证正向输入(
参数, kwargs)
计算正向
try:
输出 =
自身.
可能不同步转发(*
复合参数, **
组合关键字参数)
除了
异常
作为 e:
exc_msg = f""
{自身.log_prefix}
运行前向失败:
参数:{
map 调试信息(
复合参数)}
kwargs: 参数字典{
map 调试信息(
组合关键字参数)}
"源代码"
抛出
运行时错误(exc_msg)
来自 e
# 查看 [备注:管道模型输出类型]
输出元组 = _normalize_model_output_as_tuple(
输出)
准备最终输出合并或缩减
自身.
输出块.
追加(
输出)
保存激活和输入以供反向传播
平铺参数 =
展平参数(
复合参数)
平坦参数 =
展平参数(
组合关键字参数)
展平输入张量 =
平铺参数 +
平坦参数
自身.
前向缓存[
前向块 ID] = (
输出元组,
# 阶段输出
展平输入张量,
输入值
)
记录器.
调试(
"%s转发块%s
,输出:%s",
自身.log_prefix,
前向块 ID,
map 调试信息(
输出),
)
自身.
_验证前向输出(
输出元组)
# 返回原始用户提供的输出,不转换为元组格式。
# 查看 [备注:管道模型输出类型]
返回
输出
def 向后移动一个块(
self,
bwd_chunk_id: 整数,
损失=
无,
全部后向:
布尔类型 = True,
最后一次反向=
错误,
):
""
在模块上执行反向传播。
应该在每个微批次中只调用一次。
如果 full_backward 为 True(默认值),则将运行包括权重和输入梯度的完整反向传播,
调用 `backward_weight_one_chunk` 对于此 bwd_chunk_id 是错误的。
如果 full_backward 为 False,则在 __init__ 时向 PipelineStage 提供的 `dw_runner` 是可选的
需要随后调用 `backward_weight_one_chunk` 以调用 dw_runner 并完成反向操作。
last_backward 由调度和 DP 组间的梯度同步信号控制
在最后向后之后。
"源代码"
self.检查块 ID(bwd_chunk_id)
(
阶段输出,
输入值,
) = self.前向缓存.
弹出(bwd_chunk_id)
# 反向计算
如果 self.
是最后一个:
# 最后阶段从损失中计算梯度,没有来自下一阶段的梯度
# 下一阶段
bwd_kwargs = {
阶段输出:
损失,
输出梯度:
无,
输入值:
输入值,
}
否则:
# 否则,接收下一阶段的梯度
grads_output = self._检索接收梯度(bwd_chunk_id)
# 如果管道的输入需要梯度,
`torch.autograd.backward` 将梯度累加到这样的输入的 `.grad` 字段中
`.grad` 字段中
bwd_kwargs = {
阶段输出:
阶段输出,
输出梯度:
梯度输出,
输入值:
输入值,
}
输入梯度:
元组[
可选[
PyTorch.
张量
], ...] = ()
自定义反向函数
如果 self.dw_builder:
# TODO: 我们可能想要改变我们的语义,以便我们可以忽略
# 'dw_builder' 并在它是 full_backward 操作时直接调用 full_backward。
输入梯度, _ = self.
可能反向,无同步(
全部,
bwd 参数,
最后一次反向=
最后一次反向,
)
如果
全部后向:
self.dw_builder()()
否则:
self.dw_runner[bwd_chunk_id] = self.dw_builder()
否则:
如果
全部后向:
输入梯度, _ = self.
可能反向,无同步(
全部,
bwd 参数,
最后一次反向=
最后一个反向
)
否则:
参数组:
列表[
字典[
字符串,
任意]] |
无 =
无
跳过第一阶段的后向传播,因为我们将在 backward_weight_one_chunk 中执行权重更新
在 backward_weight_one_chunk 中执行 autograd.backward
如果
非 self.is_first:
如果 isinstance(
bwd 参数[
阶段输出
],
PyTorch.
张量):
bwd 参数[
阶段输出] = (
bwd 参数[
阶段输出],)
# 对输入执行自定义的逆向操作
# 当 "阶段输出" 是损失时,它是一个张量,否则它是一组张量
输入梯度,
参数组 = self.
可能反向,无同步(
输入,
bwd 参数,
最后一次反向=
最后一个反向
)
# TODO: 我们不需要保存这个,添加到 dw_runner 吗?
self.backward_state[bwd_chunk_id] = (
bwd 参数[
输入值
],
参数组,
bwd 参数[
阶段输出
],
bwd 参数[
输出梯度
],
)
保存 dw_runner 的占位符
self.dw_runner[bwd_chunk_id] = lambda: 无
self.bwd_cache[bwd_chunk_id] = 梯度输入
如果 self.
是最后一个
和
非 self.is_first:
# 自动微分依赖:
# rest_of_autograd_graph -> stage_output -> loss
# stage_output 在最后阶段不再使用,仅用于回溯
# 返回给用户以合并输出块
# 应将其分离以释放 autograd 图上下文并提前释放内存
为 t
在
阶段输出:
如果
非 t._is_view():
# 视图不可就地分离
t.断开连接()
记录器.
调试("%s
退回的块%s", self.log_prefix, bwd_chunk_id)
def 向后权重一个块(self, bwd_chunk_id:
整数,
最后一次反向=
错误):
断言 bwd_chunk_id
在 self.dw_runner, (
f"{self.log_prefix}尝试运行 backward_weight_one_chunk 为 chunk{bwd_chunk_id}"
"在没有先调用 `backward_one_chunk(full_backward=False)` 的情况下"
)
如果 self.dw_builder
是
非
无:
self.dw_runner.弹出(bwd_chunk_id)()
否则:
(
输入值,
参数组,
阶段输出,
输出梯度,
) = self.backward_state.弹出(bwd_chunk_id)
如果 self.
阶段索引 != 0:
bwd_kwargs = {
阶段输出:
阶段输出,
"参数组":
参数组,
}
self.可能反向,无同步(
权重,
bwd 参数,
最后一次反向=
最后一个反向
)
否则:
# TODO: 想出一个更好的方法来做这件事:
# 如果输入不需要梯度,
# 阶段反向输入期间参数组将无法完全捕获
# 在这种情况下,我们需要直接在参数上调用 grad
# 解决方案:让输入函数先进行交集计算,然后在 W 阶段完成
bwd_kwargs = {
阶段输出:
阶段输出,
输出梯度:
输出梯度,
输入值:
输入值,
}
self.可能反向,无同步(
全部,
bwd 参数,
最后一次反向=
最后一个反向
)
def _验证正向输入(self,
参数, kwargs):
如果输入参数/关键字参数的形状与该阶段配置的形状不匹配,则引发 RuntimeError 异常。
如果 self.is_first:
# TODO 为什么每个管道块都有一个单独的接收信息?
避免向此函数传递 `fwd_chunk_id`,我们
将所有块与 args_recv_info[0] 进行检查
预期参数 = self.args_recv_info[0]
否则:
# 假设它们不接受非 0 阶段,我们不检查输入
用户输入在标准管道场景中
返回
如果 len(kwargs):
# TODO-需要将 kwarg 映射到 self.args_recv_info 中的位置
没有它,我们无法 100%确定如何匹配 args 和
expected_args。
返回
# TODO-需要将 kwarg 映射到 self.args_recv_info 中的位置
# 可能无法判断长度不匹配是因为
# (a) 用户传递了多余的参数或遗漏了参数
# (b) 用户未传递具有默认值的 kwarg
预期张量元数据 = [
e.元数据
如果 isinstance(e,
根参数占位符)
否则 e.
缓冲区
为 e
在
预期参数
]
验证张量元数据(
f阶段{self.
阶段索引}
向前输入,
预期张量元数据, args
)
def _验证前向输出(self,
输出:
元组[
PyTorch.
张量, ...
)]
如果这个阶段产生了意外形状/数据类型的输出,将引发 RuntimeError。
这很可能是由于用户错误地指定了输出形状,或者是因为在原始模型上进行了形状推断,但在运行时模型被包装成了混合精度,从而改变了输出数据类型。
例如,形状推断是在原始模型上进行的,但在运行时模型被包装成了混合精度,这会改变输出数据类型。
这可能导致输出数据类型发生变化。
"源代码"
预期张量元数据 = self.
获取输出元数据()
验证张量元数据(
f阶段{self.
阶段索引}
前向输出,
预期张量元数据,
输出
)
类 _PipelineStage(_PipelineStageBase):
定义
初始化(
self,
阶段模块:
PyTorch.
神经网络.
模块,
阶段索引:
整数,
管道信息:
管道信息,
设备:
PyTorch.
设备,
群组:
可选[
距离.
流程组] =
无,
):
""
根据要包装的 stage_module 创建一个管道阶段
并描述管道阶段关系的 `pipe_info`
参数:
stage_module (torch.nn.Module):要被此阶段包装的模块
stage_index (int):此阶段在管道中的索引
pipe_info (PipeInfo):关于管道的信息,可以通过 `pipe.info()` 获取
device (torch.device):此阶段使用的设备
group (Optional[dist.ProcessGroup]):此阶段使用的进程组
"源代码"
_PipelineStageBase.初始化(
self,
阶段模块,
阶段索引,
管道信息.
阶段数,
设备,
群组,
)
self.管道信息 =
管道信息
在图中查找阶段节点
子模块节点 = [
节点
为
节点
在
管道信息.
图.
节点
如果 node.
操作符 ==
"调用模块"
]
如果 len(
子模块节点) != self.
阶段数:
提升
断言错误(
f管道图中子模块的数量{len(
子模块节点)}
阶段数量不匹配{self.
阶段数}"
)
在图中查找我的阶段节点
self.节点 =
子模块节点[self.
阶段索引]
self.名称 = self.node.
名称
记录器.
信息(
"[%s创建管道阶段%s
对于%s",
self.群组排名,
阶段索引,
self.名称,
)
将舞台名称映射到舞台索引
self.子模块到阶段索引:
字典[
字符串,
整数] = {}
为 i,
节点
在
列举(
子模块节点):
self.子模块到阶段索引.setdefault(node.
名称, i)
# 将子模块转换为设备
self._move_submod_to_device()
定义 _move_submod_to_device(self):
尝试将子模块移动到指定的设备上(如果可能)
注意:由于元张量不支持 to()方法,因此无法将元模块移动到真实设备上
在这种情况下,需要就地交换张量
。
具有元参数 =
任何(
isinstance(p, 假 Tensor)
或 p.
是否是元数据
为 p
在 self.
子模块.
参数()
)
如果
具有元参数:
记录器.
调试("%s
发现元参数!, self.log_prefix)
否则:
self.子模块.
到(self.
设备)
定义
准备前向基础设施(
self,
num_microbatches: 整数,
参数:
元组[
任意, ...
],
kwargs: 可选[
字典[
字符串,
任意]] =
无,
) -> 元组[
任意, ...
]
""
创建激活(在正向过程中)的发送/接收基础设施
"源代码"
# TODO(whc)
# 此方法一旦实现懒加载缓冲区分配后应删除
# 目前,它忽略 args/kwargs,因为它不应需要进行形状推断
为
数据块
在
范围(num_microbatches):
self.args_recv_info[数据块] = self.
创建活动接收信息()
每次激活时发送信息
self.发送信息操作 = self.
创建活动发送信息()
返回
元组()
定义
获取子模块的阶段索引(
self,
子模块名称:
字符串,
):
""
给定子模块名称,返回子模块的阶段索引。
"源代码"
如果
子模块名称
非
在 self.
子模块到阶段索引:
提升
断言错误(f
阶段 ID 为{
子模块名称}
未找到)
返回 self.
子模块到阶段索引[
子模块名称]
定义
创建活动接收信息(
self,
):
""
为阶段的输入创建一个 `_RecvInfo` 元组。
"源代码"
定义
创建接收张量(
占位符,
节点参数):
""
为占位符创建接收缓冲区。
"源代码"
示例值 =
占位符.
元数据[
值]
如果
节点参数.
操作符 ==
占位符:
# 这是一个根级占位符,因此是整个模型的输入参数。
# 我们可能处于 0 阶段,因此不需要创建接收缓冲区。
返回
根参数占位符(
示例值)
确定此输入的源阶段
当
节点参数.
目标
是
操作符.
获取项:
如果输入是 getitem,我们需要进一步深入
端节点 =
节点参数.
参数[0]
断言
节点参数.
操作符 ==
调用模块, (
f预期调用模块,却得到了{
节点参数.
操作}"
)
源阶段 = self.
获取子模块的阶段索引(arg_node.
名称)
为此占位符创建接收缓冲区
记录器.
调试(
"%s创建接收缓冲区为输入 '%s' : %s, %s",
自身.log_prefix,
占位符.
名称,
示例值.shape,
示例值.
数据类型,
)
缓冲区 =
从元数据创建张量(
示例值,
自身.
设备)
如果存在反向传播,则设置接收缓冲区的 requires_grad
# 在第一个箭头之前
如果
自身.
有后向:
缓冲区.
需要梯度_(True)
返回
接收信息(
节点参数.
名称,
源阶段,
缓冲区,
)
args_recv_info: 列表[
输入信息] = []
# 从 `self.submod`(一个 GraphModule)中过滤掉占位符节点
占位符 =
过滤( # type: ignore[var-annotated]
lambda node: node.操作符 ==
占位符, # type: ignore[arg-type]
self.子模块.
图.
节点,
忽略参数类型和属性联合
)
占位符是子模块内部的节点。
`self.node.args`是外部图中的依赖节点。
这两个是一对一的关系。
为
占位符,
端节点
在
压缩(
占位符,
自身.node.
参数):
为此占位符创建接收缓冲区
recv_info = 创建接收张量(
占位符,
节点参数)
args_recv_info.追加(
接收信息)
记录器.
调试(
"%s激活接收 / 参数信息:%s", self.log_prefix,
参数接收信息
)
`args` 是一个元组,因此我们将返回一个 Tuple[InputInfo]
返回
元组(args_recv_info)
def 查找目标排名(
自身,
用户: fx.
节点,
) -> 可选[
整数
]
""
找到`用户`节点的目标排名。
如果`用户`不是子模块,则可能返回`None`。
"源代码"
如果
用户.
操作符 ==
调用模块:
用户处于舞台(调用模块)
返回
自身.
获取子模块的阶段索引(
用户.
名称)
否则:
# - 如果用户.op == "output":
无需发送回 0 号排名
- 如果用户的目标是 stage_backward:
- 假设子模块输出已本地存储或
- 应该在激活检查点的情况下重新计算
返回
无
def 创建活动发送信息(
自身):
""
创建激活信息的字典。
该字典的形式为:
{
output_index: [dst_rank_0, dst_rank_1, ...],
...
}
其中`dst_rank`列表涵盖了输出值可能的情况。
可被多个阶段消耗。
"源代码"
# 输出索引:接收方进程列表
发送信息操作:
字典[
整数,
列表] = {}
out_idx = 0
为
用户
在
自身.node.
用户们:
如果
用户.
目标
是
操作符.
获取项:
递归查找实际目的地
gi_dsts = 发送信息操作.setdefault(out_idx,
[]
为 gi_user
在
用户.
用户们:
目标排名 =
自身.
查找目标排名(gi_user)
如果
目标排名
是
非
无:
目标 gi.
追加(
目标排名)
下一个`getitem`将指向下一个输出索引
out_idx += 1
否则:
如果只有一个输出值,则`out_idx`不会增加
dsts = 发送信息操作.setdefault(out_idx,
[]
目标排名 =
自身.
查找目标排名(
用户)
如果
目标排名
是
非
无:
目标排名列表.
追加(
目标排名)
输出节点 =
自身.
_获取输出节点()
输出值:
元组[
PyTorch.
张量] =
元组(
v.元数据[
值]
为 v
在
展平参数(
输出节点.
参数)
)
自身.
配置输出元数据(
输出值)
记录器.
调试("%s
发送信息:%s",
自身.log_prefix,
发送信息操作)
返回
发送信息操作
def _获取输出节点(
自身):
输出节点 = [
节点
为
节点
在
自身.
子模块.
图.
节点
如果 node.
操作符 ==
输出]
# 类型:忽略[联合属性]
断言 len(
输出节点) == 1
输出节点 =
输出节点[0]
返回
输出节点
def 创建梯度接收信息(
自身,
发送信息操作:
字典,
) -> 元组[
接收信息, ...
]
""
创建一个用于梯度的 `_RecvInfo` 元组。
"源代码"
# Dict[输出索引, _RecvInfo]
grad_recv_info: 字典[
整数,
接收信息] = {}
输出节点 =
自身.
_获取输出节点()
# 输出节点可能接受多个参数,意味着子模块有多个输出值。
输出值 =
展平参数(
输出节点.
参数)
为 out_idx,
目标列表
在
发送信息操作.
项目():
如果
非
目标列表:
没有实际接收器用于激活,因此没有返回梯度
继续
输出 =
输出值[out_idx]
示例值 =
输出.
元数据[
值]
记录器.
调试(
f"{自身.log_prefix}
创建输出接收缓冲区{
输出.
名称} "
# 无需注意:G004
f":"{
示例值.shape}, {
示例值.
数据类型}"
)
# TODO: 否则需要梯度累积
断言 len(
目标列表) == 1,
"跳过连接的反向尚未支持"
梯度源 =
目标列表[0]
grad_recv_info[out_idx] = 接收信息(
f"{源}",
# 无需注意:G004
源,
从元数据创建张量(
示例值,
自身.
设备),
)
将其转换为元组,以便在 get_ops 和检索张量时方便使用
grad_recv_info_tuple = 元组(grad_recv_info.
值())
记录器.
调试("%s Grad recv info: %s",
自身.log_prefix, grad_recv_info_tuple)
返回 grad_recv_info_tuple
基于追踪的管道信息创建管道阶段的辅助函数
[文档]def build_stage(
阶段模块: torch.nn.Module,
阶段索引: int,
pipe_info: 管道信息,
device: 火炬设备,
group: 可选的 dist.ProcessGroup = None,
) -> _PipelineStage:
```python
# 假设输入文本为:
input_text = '"""'
# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 这里应该调用真实的翻译 API 进行翻译
# 由于示例中不使用真实的 API,以下为模拟翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
创建一个由该阶段包装的阶段模块的管道阶段
管道信息。
参数:
stage_module (torch.nn.Module): 需要被此阶段包装的模块
stage_index (int): 此阶段在流水线中的索引
pipe_info (PipeInfo): 流水线信息,可以通过 `pipe.info()` 获取
device (torch.device): 此阶段使用的设备
group(可选[dist.ProcessGroup]):此阶段所使用的进程组
返回值:
_PipelineStage:一个可以与`PipelineSchedules`一起运行的管道阶段
"""
return _PipelineStage(
stage_module,
stage_index,
pipe_info,
设备,
组,
)
[文档]
类
管道阶段(_PipelineStageBase):
""
代表管道并行设置中管道阶段的类。
PipelineStage 假设模型进行顺序划分,即模型被分割成块,其中一块的输出馈入下一块的输入,没有跳过连接。
PipelineStage 会自动通过传播 stage0 的输出到后续阶段,执行运行时形状/数据类型推断。
PipelineStage 通过传播 stage0 的输出到后续阶段,自动执行运行时形状/数据类型推断。
阶段 1 以及类似,按线性顺序。要绕过形状推断,请将`input_args`和`output_args`传递给每个
PipelineStage 实例。
参数:
子模块(nn.Module):此阶段包装的 PyTorch 模块。
stage_index(int):此阶段的 ID。
num_stages (int): 阶段总数。
device (torch.device): 此阶段所在设备。
input_args (Union[torch.Tensor, Tuple[torch.tensor]], optional): 子模块的输入参数(可选)。
output_args (Union[torch.Tensor, Tuple[torch.tensor]], optional): 子模块的输出参数。
group (dist.ProcessGroup, optional): 分布式训练的进程组。如果为 None,则使用默认组。
dw_builder (Optional[Callable[[], Callable[..., None]]): 如果提供,dw_builder 将构建一个新的 dw_runner 函数
用于 F、I、W (Fwd、Input、Weight) 零气泡调度计划的 W 操作(输入权重)。
"源代码"
def 初始化(
自身,
子模块:
神经网络.
模块,
阶段索引:
整数,
阶段数:
整数,
设备:
PyTorch.
设备,
输入参数:
可选[
联盟[
PyTorch.
张量,
元组[
PyTorch.
张量, ...]]] =
无,
输出参数:
可选[
联盟[
PyTorch.
张量,
元组[
PyTorch.
张量, ...]]] =
无,
群组:
可选[
距离.
流程组] =
无,
dw_builder: 可选[
可调用
[]
可调用[...,
无]]] =
无,
):
超级().
初始化(
子模块,
阶段索引,
阶段数,
设备,
群组, dw_builder)
自身.
输入:
可选[
列表[
PyTorch.
张量]] =
无
自身.
输入元数据:
可选[
元组[
PyTorch.
张量, ...]] =
无
# 注意:输入和子模块理想情况下应在元设备上。我们决定(目前)不强制执行此要求。
# 可能会破坏现有用户。
如果
输入参数
是
无:
断言 output_args
是
无, (
"如果指定输出参数,则必须也指定输入参数。"
"否则,将在运行时执行形状推断"
)
否则:
自身.
输入元数据 = (
(输入参数,)
如果 isinstance(
输入参数,
PyTorch.
张量)
否则
输入参数
)
如果 output_args
是
无:
记录器.
警告(
"弃用警告:传递 input_args 和执行初始化时形状推断已被弃用。"
"PipelineStage 现在支持使用调度步骤()提供的实际输入进行运行时形状推断。"
"或者删除 `input_args` 参数以 `PipelineStage` 选择运行时形状推断,"
"或者额外传递 `output_args` 到 `PipelineStage` 以完全覆盖形状推断。"
)
try:
与
PyTorch.
不梯度():
output_args = 子模块(*
自身.
输入元数据)
output_args = 仅树映射(
PyTorch.
张量, lambda x: x.
到(
元数据), output_args
)
除了
异常
作为 e:
提升
运行时错误(
执行管道形状推理失败 - 您的输入是否与模块位于同一设备上?
) 来自 e
断言 output_args
是
非
无, (
如果传递 input_args,也请传递 output_args 以覆盖形状推断
)
自身.
配置输出元数据(
(输出参数,)
如果 isinstance(
输出参数,
PyTorch.
张量)
否则 output_args
)
这些是在反向发送/接收中使用的缓冲区,它们稍后分配
自身.
输出梯度:
列表[
PyTorch.
张量] = []
dbg_str = (
f管道阶段初始化完成,{
自身.
阶段索引=}, {
自身.is_first=}
,"
# 无需注意:G004
f"{自身.
是最后一个=}, {
自身.
阶段数=}
,"
)
如果
自身.
输入元数据
是
非
无:
dbg_str += (
f"inputs: "{[
输入.
形状
为
输入
在
自身.
输入元数据]}
,"
f"output: "{[
输出.
形状
为
输出
在
自身.
获取输出元数据
空括号]}"
)
否则:
dbg_str += "运行时进行形状推断"
记录器.
调试(dbg_str)
def 形状推理(
自身,
参数:
元组[
任意, ...
]
kwargs: 可选[
字典[
字符串,
任意]] =
无,
):
如果 kwargs
是
无:
kwargs = {}
断言 args
是
非
无,
"参数可能是一个空元组,但不能是 None"
# 如果我们是第一阶段,或者前一个阶段在同一 rank 上,则跳过 recv 通信
# 并且可以将输出形状作为 args 传入,而不是使用 send/recv。
如果 (
自身.
是第一个
如果非第一阶段,则检查前一阶段是否在同一等级
或
自身.
阶段索引到组排名[
自身.
阶段索引 - 1] ==
自身.
组排名
):
记录器.
调试(
形状推断:阶段%s
跳过接收,因为通过`args`传递了形状信息,
自身.
阶段索引,
)
args = 仅树映射(
PyTorch.
张量, lambda x: x.
到(
元数据),
参数)
否则:
断言 len(
参数) == 0, (
"无法为非第一阶段提供形状推理输入参数"
)
物体 = [
无]
记录器.
调试(
形状推断:阶段%s
接收来自阶段%s",
自身.
阶段索引,
自身.
阶段索引 - 1,
)
距离.
接收对象列表(
对象,
源=
距离.
获取全球排名(
自身.
组
或
距离.
分布式_c10d.
获取默认组(),
自身.
阶段索引到组排名[
自身.
阶段索引 - 1
]
),
群组=
自身.
群组,
设备=
自身.
设备,
)
recv_args = 对象[0]
断言 isinstance(recv_args,
元组),
类型(recv_args)
args = recv_args
# 缓存输入形状,用于 recv 缓冲区分配期间
自身.
输入元数据 = args
args = 仅树映射(
PyTorch.
张量, lambda x:
PyTorch.
与...相同形状的零(x,
设备=
自身.
设备), args
)
设置所需的属性以进行正向操作
与
PyTorch.
不梯度():
输出 =
自身.
子模块(*
参数, **kwargs)
如果是单个张量,则转换使其始终为列表
如果 isinstance(
输出,
PyTorch.
张量):
输出 = [
输出]
传达元输出而不是实际输出有两个原因
1 - 它更快(尤其是由于对象收集序列化张量数据!)
# 2 - 避免在接收端反序列化时为 src rank 激活 CUDA 上下文!
输出元数据 =
元组(
仅树映射(
PyTorch.
张量, lambda x: x.
到(
元数据),
输出)
)
记录器.
调试(
形状推断:阶段%s
输入%s
,输出%s",
自身.
阶段索引,
自身.
输入元数据,
输出元数据,
)
自身.
配置输出元数据(
输出元数据)
# 将输出传递到下一阶段:
# 两种情况
# 1. 通常:使用 send/recv 通信传递输出
# 2. 特殊情况:对于 V 调度,2 个相邻的阶段(例如 8 阶段 4 排 V 中的阶段 3、4)
# 通过返回值和函数参数传递形状信息,而不是发送/接收。
如果 (
自身.
是最后一个
# 如果不是最后一个阶段,则检查下一个阶段是否在同一排
或
自身.
阶段索引到组排名[
自身.
阶段索引 + 1] ==
自身.
组排名
):
# 上述情况(2):通过返回值传递形状信息,调用者将其作为参数传递给下一个阶段的
# _shape_inference 调用
记录器.
调试(
形状推断:阶段%s
跳过发送到下一阶段,
自身.
阶段索引,
)
否则:
# 情况(1):通过发送操作发送形状,并确保不返回给调用者
记录器.
调试(
形状推断:阶段%s
发送到阶段%s",
自身.
阶段索引,
自身.
阶段索引 + 1,
)
距离.
发送对象列表(
[输出元数据
]
目标=
距离.
获取全球排名(
自身.
组
或
距离.
分布式_c10d.
获取默认组(),
自身.
阶段索引到组排名[
自身.
阶段索引 + 1
]
),
群组=
自身.
群组,
设备=
自身.
设备,
)
输出元数据 =
元组()
返回
输出元数据
def 准备前向基础设施(
自身,
num_microbatches: int,
参数:
元组[
任意, ...
]
kwargs: 可选[
字典[
字符串,
任意]] =
无,
) -> 元组[
任意, ...
]
# TODO 将 self.device 从 step API 的输入张量移动到一个参数中?
断言
微批次数
是
非
无,
"TODO 修复 num_microbatches"
输出:
元组[
任意, ...] =
元组()
如果
自身.
输入元数据
是
无:
输出 =
自身.
形状推理(
参数, kwargs)
断言
自身.
输入元数据
是
非
无
# 接收前向信息
# TODO: 懒惰地创建 args_recv_info?(PipelineStage 同样需要)
为 chunk_id
在
范围(num_microbatches):
如果
非
自身.is_first:
# 我们假设总是从阶段 - 1 接收
接收信息 =
元组(
[
接收信息(
frecv_for_{
自身.
阶段索引}_from_{
自身.
阶段索引 - 1}",
自身.
阶段索引 - 1,
从元数据创建张量(
输入,
自身.
设备),
)
为
输入
在
自身.
输入元数据
]
)
如果存在反向传播,则设置接收缓冲区的 requires_grad
如果
自身.
有后向:
为 r
在
接收信息:
r.缓冲区.
需要梯度_(True)
自身.args_recv_info[chunk_id] =
接收信息
否则:
自身.args_recv_info[chunk_id] =
元组(
[根参数占位符(i)
为 i
在
自身.
输入元数据]
)
每次激活时发送信息
# 只需要发送的排名
自身.
发送信息操作:
字典[int,
列表] = {}
为
索引
在
范围(len(
自身.
获取输出元数据())):
# 假设我们总是发送到下一阶段 + 1
如果
非
自身.
是最后一个:
自身.
发送信息操作[
索引] = [
自身.
阶段索引 + 1]
否则:
自身.
发送信息操作[
索引] = []
返回
输出
def 创建梯度接收信息(
自身,
发送信息操作:
字典,
) -> 元组[
接收信息, ...
]
grad_recv_info: 元组[
接收信息, ...] = ()
如果
非
自身.
是最后一个:
不支持从多个来源接收梯度
因此我们只取第一个目的地
grad_recv_info = 元组(
[
接收信息(
frecv_grad_for_{
自身.
阶段索引}_from_{
目标列表[0]}",
目标列表[0
]
从元数据创建张量(
自身.
获取输出元数据()[
索引
]
自身.
设备
),
)
为
索引,
目标列表
在
发送信息操作.
项目()
]
)
返回 grad_recv_info