快捷键

torch.distributed.checkpoint.default_planner 的源代码

# mypy: 允许未类型化定义
版权所有(C)Meta Platforms,Inc. 及其关联公司

导入 复制
导入 dataclasses
导入 输入/输出
导入 记录日志
导入 操作符
from 集合 导入 链式映射
from functools 导入 reduce
from 打字 导入 任何, 角色, 可选, 联合

导入 火炬
from torch.distributed._shard._utils 导入 通过索引缩小张量
from torch.distributed.checkpoint._dedup_save_plans 导入 去重保存计划
from torch.distributed.checkpoint._nested_dict 导入 (
    平铺映射,
    平铺状态字典,
)
from torch.distributed.checkpoint._sharded_tensor_utils 导入 _flatten_sharded_tensors
from torch.distributed.checkpoint._traverse 导入 set_element
from torch.distributed.checkpoint.metadata 导入 (
    字节存储元数据,
    块存储元数据,
    元数据,
    元数据索引,
    STATE_DICT_TYPE,
    STORAGE_TYPES,
    StorageMeta,
    TensorStorageMetadata,
)
from torch.distributed.checkpoint.planner 导入 (
    加载计划,
    加载规划器,
    读取项目,
    保存计划 SavePlan,
    保存规划器,
    编写项目,
    编写项目类型,
)
from torch.distributed.checkpoint.planner_helpers 导入 (
    _compare_save_plans,
    _创建默认元数据计划,
    _创建读取项,
    _创建写入项,
    _初始化状态字典,
    合并本地计划差异,
)
from torch.distributed.checkpoint.utils 导入 查找状态字典对象
来自 torch.distributed.tensor 导入 DTensor

from . 导入 _version


日志记录器: 记录.日志记录器 = 记录.获取日志记录器(__name__)


全部 = [
    "默认保存计划器",
    "默认加载规划器",
    "创建默认本地加载计划",
    "创建默认全局加载计划",
    "创建默认本地保存计划",
    "创建默认全局保存计划",
]


# TODO: 更新 default_planner.py 的文档字符串
[文档] 默认保存规划器(保存规划器): 映射: 平铺映射 定义 初始化( , 展平状态字典: 布尔 = 是的, 展平分片张量: 布尔 = 是的, 去重复制张量: 可选[布尔] = , 去重并保存到最低等级: 布尔 = 错误, 启用计划缓存: 布尔 = 错误, ) -> : .展平状态字典 = 展平状态字典 .展平分片张量 = 展平分片张量 .映射 = {} .去重并保存到最低秩 = 去重保存到最低排名 如果 dedup_replicated_tensors 去重复制张量 : 日志记录器.警告( 默认保存规划器的 `dedup_replicated_tensors` 参数正在被 弃用,并且不再有任何效果。请移除此参数 "您的来电。" ) ._cached_plans_key: 字符串 = ..__name__ ._启用计划缓存 = 启用计划缓存 def 设置计划器( , 状态字典: 状态字典类型, 存储元数据: 可选[StorageMeta] = , 是否为协调器: 布尔 = 假的, ) -> : if .展平状态字典: 状态字典, .映射 = 展平状态字典(状态字典) if .展平分片张量: 状态字典 = __展平分片张量(状态字典) .状态字典 = 状态字典 .是否是协调器 = 是否是协调器 def 创建本地计划() -> 保存计划 SavePlan: 计划 = 创建默认本地保存计划(.状态字典, .是否为协调器) if .flatten_state_dict: plan = dataclasses.替换(plan, planner_data=.映射) .计划 = 计划 if .启用计划缓存: 如果计划相同,我们可以跳过将计划发送给协调器。 if ( ._缓存的计划键 保存规划器.缓存保存计划 比较保存计划( 计划, 保存计划._缓存的保存计划[._缓存的计划键] ) ): 日志记录器.信息( 本地计划无变化。跳过将计划发送给协调器 ) 返回 保存计划 SavePlan([], 可用=错误) 否则: 保存计划._缓存的保存计划[._缓存的计划键] = 规划 返回 .规划 定义 创建全局规划( , 所有计划: 列表[保存计划 SavePlan] ) -> 元组[列表[保存计划 SavePlan], 元数据]: 所有计划 = 去重保存方案(所有方案, .去重保存至最低等级) 全局方案, 元数据 = 创建默认全局保存计划(所有计划) 如果 .展平状态字典: # | 不适用于 Python 3.8 或更早版本。 # merged_mappings = reduce( # lambda x, y: x | y, (p.planner_data for p in global_plan) # ) planner_data_dict = [p.planner_data for p 全局计划] 合并映射 = 字典(链映射(*规划器数据字典)) 元数据 = dataclasses.替换(元数据, 规划器数据=合并映射) 如果 验证全局计划(全局计划, 元数据): raise ValueError(验证全局计划失败) 返回 全球计划, 元数据 def 使用缓存创建全局计划( , 所有计划: 列表[保存计划 SavePlan] ) -> 元组[列表[保存计划 SavePlan], 列表[保存计划 SavePlan], 元数据]: "" 创建具有缓存的全球计划。 返回一个包含 global_plan_delta、global_plan 和 metadata 的元组。 "文档" global_plan_delta: 列表[保存计划 SavePlan] = 输入文本为空,请提供需要翻译的文本 如果 ._cached_plans_key 保存计划._cached_all_plans: # 对 all_plans 进行深拷贝,以避免去重后修改的计划被缓存 保存计划._缓存的全部计划[._缓存的计划键] = 复制.深拷贝( 全部计划 ) 全球计划, 元数据 = 自身.创建全局计划(所有计划) 保存计划._缓存的全球计划[._缓存的计划键] = 全球计划 如果计划未被缓存,全局计划 delta 将与全局计划相同。 返回 全局计划, 全局计划, 元数据 我们为新 delta 计划获取全局计划。 # 等级已经缓存了没有变化的计划。 合并计划 = 合并本地计划( 保存计划._缓存的全部计划[._缓存的计划键], 全部计划 ) # 对合并后的计划进行深拷贝,以避免去重后缓存修改后的计划 保存计划._缓存的全部计划[._缓存的计划键] = 复制.深拷贝( 合并后的计划 ) 全球计划, 元数据 = .创建全球计划(合并计划) 如果 .缓存计划键 .缓存的全局计划: for 缓存计划, 新计划 zip( 保存规划器._cached_global_plan[._cached_plans_key], global_plan ): if _compare_save_plans(缓存计划, 新计划): 全球计划增量.追加(保存计划 SavePlan([], 可用=假的)) 否则: 全局计划增量.追加(新计划) 保存规划器._缓存的全球计划[._cached_plans_key] = global_plan # 如果计划已缓存,则 global_plan 的 delta 将是新 global_plan 和缓存 global_plan 的 delta。 # 的新 global_plan 和缓存 global_plan 的数量。 返回 全局计划增量, 全局计划, 元数据 def 创建全局计划( , 所有计划: 列表[保存计划 SavePlan] ) -> 元组[列表[保存计划 SavePlan], 元数据]: 全局计划增量: 列表[保存计划 SavePlan] = 输入文本为空,请提供需要翻译的文本 if .启用计划缓存: 如果计划已缓存,我们只需发送全局计划增量进行分散 横跨各个 rank。rank 将使用缓存的最终计划。 ( 全局计划增量, 全局计划, 元数据, ) = ._使用缓存创建全局计划(所有计划) 否则: 全局计划, 元数据 = .创建全局计划(所有计划) # 如果未启用缓存,全局增量计划将始终与新的全局计划相同。 全局计划增量 = 全局计划 .全局计划 = 全局计划 .元数据 = 元数据 返回 全局计划增量, .元数据 def 使用缓存完成计划(, 新计划: 保存计划 SavePlan) -> 保存计划 SavePlan: 完成计划: 保存计划 = 新计划 if 新计划.可用: 完成计划 = 保存规划器._缓存的最终保存计划[._缓存的计划键] 否则: 完成计划 = 新计划 保存规划器._缓存的最终保存计划[.缓存计划键] = 新计划 返回 完成计划 def 完成计划(, 新计划: 保存计划 SavePlan) -> 保存计划 SavePlan: 完成计划: 保存计划 = 新计划 if .启用计划缓存: 完成计划 = .使用缓存完成计划(新计划) .规划 = 完成规划 返回 .规划 def 解析数据(, 编写条目: 编写字符串) -> 联盟[torch.张量, 输入/输出.BytesIO]: 对象 = .查找对象(编写条目.索引) 返回 .转换对象(编写条目, 对象)
[文档] def 查找对象(self, 索引: 元数据索引) -> 任意: """从规划器接口扩展,使其易于扩展默认规划器。""" return find_state_dict_object(self.state_dict, index)
[文档] def transform_object(self, write_item: WriteItem, object: Any): """从规划器接口扩展,使其易于扩展默认规划器。""" if write_item.type == WriteItemType.BYTE_IO: bytes = io.BytesIO() torch.save(object, bytes) object = bytes return object
[文档] 默认加载规划器(加载规划器): "" 默认加载规划器,在加载规划器的基础上增加了多个功能。 具体来说,它增加了以下功能: flatten_state_dict: 处理嵌套字典的 state_dict flatten_sharded_tensors: 用于 FSDP 的 2D 并行模式 allow_partial_load: 如果为 False,当 state_dict 中存在但 checkpoint 中不存在的键时,将引发运行时错误。 "文档" original_state_dict: 状态字典类型 映射: 展平映射 def __init__( , 展平状态字典: 布尔 = True, 展平分片张量: 布尔 = True, 允许部分加载: 布尔 = 假的, ) -> None: .展平状态字典 = 展平状态字典 .展平分片张量 = 展平分片张量 .原始状态字典 = {} .映射关系 = {} .允许部分加载 = 允许部分加载 def 设置规划器( , 状态字典: 状态字典类型, 元数据: 可选[元数据] = None, 是协调员: 布尔 = 错误, ) -> : 初始化状态字典(状态字典) .原始状态字典 = 状态字典 如果 .展平分片张量: 状态字典 = _展平分片张量(状态字典) 如果 .展平状态字典: 状态字典, .映射 = 展平状态字典(状态字典) .状态字典 = 状态字典 .元数据 = 元数据 .是否协调器 = 是否为协调器 def 创建本地计划() -> 加载计划: 断言 .元数据 如果 .展平状态字典: # 为了支持在 v2.4 之前保存的检查点,我们必须 区分是否由于旧检查点导致的缺失键。 合同如下: 1. 我们发现缺失键的 3 种情况。 1.1 实际缺失键,但 allow_partial_load 为 False。 # 1.2 实际缺少键,但允许部分加载为 True # 1.3 旧检查点,但允许部分加载为 False # 1.4 旧检查点,但允许部分加载为 True # 2. 如果我们找到缺少的键,我们首先将键转换回 v2.3 的键格式 如果之前缺失的键在 v2.3 的键中,我们假设 这是一个旧检查点。 将 state_dict 传递给 `create_default_local_load_plan()`, # 具有检查 allow_partial_load 是否缺失的逻辑。 因此对于 1.2 和 1.4 情况,我们将 allow_partial_load 检查委托给 `create_default_local_load_plan()`。这里的逻辑是确定 # 检查点属于 2.3(或之前)还是 2.4(或之后)。 当前键 = 设置(.状态字典.()) 加载键 = 设置(.元数据.状态字典元数据.()) 缺少键 = 加载键 - 当前键 if 缺少键: _版本.派生版本 = "2_3" 旧状态字典, 旧映射 = 展平状态字典( .原始状态字典 ) 旧键 = 设置(旧状态字典.()) if 旧键 & 缺少键: .状态字典, .映射 = 旧状态字典, 旧映射 # _derived_version 仅由 flatten_state_dict 使用。 # 将其恢复为 None,以便稍后保存到新版本。 _版本._derived_version = 返回 创建默认本地加载计划( .状态字典, .元数据, .允许部分加载 ) def 创建全局计划(, 全局计划: 列表[加载计划]\) -> 列表[加载计划]: 返回 创建默认全局加载计划(全局计划) def 完成计划(, 新计划: 加载计划) -> 加载计划: 返回 新计划 def 加载数据(, 读取项目: 读取项目, : 输入/输出.BytesIO) -> : if .展平状态字典: 设置元素( .原始状态字典, .映射关系[阅读项.目标索引.完全限定名], torch.加载(, 仅权重=假的), ) 否则: .状态字典[阅读项.目标索引.完全限定名] = torch.加载( , 仅权重= ) def 解析张量(, 读取项: 读取项): 张量 = .查找张量(读取项.目标索引) 返回 .转换张量(读取项, 张量) def 提交张量(, 读取项: 读取项, 张量: torch.张量) -> : 通过
[文档] def 查找张量(self, index: 元数据索引) -> torch.Tensor: 从规划器接口扩展,使其易于扩展默认规划器。 返回 find_state_dict_object(self.state_dict, index)
[文档] def transform_tensor(self, read_item: ReadItem, tensor: torch.Tensor): 从规划器接口扩展,使其易于扩展默认规划器。 通过索引缩小 tensor_by_index(tensor, read_item.dest_offsets, read_item.lengths)
_EmptyStateDictLoadPlanner(默认加载规划器): "" DefaultLoadPlanner 的扩展,从保存的元数据中重建状态字典。 适用于在不首先初始化模型的情况下加载 state_dict,例如 将 DCP 检查点转换为 Torch 保存文件时。 . 注意:在使用此 LoadPlanner 时,`state_dict`必须为空字典。 .. 警告:: 因为整个状态字典都被初始化了,所以建议只使用 在单个 rank 或进程中加载此 LoadPlanner 以避免内存溢出。 "文档" def __init__(, =, *参数, **kwargs): . = 超级().__init__(*参数, **kwargs) def 应包含此键(, : 字符串, 元数据: 元数据) -> 布尔: 如果 . None: 返回 真实 如果 key .: 真实 未展平的键: 列表[字符串] = 输入文本为空,请提供需要翻译的文本 规划器数据 = 元数据.规划数据.获取() 未展平键 规划数据: 如果 未展平键列表: 未展平的键.追加( “点”.连接([未展平的键[-1], 字符串(未展平的键)]]) ) 否则: 未展平的键.追加(未展平的键) 如果 任何(未展平的键 . 未展平的键 未展平的键列表): 返回 真实 返回 def 设置计划器( , 状态字典: 状态字典类型, 元数据: 可选[元数据] = None, 是否为协调器: 布尔 = 假的, ) -> None: 断言 状态字典 断言 元数据 从元数据重建状态字典 k, v 元数据.状态字典元数据.项目(): 如果 ._是否包含键(k, 元数据): continue if isinstance(v, 张量存储元数据): v = torch.空的(v.尺寸, 数据类型=v.属性.数据类型) # 类型:忽略[赋值] if k 元数据.规划数据: 设置元素(状态字典, 元数据.规划数据[k], v) 否则: 状态字典[k] = v 超级().设置规划器(状态字典, 元数据, 是否为协调器) def 创建默认本地加载计划( 状态字典: 字典[字符串, 任何], 元数据: 元数据, 严格的: 布尔 = 真实 ) -> 加载计划: 请求 = 输入文本为空,请提供需要翻译的文本 "" 创建由 DefaultLoadPlanner 使用的``LoadPlan``。 它使用``metadata``中的元数据,为``state_dict``中的每个值生成一个读取项。 默认行为是在 state_dict 和 metadata 之间精确匹配键。 通过对存储进行多次读取请求来处理重新分片,以匹配负载需求。 "文档" for 完全限定名, 对象 状态字典.项目(): 如果 strict=False,则忽略 `state_dict` 中不存在的 `state_dict` 键。 如果 完全限定名 否。 元数据.状态字典元数据: 如果 严格的: raise 运行时错误(f"在检查点状态字典中缺少键:"{完全限定名}.") 否则: continue md = 元数据.状态字典元数据[完全限定名] 如果 ( isinstance(md, 张量存储元数据) 并且 getattr(对象, 大小, ) 并且 md.大小 != 对象.尺寸() ): raise ValueError( f"已保存的大小不匹配"{md.尺寸}和当前:{对象.尺寸()}对于{完全限定名}", ) 由于 DTensor 支持子网格,因此添加额外的检查以确保_create_read_items() 仅当当前排名是相应 DTensor 的 mesh 部分时才被调用。 如果 isinstance(对象, DTensor): 如果 对象.设备网格.获取坐标() : 请求 += _创建读取项(完全限定名, md, 对象) 否则: 请求 += 创建读取项(完全限定名, md, 对象) 返回 加载计划(请求) def 创建默认全局加载计划( 所有计划: 列表[加载计划], ) -> 列表[加载计划]: "" 创建默认加载规划器使用的全局加载计划。 默认加载行为不涉及全局协调,此功能 目前不会改变本地计划。 "文档" 返回 all_plans def create_default_local_save_plan( 状态字典: 字典[字符串, 任何], 是否为协调器: 布尔 ) -> 保存计划 SavePlan: "" 创建由 DefaultSavePlanner 使用的``SavePlan``。 在非协调器节点上,此函数忽略张量和非张量对象, 只为 ShardedTensor 对象生成写操作。 在协调器节点上,为所有值生成写操作。 "文档" 请求 = 输入文本为空,请提供需要翻译的文本 完全限定名, 对象 状态字典.项目(): 由于 DTensor 支持子网格,添加额外的检查以确保 _create_write_items() # 仅当当前节点是相应 DTensor 网格的一部分时才调用。 if isinstance(对象, DTensor): if 对象.设备网格.获取坐标() : 请求 += 创建写入项(完全限定名, 对象) 否则: 对于平面张量和非张量值,添加所有请求 # 排名。协调员将决定是否根据键值进行去重 # 基于键值来决定值。 请求 += _创建写入项(完全限定名, 对象) 返回 保存计划 SavePlan(请求) def 创建默认全局保存计划( 所有计划: 列表[保存计划 SavePlan], 重新编写索引提示: 布尔 = True, ) -> 元组[列表[保存计划 SavePlan], 元数据]: "" 创建 DefaultSavePlanner 使用的全局计划和元数据。 元数据通过连接所有``WriteItem``从提供的计划中的元数据生成。 唯一的全局规划更改是在所有``MetadataIndex``对象中更新索引提示。 当``rewrite_index_hints``为 True 时。 "文档" md: 字典[字符串, 存储类型] = {} 新计划 = 输入文本为空,请提供需要翻译的文本 计划 所有计划: 新项目 = 输入文本为空,请提供需要翻译的文本 项目 计划.项目: 如果 项目.类型 == 写入项目类型.碎片: 断言 项目.索引.完全限定名 md 如果 项目.类型 == 写入项类型.字节输入输出: md[项目.索引.完全限定名] = 字节存储元数据() 新项目.追加(项目) 否则: 断言 项目.张量数据 张量_md = 角色( 张量存储元数据, md.setdefault( 项目.索引.完全限定名, 张量存储元数据( 属性=项目.张量数据.属性, 尺寸=项目.张量数据.尺寸, 数据块们=[] ), ), ) 新项目 = 项目 如果 重新编写索引提示: 新索引 = dataclasses.替换( 项目.索引, 索引=len(张量文档.数据块们) ) 新条目 = dataclasses.替换(项目, 索引=新索引) 新条目们.追加(新条目) 断言 项目.张量数据.数据块 None, f"" 无法为没有边界的张量创建 MD。 全限定名:{项目.索引.完全限定名} "文档" 张量_md.数据块们.追加(项目.张量数据.数据块) 新计划.追加(dataclasses.替换(计划, 项目=新项目)) 返回 (新计划, 元数据(md)) def _创建默认本地元数据(状态字典: STATE_DICT_TYPE) -> 元数据: “如果使用了 DefaultSavePlanner 来检查点 state_dict,则返回“元数据”。” 计划 = _仅创建默认元数据计划(state_dict) _, md = create_default_global_save_plan[plan]) 返回 md def _复选框重叠检查(box0: 块存储元数据, box1: 块存储元数据) -> 布尔: 检查两个盒子是否重叠。元组为(偏移量,长度)。 对于每个分片维度,检查一个分片是否位于另一个分片上 与该维度相关的第二个分片结束。例如,对于二维分片,我们会检查一个分片是否在另一个分片之上或在其左侧。 # shard, we would check if one shard is above or on the # other shard. 维度数 = 长度(盒子 0.偏移量) for i 范围(维数): 如果 盒子 0.偏移量[i] 大于等于 盒子 1.偏移量[i] + 盒子 1.尺寸[i]: 返回 如果 盒子 1.偏移量[i] >= 盒子 0.偏移量[i] + 盒子 0.尺寸[i]: 返回 返回 真实 def 检查复选框边界( 外框大小: torch.尺寸, 内框: 块存储元数据 ) -> 布尔: i 范围(长度(外箱尺寸)): if 内箱.偏移量[i] < 0: 返回 if 内箱.尺寸[i] < 0: 返回 if 内箱.偏移量[i] + 内部盒.尺寸[i] > 外部盒尺寸[i]: 返回 返回 真实 def _验证全局计划(全局计划: 列表[保存计划 SavePlan], 元数据: 元数据) -> 布尔: 一切正常 = 真实 , 元数据.状态字典元数据.项目(): if isinstance(, 字节数据存储元数据): continue if 长度(.尺寸) == 0: continue 数据块体积 = 0 chunk_idx, chunk0 列举(.数据块们): 计算体积 if _检查框边界(.尺寸, chunk0): 日志记录器.警告( "" 键:%s 超出范围块: 张量大小:%s 块:%s """, , .尺寸, chunk0, ) 一切顺利 = 块体积 += 减少(操作符., 块 0.尺寸, 1) 检查重叠 chunk1 .数据块们[chunk_idx + 1 ] if _check_box_overlap(chunk0, chunk1): 日志记录器.警告( "键:%s具有重叠的块:%s %s", , chunk0, chunk1 ) 一切顺利 = 检查组合块是否覆盖整个张量 张量体积 = 减少(操作符., .尺寸, 1) 如果 块体积 != 张量体积: 日志记录器.警告( "" 键:%s 无效填充 张量体积: %s 块体积:%s """, , 张量体积, 块体积, ) 都很好 = 返回 都很好

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源