# mypy: 允许未类型化定义
版权所有(C)Meta Platforms,Inc. 及其关联公司
导入
记录日志
来自
打字
导入
任何,
可选
导入
火炬
来自
torch.fx 节点
导入
地图聚合
来自 torch.utils._pytree
导入
树形展平, tree_unflatten
全部 = [
TensorChunkSpec,
split_args_kwargs_into_chunks,
合并块,
]
记录器 =
记录.
获取日志记录器(__name__)
""
_debug_mask_minibatches 指定发送掩码版本的 mini-batch
通过而不是微批切片--这可以用于更稳定的
数值测试(参见[关于正确性测试的说明])
""
_debug_mask_minibatches = 假
类
自定义 Reducer:
""
可用于指定自定义操作的定制化 reducer 类
将多个微批次的损失合并成一个值。
示例:
>>> # xdoctest: +SKIP
>>> sum_reducer = _CustomReducer(
>>> torch.tensor(0.0),
>>> lambda a, b: a + b
>>> )
"文档"
定义
初始化(
我,
初始化值,
reduce 函数):
我.
初始化值 =
初始化值
我.
reduce 函数 =
reduce 函数
类
_损失 Reducer(
自定义 Reducer):
通过
sum_reducer = _损失 Reducer(
火炬.
张量(0.0), lambda a, b: a + b)
# 默认分块维度为 0。这用于用户未指定的情况
未指定分块维度
DEFAULT_CHUNK_DIM = 0
[文档]
类
张量块规范:
""
用于指定输入分块的类
"文档"
定义
初始化(
我,
分割维度):
我.
分割维度 =
分割维度
分割维度:
整型
定义 __repr__(
我):
返回 (
f"{我.
类.
__模块__}.{
我.
类.__name__}({
我.
分割维度})"
)
定义 __str__(
我):
返回 f
TensorChunkSpec({
我.
分割维度})"
@staticmethod
定义
从元组中(
块维度:
元组[
整数, ...
],
):
""
一个从块元组创建`TensorChunkSpec`元组的辅助工具
维度(整数)。
示例:
>>> # xdoctest: +SKIP
模型有三个位置参数
>>> # 我们沿着维度 0, 0 和 1 分别进行分块
>>> args_chunk_spec = TensorChunkSpec.from_tuple((0, 0, 1))
"文档"
args_chunk_spec = map_aggregate(
块维度,
lambda 暗淡:
张量块规范(
暗淡), # type: ignore[arg-type,return-value]
)
返回 args_chunk_spec
@staticmethod
定义 from_dict(
块维度:
字典[
字符串,
整数
],
):
""
一个从`TensorChunkSpec`创建字典的辅助工具
字典块尺寸(整数)。
示例:
>>> # xdoctest: +SKIP
>>> # 块维度 0 用于 "id" 参数,1 用于 "mask" 参数
>>> kwargs_chunk_spec = TensorChunkSpec.from_dict({"id": 0, "mask": 1})
"文档"
kwargs_chunk_spec = map_aggregate(
块维度,
lambda 暗淡:
张量块规范(
暗淡), # type: ignore[arg-type,return-value]
)
返回 kwargs_chunk_spec
# 用于指定输入复制的类
类 _Replicate:
通过
定义 _shard_dict_of_args(
参数字典,
参数块规范,
num_chunks,
):
""
给定一个参数字典和一个分块规范字典,根据分块规范对参数进行分片。
根据分块规范对参数进行分片。
参数:
args_dict: 参数字典
args_chunk_spec: 分块规范字典
num_chunks: 将参数分块的数量
返回:
args_split: 分片后的参数列表
"文档"
# 阶段 1+2:扁平化和分片/复制
# args_sharded_replicated : [参数数量,扁平值数量,分片数量]
args_sharded_replicated = {}
arg_specs = []
实际块数 =
num_chunks: 分块数量
首个张量 =
真实
断言 len(
参数字典) == len(
参数块规范), (
fargs_dict.keys() ={
列表(
参数字典.
键())} args_chunk_spec.keys() = {
列表(
参数块规范.
键())}"
)
为 arg_key,
参数
在
参数字典.
项目():
flat, 规范 =
树形展平(arg)
参数规范.
追加(
规格)
chunk_spec = 参数块规范[arg_key]
断言 chunk_spec
是
非
无
# 应由调用者设置
chunk_spec_flat, _ = 树形展平(
块规范)
如果 len(
平坦) != len(chunk_spec_flat):
raise ValueError(
f"参数值"{arg}
"没有相同的“数量”
f"与块规范一样多的“值”"{
块规范}"
)
sharded_arg_flat = []
为 v, chunk_v
在 zip(
平面, chunk_spec_flat):
如果 chunk_v
是 _Replicate
或者
非 isinstance(v,
火炬.
张量):
分片参数平面.
追加([v] *
实际块数)
elif isinstance(块_v,
张量块规范):
# TODO: 检查 v 的类型。如果是张量,则使用 chunk(或调试掩码)。
# If it's a collection type, split it as you would expect. Otherwise,
# 抛出错误
断言 isinstance(v,
火炬.
张量), f"{v}
不是张量
v_split_dim_size = v.尺寸(
块_v.
分割维度)
如果 v_split_dim_size <
实际块数:
如果 first_tensor:
我们只能在遇到这个时调整块的数量
在遇到第一个张量时出现的问题
日志记录器.
警告(
f"在分块维度上,张量的大小是"{v_split_dim_size}
,"
# 无需注意:G004
f"减少块的数量从"{num_chunks}
到{v_split_dim_size}
。
)
实际块数 = v_split_dim_size
else:
raise 运行时错误(
f"Arg"{arg_key}
在分块维度的大小为{v_split_dim_size}
,"
f"比块的数量少"{num_chunks}
.
"PiPPy 无法减少块的数量,因为"
"其他参数的块维度大小更大。"
"请调整您的 num_chunks 设置。"
)
chunk_tensors = 火炬.
张量分割(
v, 实际块数,
块_v.
分割维度
)
如果
_调试掩码小批量:
扩展块 = []
分割维度索引 = 0
为
块张量
在
块张量:
新值 =
火炬.
与...相同形状的零(v)
上索引 =
分割维度索引 +
块张量.
尺寸(
块_v.
分割维度)
切片索引 = [
切片(
无,
无,
无)] *
新值.
维数
切片索引[
块_v.
分割维度] =
切片(
分割维度索引,
上索引
)
新值[
切片索引] =
块张量
扩展块.
追加(
新值)
分割维度索引 +=
块张量.
尺寸(
块_v.
分割维度)
分片参数平面.
追加(
扩展块)
else:
分片参数平面.
追加(
块张量) # type: ignore[arg-type]
首个张量 =
假
else:
raise 类型错误(f
"无法识别的块规范:"{
块_v}")
分片复制参数[arg_key] = sharded_arg_flat
# chunks_flat : [num chunks, num args, num flat values]
chunks_flat = []
为 chunk_idx
在
范围(
实际块数):
块参数 = {}
为
键,
参数
在
分片复制参数.
项目():
单块参数 = [
平面[chunk_idx]
为
平面
在 arg]
chunk_args[键] =
单块参数
chunks_flat.追加(chunk_args)
# args_split : [分块数, 参数数]
args_split = []
为
数据块
在 chunks_flat:
每块参数 = {}
断言 len(
参数规范) == len(
数据块)
为 (
键, arg),
参数规范
在 zip(
数据块.
项目(),
参数规范):
每块参数[
键] =
树形展开(arg,
参数规范)
参数分割.
追加(
每块参数)
返回 args_split
[文档]
定义
将参数和关键字参数分割成块(
参数:
元组[
任何, ...
],
kwargs: 可选[
字典[
字符串,
任何]],
数据块们:
整数,
参数块规范:
可选[
元组[
张量块规范, ...]] =
无,
kwargs_chunk_spec: 可选[
字典[
字符串,
张量块规范]] =
无,
) -> 元组[
列表[
元组
],
列表[
字典
]]
""
给定一个参数和关键字参数的序列,根据各自的分块规范将它们分成多个块。
根据各自的分块规范,将参数序列分成多个块。
参数:
args:参数元组
kwargs:关键字参数字典
chunks:将 args 和 kwargs 分割成块的数量
args_chunk_spec:args 的分割规范,形状与 args 相同
kwargs_chunk_spec: kwargs 的 chunking 规范,与 kwargs 形状相同
返回:
args_split: 分片后的参数列表
kwargs_split: 分片后的 kwargs 列表
"文档"
# 给定`args`和`kwargs`,我们希望生成一组`chunks` args 和 kwargs,使得
# 构成张量的值已根据 `args_chunk_spec` 进行了分片/复制
# 和 `kwargs_chunk_spec` 规范。步骤如下:
#
# 1. 使用 pytree.tree_flatten 将每个参数及其规范扁平化为一个一维数组。
使用一个运行示例:假设我们的输入看起来像
#
# 参数 = ([A, [B, C]], D) 参数规范 = ([None, [None, TensorChunkSpec]], None)
# (kwargs 未显示,但过程类似)
#
# 然后,这一步我们将得到
#
# 参数 = ([A, B, C], D) 参数规范 = ([None, None, TensorChunkSpec], None)
#
# 2. 根据规范中的策略复制或复制参数。假设 chunks = 2
#
# args = ([[A, A], [B, B], [C_1, C_2]], [D, D])
#
# 3. 旋转嵌套顺序,使 chunks 成为外部维度
#
# args_chunks = [
([A, B, C_1], D),
([A, B, C_2], D),
# ]
#
4. 按照规范展开每个块
#
# args_chunks = [
# ([A, [B, C_1]], D),
# ([A, [B, C_2]], D),
# ]
_debug_mask_minibatches 的调试
处理 kwargs 为 None 的情况
如果 kwargs
是
无:
kwargs = {}
如果用户没有提供 args_chunk_spec 或 kwargs_chunk_spec,我们将扩展它们的格式并使用默认的沿维度 0 的分块
如果用户没有提供 args_chunk_spec 或 kwargs_chunk_spec,我们将扩展它们的格式并使用默认的沿维度 0 的分块
如果 args_chunk_spec
是
无:
args_chunk_spec = (张量块规范(
默认块维度),) * len(
参数)
如果 kwargs_chunk_spec
是
无:
kwargs_chunk_spec = 字典.fromkeys(kwargs,
张量块规范(
默认块维度))
args_split_dict = _shard_dict_of_args(
字典(
列举(
参数)),
字典(
列举(
参数块规范)),
数据块们,
)
实际块数 = len(
参数分割字典)
关键字参数分割 = _shard_dict_of_args(
kwargs,
kwargs_chunk_spec,
实际块数,
)
如果 len(
关键字参数分割) <
实际块数:
当 kwargs 被分成更少的块时
例如,当`args`没有 tensor,只有值时
实际块数 = len(
关键字参数分割)
# 重新分片参数
args_split_dict = _shard_dict_of_args(
字典(
列举(
参数)),
字典(
列举(
参数块规范)),
实际块数,
)
如果 len(
参数分割字典) != len(
关键字参数分割):
raise 运行时错误(
"参数和关键字参数被分割成不同的块数:"
f"{len(参数分割字典)}, {len(
关键字参数分割)}"
)
args_split = [
元组(chunk_args[i]
为 i
在
范围(len(chunk_args)))
为
块参数
在 args_split_dict
]
返回
参数分割,
关键字参数分割
[文档]
定义
合并块(
数据块们:
列表[
任何
],
块规范,
):
""
给定一系列块,根据它们合并成一个单一值
该块规范。
参数:
块:块列表
块规范:块分块规范
返回:
值:合并值
"文档"
# 这实际上是 `split_args_kwargs_into_chunks` 的逆操作,所以
步骤与该函数中的步骤类似,但顺序相反。给定输入值:
输入值:
#
# chunks = [
# ([A, [B, C_1]], D),
# ([A, [B, C_2]], D),
# ]
# args_spec = ([None, [None, TensorChunkSpec]], None)
#
# 1. 根据 chunk_spec 展开块
#
# chunks_flat = [
([A, B, C_1], D),
([A, B, C_2], D),
# ]
#
# 2. 旋转嵌套顺序,使块成为内部维度
#
# 内部值 = ([A, B, [C_1, C_2]], D)
#
# 3. 连接分片参数
#
# 组合值 = ([A, B, C], D)
#
# 4. 根据规范展开合并的参数
#
# 值 = ([A, [B, C]], D)
# 预处理:展开块规范
如果 chunk_spec
是
非
无:
spec_flattened, 展平规范 =
树形展平(
块规范)
else:
# 如果未提供 chunk_spec,则将所有输出字段沿默认维度(0)合并
通过展开块 0,我们获得输出结构并生成块规范
chunk0_flat, 展平规范 =
树形展平(
数据块们[0
]\)
规范展开 = [
张量块规范(
默认块维度)] * len(chunk0_flat)
# 阶段 1:展平块
# chunks_flattened : [块数量, 参数数量]
chunks_flattened = []
为
数据块
在
数据块们:
分块展开, _ =
树形展平(
数据块)
如果 len(
分块展开) != len(spec_flattened):
raise ValueError(f"块{
数据块}
与块规范不匹配{
块规范}")
分块展开.
追加(
分块展开)
# 阶段 2 和 3:旋转嵌套顺序,使块成为内部维度
# 连接分片操作数
# args_flattened : [参数数量]
args_flattened = []
为
索引参数,
参数
在
列举(spec_flattened):
如果 isinstance(arg,
张量块规范):
部分值 = [
分块展开[chunk_idx
]
[索引参数]
为 chunk_idx
在
范围(len(
分块展开))
]
如果
_调试掩码小批量:
再次运行 `tensor_split` 以推断单个块的大小
整体形状 =
部分值[0].
形状
为 val
在
部分值[1
]
断言 val.
形状 ==
整体形状
元数据块 =
火炬.
张量分割(
火炬.
空的(*
整体形状,
设备=
元数据),
部分=len(
部分值),
暗淡=arg.
分割维度,
)
要合并的值 = []
块起始索引 = 0
断言 len(
部分值) == len(
元数据块)
为
部分值,
元数据片段
在 zip(
部分值,
元数据块):
块结束索引 =
块起始索引 +
元数据块.
尺寸(arg.
分割维度)
切片索引 = [
切片(
无,
无,
无)] *
部分值.
维数
切片索引[arg.
分割维度] =
切片(
块起始索引,
块结束索引)
切片 =
部分值[
切片索引]
要分类的值.
追加(
切片)
块起始索引 =
块结束索引
else:
要合并的值 =
部分值
展开的参数.
追加(
火炬.
猫(
要分类的值,
暗淡=arg.
分割维度))
elif isinstance(arg, 自定义 Reducer):
reduced_val = arg.初始化值
为 chunk_idx
在
范围(len(
分块展开)):
reduced_val = arg.reduce 函数(
减少后的值,
分块展开[chunk_idx
]
[索引参数]
)
展开的参数.
追加(
减少后的值)
else:
value = 分块展开[0
]
[索引参数]
为 chunk_idx
在
范围(1, len(
分块展开)):
断言
分块展开[chunk_idx
]
[索引参数] == value
展开的参数.
追加(
值)
# 阶段 4:展开合并的参数
返回
树形展开(
展开的参数,
展平规范)