快捷键

torch.distributed.pipelining.microbatch 的源代码

# mypy: 允许未类型化定义
版权所有(C)Meta Platforms,Inc. 及其关联公司
导入 记录日志
来自 打字 导入 任何, 可选

导入 火炬
来自 torch.fx 节点 导入 地图聚合
来自 torch.utils._pytree 导入 树形展平, tree_unflatten


全部 = [
    TensorChunkSpec,
    split_args_kwargs_into_chunks,
    合并块,
]

记录器 = 记录.获取日志记录器(__name__)

""
_debug_mask_minibatches 指定发送掩码版本的 mini-batch
通过而不是微批切片--这可以用于更稳定的
数值测试(参见[关于正确性测试的说明])
""
_debug_mask_minibatches = 


 自定义 Reducer:
    ""
可用于指定自定义操作的定制化 reducer 类
将多个微批次的损失合并成一个值。

示例:
    >>> # xdoctest: +SKIP
    >>> sum_reducer = _CustomReducer(
    >>>     torch.tensor(0.0),
    >>>     lambda a, b: a + b
    >>> )
"文档"

    定义 初始化(, 初始化值, reduce 函数):
        .初始化值 = 初始化值
        .reduce 函数 = reduce 函数


 _损失 Reducer(自定义 Reducer):
    通过


sum_reducer = _损失 Reducer(火炬.张量(0.0), lambda a, b: a + b)

# 默认分块维度为 0。这用于用户未指定的情况
未指定分块维度
DEFAULT_CHUNK_DIM = 0


[文档] 张量块规范: "" 用于指定输入分块的类 "文档" 定义 初始化(, 分割维度): .分割维度 = 分割维度 分割维度: 整型 定义 __repr__(): 返回 ( f"{..__模块__}.{..__name__}({.分割维度})" ) 定义 __str__(): 返回 fTensorChunkSpec({.分割维度})" @staticmethod 定义 从元组中( 块维度: 元组[整数, ...], ): "" 一个从块元组创建`TensorChunkSpec`元组的辅助工具 维度(整数)。 示例: >>> # xdoctest: +SKIP 模型有三个位置参数 >>> # 我们沿着维度 0, 0 和 1 分别进行分块 >>> args_chunk_spec = TensorChunkSpec.from_tuple((0, 0, 1)) "文档" args_chunk_spec = map_aggregate( 块维度, lambda 暗淡: 张量块规范(暗淡), # type: ignore[arg-type,return-value] ) 返回 args_chunk_spec @staticmethod 定义 from_dict( 块维度: 字典[字符串, 整数], ): "" 一个从`TensorChunkSpec`创建字典的辅助工具 字典块尺寸(整数)。 示例: >>> # xdoctest: +SKIP >>> # 块维度 0 用于 "id" 参数,1 用于 "mask" 参数 >>> kwargs_chunk_spec = TensorChunkSpec.from_dict({"id": 0, "mask": 1}) "文档" kwargs_chunk_spec = map_aggregate( 块维度, lambda 暗淡: 张量块规范(暗淡), # type: ignore[arg-type,return-value] ) 返回 kwargs_chunk_spec
# 用于指定输入复制的类 _Replicate: 通过 定义 _shard_dict_of_args( 参数字典, 参数块规范, num_chunks, ): "" 给定一个参数字典和一个分块规范字典,根据分块规范对参数进行分片。 根据分块规范对参数进行分片。 参数: args_dict: 参数字典 args_chunk_spec: 分块规范字典 num_chunks: 将参数分块的数量 返回: args_split: 分片后的参数列表 "文档" # 阶段 1+2:扁平化和分片/复制 # args_sharded_replicated : [参数数量,扁平值数量,分片数量] args_sharded_replicated = {} arg_specs = [] 实际块数 = num_chunks: 分块数量 首个张量 = 真实 断言 len(参数字典) == len(参数块规范), ( fargs_dict.keys() ={列表(参数字典.())} args_chunk_spec.keys() = {列表(参数块规范.())}" ) arg_key, 参数 参数字典.项目(): flat, 规范 = 树形展平(arg) 参数规范.追加(规格) chunk_spec = 参数块规范[arg_key] 断言 chunk_spec # 应由调用者设置 chunk_spec_flat, _ = 树形展平(块规范) 如果 len(平坦) != len(chunk_spec_flat): raise ValueError( f"参数值"{arg}"没有相同的“数量” f"与块规范一样多的“值”"{块规范}" ) sharded_arg_flat = [] v, chunk_v zip(平面, chunk_spec_flat): 如果 chunk_v _Replicate 或者 isinstance(v, 火炬.张量): 分片参数平面.追加([v] * 实际块数) elif isinstance(块_v, 张量块规范): # TODO: 检查 v 的类型。如果是张量,则使用 chunk(或调试掩码)。 # If it's a collection type, split it as you would expect. Otherwise, # 抛出错误 断言 isinstance(v, 火炬.张量), f"{v}不是张量 v_split_dim_size = v.尺寸(块_v.分割维度) 如果 v_split_dim_size < 实际块数: 如果 first_tensor: 我们只能在遇到这个时调整块的数量 在遇到第一个张量时出现的问题 日志记录器.警告( f"在分块维度上,张量的大小是"{v_split_dim_size}," # 无需注意:G004 f"减少块的数量从"{num_chunks}{v_split_dim_size} ) 实际块数 = v_split_dim_size else: raise 运行时错误( f"Arg"{arg_key}在分块维度的大小为{v_split_dim_size}," f"比块的数量少"{num_chunks}. "PiPPy 无法减少块的数量,因为" "其他参数的块维度大小更大。" "请调整您的 num_chunks 设置。" ) chunk_tensors = 火炬.张量分割( v, 实际块数, 块_v.分割维度 ) 如果 _调试掩码小批量: 扩展块 = [] 分割维度索引 = 0 块张量 块张量: 新值 = 火炬.与...相同形状的零(v) 上索引 = 分割维度索引 + 块张量.尺寸(块_v.分割维度) 切片索引 = [切片(, , )] * 新值.维数 切片索引[块_v.分割维度] = 切片( 分割维度索引, 上索引 ) 新值[切片索引] = 块张量 扩展块.追加(新值) 分割维度索引 += 块张量.尺寸(块_v.分割维度) 分片参数平面.追加(扩展块) else: 分片参数平面.追加(块张量) # type: ignore[arg-type] 首个张量 = else: raise 类型错误(f"无法识别的块规范:"{块_v}") 分片复制参数[arg_key] = sharded_arg_flat # chunks_flat : [num chunks, num args, num flat values] chunks_flat = [] chunk_idx 范围(实际块数): 块参数 = {} , 参数 分片复制参数.项目(): 单块参数 = [平面[chunk_idx] 平面 arg] chunk_args[] = 单块参数 chunks_flat.追加(chunk_args) # args_split : [分块数, 参数数] args_split = [] 数据块 chunks_flat: 每块参数 = {} 断言 len(参数规范) == len(数据块) (, arg), 参数规范 zip(数据块.项目(), 参数规范): 每块参数[] = 树形展开(arg, 参数规范) 参数分割.追加(每块参数) 返回 args_split
[文档]定义 将参数和关键字参数分割成块( 参数: 元组[任何, ...], kwargs: 可选[字典[字符串, 任何]], 数据块们: 整数, 参数块规范: 可选[元组[张量块规范, ...]] = , kwargs_chunk_spec: 可选[字典[字符串, 张量块规范]] = , ) -> 元组[列表[元组], 列表[字典]] "" 给定一个参数和关键字参数的序列,根据各自的分块规范将它们分成多个块。 根据各自的分块规范,将参数序列分成多个块。 参数: args:参数元组 kwargs:关键字参数字典 chunks:将 args 和 kwargs 分割成块的数量 args_chunk_spec:args 的分割规范,形状与 args 相同 kwargs_chunk_spec: kwargs 的 chunking 规范,与 kwargs 形状相同 返回: args_split: 分片后的参数列表 kwargs_split: 分片后的 kwargs 列表 "文档" # 给定`args`和`kwargs`,我们希望生成一组`chunks` args 和 kwargs,使得 # 构成张量的值已根据 `args_chunk_spec` 进行了分片/复制 # 和 `kwargs_chunk_spec` 规范。步骤如下: # # 1. 使用 pytree.tree_flatten 将每个参数及其规范扁平化为一个一维数组。 使用一个运行示例:假设我们的输入看起来像 # # 参数 = ([A, [B, C]], D) 参数规范 = ([None, [None, TensorChunkSpec]], None) # (kwargs 未显示,但过程类似) # # 然后,这一步我们将得到 # # 参数 = ([A, B, C], D) 参数规范 = ([None, None, TensorChunkSpec], None) # # 2. 根据规范中的策略复制或复制参数。假设 chunks = 2 # # args = ([[A, A], [B, B], [C_1, C_2]], [D, D]) # # 3. 旋转嵌套顺序,使 chunks 成为外部维度 # # args_chunks = [ ([A, B, C_1], D), ([A, B, C_2], D), # ] # 4. 按照规范展开每个块 # # args_chunks = [ # ([A, [B, C_1]], D), # ([A, [B, C_2]], D), # ] _debug_mask_minibatches 的调试 处理 kwargs 为 None 的情况 如果 kwargs : kwargs = {} 如果用户没有提供 args_chunk_spec 或 kwargs_chunk_spec,我们将扩展它们的格式并使用默认的沿维度 0 的分块 如果用户没有提供 args_chunk_spec 或 kwargs_chunk_spec,我们将扩展它们的格式并使用默认的沿维度 0 的分块 如果 args_chunk_spec : args_chunk_spec = (张量块规范(默认块维度),) * len(参数) 如果 kwargs_chunk_spec : kwargs_chunk_spec = 字典.fromkeys(kwargs, 张量块规范(默认块维度)) args_split_dict = _shard_dict_of_args( 字典(列举(参数)), 字典(列举(参数块规范)), 数据块们, ) 实际块数 = len(参数分割字典) 关键字参数分割 = _shard_dict_of_args( kwargs, kwargs_chunk_spec, 实际块数, ) 如果 len(关键字参数分割) < 实际块数: 当 kwargs 被分成更少的块时 例如,当`args`没有 tensor,只有值时 实际块数 = len(关键字参数分割) # 重新分片参数 args_split_dict = _shard_dict_of_args( 字典(列举(参数)), 字典(列举(参数块规范)), 实际块数, ) 如果 len(参数分割字典) != len(关键字参数分割): raise 运行时错误( "参数和关键字参数被分割成不同的块数:" f"{len(参数分割字典)}, {len(关键字参数分割)}" ) args_split = [ 元组(chunk_args[i] i 范围(len(chunk_args))) 块参数 args_split_dict ] 返回 参数分割, 关键字参数分割
[文档]定义 合并块( 数据块们: 列表[任何], 块规范, ): "" 给定一系列块,根据它们合并成一个单一值 该块规范。 参数: 块:块列表 块规范:块分块规范 返回: 值:合并值 "文档" # 这实际上是 `split_args_kwargs_into_chunks` 的逆操作,所以 步骤与该函数中的步骤类似,但顺序相反。给定输入值: 输入值: # # chunks = [ # ([A, [B, C_1]], D), # ([A, [B, C_2]], D), # ] # args_spec = ([None, [None, TensorChunkSpec]], None) # # 1. 根据 chunk_spec 展开块 # # chunks_flat = [ ([A, B, C_1], D), ([A, B, C_2], D), # ] # # 2. 旋转嵌套顺序,使块成为内部维度 # # 内部值 = ([A, B, [C_1, C_2]], D) # # 3. 连接分片参数 # # 组合值 = ([A, B, C], D) # # 4. 根据规范展开合并的参数 # # 值 = ([A, [B, C]], D) # 预处理:展开块规范 如果 chunk_spec : spec_flattened, 展平规范 = 树形展平(块规范) else: # 如果未提供 chunk_spec,则将所有输出字段沿默认维度(0)合并 通过展开块 0,我们获得输出结构并生成块规范 chunk0_flat, 展平规范 = 树形展平(数据块们[0]\) 规范展开 = [张量块规范(默认块维度)] * len(chunk0_flat) # 阶段 1:展平块 # chunks_flattened : [块数量, 参数数量] chunks_flattened = [] 数据块 数据块们: 分块展开, _ = 树形展平(数据块) 如果 len(分块展开) != len(spec_flattened): raise ValueError(f"块{数据块}与块规范不匹配{块规范}") 分块展开.追加(分块展开) # 阶段 2 和 3:旋转嵌套顺序,使块成为内部维度 # 连接分片操作数 # args_flattened : [参数数量] args_flattened = [] 索引参数, 参数 列举(spec_flattened): 如果 isinstance(arg, 张量块规范): 部分值 = [ 分块展开[chunk_idx] [索引参数] chunk_idx 范围(len(分块展开)) ] 如果 _调试掩码小批量: 再次运行 `tensor_split` 以推断单个块的大小 整体形状 = 部分值[0].形状 val 部分值[1] 断言 val.形状 == 整体形状 元数据块 = 火炬.张量分割( 火炬.空的(*整体形状, 设备=元数据), 部分=len(部分值), 暗淡=arg.分割维度, ) 要合并的值 = [] 块起始索引 = 0 断言 len(部分值) == len(元数据块) 部分值, 元数据片段 zip(部分值, 元数据块): 块结束索引 = 块起始索引 + 元数据块.尺寸(arg.分割维度) 切片索引 = [切片(, , )] * 部分值.维数 切片索引[arg.分割维度] = 切片(块起始索引, 块结束索引) 切片 = 部分值[切片索引] 要分类的值.追加(切片) 块起始索引 = 块结束索引 else: 要合并的值 = 部分值 展开的参数.追加(火炬.(要分类的值, 暗淡=arg.分割维度)) elif isinstance(arg, 自定义 Reducer): reduced_val = arg.初始化值 chunk_idx 范围(len(分块展开)): reduced_val = arg.reduce 函数( 减少后的值, 分块展开[chunk_idx] [索引参数] ) 展开的参数.追加(减少后的值) else: value = 分块展开[0] [索引参数] chunk_idx 范围(1, len(分块展开)): 断言 分块展开[chunk_idx] [索引参数] == value 展开的参数.追加() # 阶段 4:展开合并的参数 返回 树形展开(展开的参数, 展平规范)

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源