# mypy: 允许未类型化定义
版权所有(C)Meta Platforms,Inc. 及其关联公司
导入
复制
导入 dataclasses
导入
输入/输出
导入
记录日志
导入
操作符
from 集合
导入
链式映射
from functools 导入 reduce
from 打字
导入
任何,
角色,
可选,
联合
导入
火炬
from torch.distributed._shard._utils 导入
通过索引缩小张量
from torch.distributed.checkpoint._dedup_save_plans 导入
去重保存计划
from torch.distributed.checkpoint._nested_dict 导入 (
平铺映射,
平铺状态字典,
)
from torch.distributed.checkpoint._sharded_tensor_utils 导入 _flatten_sharded_tensors
from torch.distributed.checkpoint._traverse 导入 set_element
from torch.distributed.checkpoint.metadata 导入 (
字节存储元数据,
块存储元数据,
元数据,
元数据索引,
STATE_DICT_TYPE,
STORAGE_TYPES,
StorageMeta,
TensorStorageMetadata,
)
from torch.distributed.checkpoint.planner 导入 (
加载计划,
加载规划器,
读取项目,
保存计划 SavePlan,
保存规划器,
编写项目,
编写项目类型,
)
from torch.distributed.checkpoint.planner_helpers 导入 (
_compare_save_plans,
_创建默认元数据计划,
_创建读取项,
_创建写入项,
_初始化状态字典,
合并本地计划差异,
)
from torch.distributed.checkpoint.utils 导入
查找状态字典对象
来自 torch.distributed.tensor
导入 DTensor
from . 导入 _version
日志记录器:
记录.
日志记录器 =
记录.
获取日志记录器(__name__)
全部 = [
"默认保存计划器",
"默认加载规划器",
"创建默认本地加载计划",
"创建默认全局加载计划",
"创建默认本地保存计划",
"创建默认全局保存计划",
]
# TODO: 更新 default_planner.py 的文档字符串
[文档]
类
默认保存规划器(
保存规划器):
映射:
平铺映射
定义
初始化(
我,
展平状态字典:
布尔 =
是的,
展平分片张量:
布尔 =
是的,
去重复制张量:
可选[
布尔] =
无,
去重并保存到最低等级:
布尔 =
错误,
启用计划缓存:
布尔 =
错误,
) -> 无:
我.
展平状态字典 =
展平状态字典
我.
展平分片张量 =
展平分片张量
我.
映射 = {}
我.
去重并保存到最低秩 =
去重保存到最低排名
如果
dedup_replicated_tensors
去重复制张量 是
不
无:
日志记录器.
警告(
默认保存规划器的 `dedup_replicated_tensors` 参数正在被
弃用,并且不再有任何效果。请移除此参数
"您的来电。"
)
我._cached_plans_key:
字符串 =
我.
类.__name__
我.
_启用计划缓存 =
启用计划缓存
def 设置计划器(
我,
状态字典:
状态字典类型,
存储元数据:
可选[StorageMeta] =
无,
是否为协调器:
布尔 =
假的,
) -> 无:
if 我.
展平状态字典:
状态字典,
我.
映射 =
展平状态字典(
状态字典)
if 我.
展平分片张量:
状态字典 =
__展平分片张量(
状态字典)
我.
状态字典 =
状态字典
我.
是否是协调器 =
是否是协调器
def 创建本地计划(
我) ->
保存计划 SavePlan:
计划 =
创建默认本地保存计划(
我.
状态字典,
我.
是否为协调器)
if 我.flatten_state_dict:
plan = dataclasses.替换(plan, planner_data=
我.
映射)
我.
计划 =
计划
if 我.
启用计划缓存:
如果计划相同,我们可以跳过将计划发送给协调器。
if (
我.
_缓存的计划键
在
保存规划器.
缓存保存计划
和
比较保存计划(
计划,
保存计划.
_缓存的保存计划[
我.
_缓存的计划键]
)
):
日志记录器.
信息(
本地计划无变化。跳过将计划发送给协调器
)
返回
保存计划 SavePlan([],
可用=
错误)
否则:
保存计划.
_缓存的保存计划[
我.
_缓存的计划键] =
规划
返回
我.
规划
定义
创建全局规划(
我,
所有计划:
列表[
保存计划 SavePlan]
) -> 元组[
列表[
保存计划 SavePlan
],
元数据
]:
所有计划 =
去重保存方案(
所有方案,
我.
去重保存至最低等级)
全局方案,
元数据 =
创建默认全局保存计划(
所有计划)
如果
我.
展平状态字典:
# | 不适用于 Python 3.8 或更早版本。
# merged_mappings = reduce(
# lambda x, y: x | y, (p.planner_data for p in global_plan)
# )
planner_data_dict = [p.planner_data for p 在
全局计划]
合并映射 =
字典(
链映射(*
规划器数据字典))
元数据 = dataclasses.
替换(
元数据,
规划器数据=
合并映射)
如果
不
验证全局计划(
全局计划,
元数据):
raise ValueError(验证全局计划失败)
返回
全球计划,
元数据
def 使用缓存创建全局计划(
我,
所有计划:
列表[
保存计划 SavePlan]
) -> 元组[
列表[
保存计划 SavePlan
],
列表[
保存计划 SavePlan
],
元数据
]:
""
创建具有缓存的全球计划。
返回一个包含 global_plan_delta、global_plan 和 metadata 的元组。
"文档"
global_plan_delta: 列表[
保存计划 SavePlan] =
输入文本为空,请提供需要翻译的文本
如果
我._cached_plans_key
不
在
保存计划._cached_all_plans:
# 对 all_plans 进行深拷贝,以避免去重后修改的计划被缓存
保存计划.
_缓存的全部计划[
我.
_缓存的计划键] =
复制.
深拷贝(
全部计划
)
全球计划,
元数据 =
自身.
创建全局计划(
所有计划)
保存计划.
_缓存的全球计划[
我.
_缓存的计划键] =
全球计划
如果计划未被缓存,全局计划 delta 将与全局计划相同。
返回
全局计划,
全局计划,
元数据
我们为新 delta 计划获取全局计划。
# 等级已经缓存了没有变化的计划。
合并计划 =
合并本地计划(
保存计划.
_缓存的全部计划[
我.
_缓存的计划键
],
全部计划
)
# 对合并后的计划进行深拷贝,以避免去重后缓存修改后的计划
保存计划.
_缓存的全部计划[
我.
_缓存的计划键] =
复制.
深拷贝(
合并后的计划
)
全球计划,
元数据 =
我.
创建全球计划(
合并计划)
如果
我.
缓存计划键
在
我.
缓存的全局计划:
for 缓存计划,
新计划
在 zip(
保存规划器._cached_global_plan[
我._cached_plans_key
], global_plan
):
if _compare_save_plans(缓存计划,
新计划):
全球计划增量.
追加(
保存计划 SavePlan([],
可用=
假的))
否则:
全局计划增量.
追加(
新计划)
保存规划器.
_缓存的全球计划[
我._cached_plans_key] = global_plan
# 如果计划已缓存,则 global_plan 的 delta 将是新 global_plan 和缓存 global_plan 的 delta。
# 的新 global_plan 和缓存 global_plan 的数量。
返回
全局计划增量,
全局计划,
元数据
def 创建全局计划(
我,
所有计划:
列表[
保存计划 SavePlan]
) -> 元组[
列表[
保存计划 SavePlan
],
元数据
]:
全局计划增量:
列表[
保存计划 SavePlan] =
输入文本为空,请提供需要翻译的文本
if 我.
启用计划缓存:
如果计划已缓存,我们只需发送全局计划增量进行分散
横跨各个 rank。rank 将使用缓存的最终计划。
(
全局计划增量,
全局计划,
元数据,
) = 我.
_使用缓存创建全局计划(
所有计划)
否则:
全局计划,
元数据 =
我.
创建全局计划(
所有计划)
# 如果未启用缓存,全局增量计划将始终与新的全局计划相同。
全局计划增量 =
全局计划
我.
全局计划 =
全局计划
我.
元数据 =
元数据
返回
全局计划增量,
我.
元数据
def 使用缓存完成计划(
我,
新计划:
保存计划 SavePlan) ->
保存计划 SavePlan:
完成计划:
保存计划 =
新计划
if 不
新计划.
可用:
完成计划 =
保存规划器.
_缓存的最终保存计划[
我.
_缓存的计划键]
否则:
完成计划 =
新计划
保存规划器.
_缓存的最终保存计划[
我.
缓存计划键] =
新计划
返回
完成计划
def 完成计划(
我,
新计划:
保存计划 SavePlan) ->
保存计划 SavePlan:
完成计划:
保存计划 =
新计划
if 我.
启用计划缓存:
完成计划 =
我.
使用缓存完成计划(
新计划)
我.
规划 =
完成规划
返回
我.
规划
def 解析数据(
我,
编写条目:
编写字符串) ->
联盟[torch.
张量,
输入/输出.BytesIO
]:
对象 =
我.
查找对象(
编写条目.
索引)
返回
我.
转换对象(
编写条目,
对象)
[文档] def 查找对象(self, 索引: 元数据索引) -> 任意:
"""从规划器接口扩展,使其易于扩展默认规划器。"""
return find_state_dict_object(self.state_dict, index)
[文档] def transform_object(self, write_item: WriteItem, object: Any):
"""从规划器接口扩展,使其易于扩展默认规划器。"""
if write_item.type == WriteItemType.BYTE_IO:
bytes = io.BytesIO()
torch.save(object, bytes)
object = bytes
return object
[文档]
类
默认加载规划器(
加载规划器):
""
默认加载规划器,在加载规划器的基础上增加了多个功能。
具体来说,它增加了以下功能:
flatten_state_dict: 处理嵌套字典的 state_dict
flatten_sharded_tensors: 用于 FSDP 的 2D 并行模式
allow_partial_load: 如果为 False,当 state_dict 中存在但 checkpoint 中不存在的键时,将引发运行时错误。
"文档"
original_state_dict: 状态字典类型
映射:
展平映射
def __init__(
我,
展平状态字典:
布尔 = True,
展平分片张量:
布尔 = True,
允许部分加载:
布尔 =
假的,
) -> None:
我.
展平状态字典 =
展平状态字典
我.
展平分片张量 =
展平分片张量
我.
原始状态字典 = {}
我.
映射关系 = {}
我.
允许部分加载 =
允许部分加载
def 设置规划器(
我,
状态字典:
状态字典类型,
元数据:
可选[
元数据] = None,
是协调员:
布尔 =
错误,
) -> 无:
初始化状态字典(
状态字典)
我.
原始状态字典 =
状态字典
如果
我.
展平分片张量:
状态字典 =
_展平分片张量(
状态字典)
如果
我.
展平状态字典:
状态字典,
我.
映射 =
展平状态字典(
状态字典)
我.
状态字典 =
状态字典
我.
元数据 =
元数据
我.
是否协调器 =
是否为协调器
def 创建本地计划(
我) ->
加载计划:
断言
我.
元数据
是
不
无
如果
我.
展平状态字典:
# 为了支持在 v2.4 之前保存的检查点,我们必须
区分是否由于旧检查点导致的缺失键。
合同如下:
1. 我们发现缺失键的 3 种情况。
1.1 实际缺失键,但 allow_partial_load 为 False。
# 1.2 实际缺少键,但允许部分加载为 True
# 1.3 旧检查点,但允许部分加载为 False
# 1.4 旧检查点,但允许部分加载为 True
# 2. 如果我们找到缺少的键,我们首先将键转换回
v2.3 的键格式
如果之前缺失的键在 v2.3 的键中,我们假设
这是一个旧检查点。
将 state_dict 传递给 `create_default_local_load_plan()`,
# 具有检查 allow_partial_load 是否缺失的逻辑。
因此对于 1.2 和 1.4 情况,我们将 allow_partial_load 检查委托给
`create_default_local_load_plan()`。这里的逻辑是确定
# 检查点属于 2.3(或之前)还是 2.4(或之后)。
当前键 =
设置(
我.
状态字典.
键())
加载键 =
设置(
我.
元数据.
状态字典元数据.
键())
缺少键 =
加载键 -
当前键
if 缺少键:
_版本.
派生版本 = "2_3"
旧状态字典,
旧映射 =
展平状态字典(
我.
原始状态字典
)
旧键 =
设置(
旧状态字典.
键())
if 旧键 &
缺少键:
我.
状态字典,
我.
映射 =
旧状态字典,
旧映射
# _derived_version 仅由 flatten_state_dict 使用。
# 将其恢复为 None,以便稍后保存到新版本。
_版本._derived_version =
无
返回
创建默认本地加载计划(
我.
状态字典,
我.
元数据,
不
我.
允许部分加载
)
def 创建全局计划(
我,
全局计划:
列表[
加载计划
]\) ->
列表[
加载计划
]:
返回
创建默认全局加载计划(
全局计划)
def 完成计划(
我,
新计划:
加载计划) ->
加载计划:
返回
新计划
def 加载数据(
我,
读取项目:
读取项目,
值:
输入/输出.BytesIO) ->
无:
if 我.
展平状态字典:
设置元素(
我.
原始状态字典,
我.
映射关系[
阅读项.
目标索引.
完全限定名
],
torch.加载(
值,
仅权重=
假的),
)
否则:
我.
状态字典[
阅读项.
目标索引.
完全限定名] = torch.
加载(
值,
仅权重=
假
)
def 解析张量(
我,
读取项:
读取项):
张量 =
我.
查找张量(
读取项.
目标索引)
返回
我.
转换张量(
读取项,
张量)
def 提交张量(
我,
读取项:
读取项,
张量: torch.
张量) ->
无:
通过
[文档] def 查找张量(self, index: 元数据索引) -> torch.Tensor:
从规划器接口扩展,使其易于扩展默认规划器。
返回 find_state_dict_object(self.state_dict, index)
[文档] def transform_tensor(self, read_item: ReadItem, tensor: torch.Tensor):
从规划器接口扩展,使其易于扩展默认规划器。
通过索引缩小 tensor_by_index(tensor, read_item.dest_offsets, read_item.lengths)
类 _EmptyStateDictLoadPlanner(
默认加载规划器):
""
DefaultLoadPlanner 的扩展,从保存的元数据中重建状态字典。
适用于在不首先初始化模型的情况下加载 state_dict,例如
将 DCP 检查点转换为 Torch 保存文件时。
. 注意:在使用此 LoadPlanner 时,`state_dict`必须为空字典。
.. 警告::
因为整个状态字典都被初始化了,所以建议只使用
在单个 rank 或进程中加载此 LoadPlanner 以避免内存溢出。
"文档"
def __init__(我,
键=
无, *
参数, **kwargs):
我.
键 =
键
超级().__init__(*
参数, **kwargs)
def 应包含此键(
我,
键:
字符串,
元数据:
元数据) ->
布尔:
如果
我.
键
是 None:
返回
真实
如果 key
在
我.
键:
真实
未展平的键:
列表[
字符串] =
输入文本为空,请提供需要翻译的文本
规划器数据 =
元数据.
规划数据.
获取(
键)
为
未展平键
在
规划数据:
如果
未展平键列表:
未展平的键.
追加(
“点”.
连接([
未展平的键[-1
],
字符串(
未展平的键
)]])
)
否则:
未展平的键.
追加(
未展平的键)
如果
任何(
未展平的键
在
我.
键
为
未展平的键
在
未展平的键列表):
返回
真实
返回
假
def 设置计划器(
我,
状态字典:
状态字典类型,
元数据:
可选[
元数据] = None,
是否为协调器:
布尔 =
假的,
) -> None:
断言
不
状态字典
断言
元数据
是
不
无
从元数据重建状态字典
为 k, v
在
元数据.
状态字典元数据.
项目():
如果
不
我.
_是否包含键(k,
元数据):
continue
if isinstance(v, 张量存储元数据):
v = torch.空的(v.
尺寸,
数据类型=v.
属性.
数据类型)
# 类型:忽略[赋值]
if k 在
元数据.
规划数据:
设置元素(
状态字典,
元数据.
规划数据[k
], v)
否则:
状态字典[k] = v
超级().
设置规划器(
状态字典,
元数据,
是否为协调器)
def 创建默认本地加载计划(
状态字典:
字典[
字符串,
任何
],
元数据:
元数据,
严格的:
布尔 =
真实
) -> 加载计划:
请求 =
输入文本为空,请提供需要翻译的文本
""
创建由 DefaultLoadPlanner 使用的``LoadPlan``。
它使用``metadata``中的元数据,为``state_dict``中的每个值生成一个读取项。
默认行为是在 state_dict 和 metadata 之间精确匹配键。
通过对存储进行多次读取请求来处理重新分片,以匹配负载需求。
。
"文档"
for 完全限定名,
对象
在
状态字典.
项目():
如果 strict=False,则忽略 `state_dict` 中不存在的 `state_dict` 键。
如果
完全限定名
否。
在
元数据.
状态字典元数据:
如果
严格的:
raise 运行时错误(f
"在检查点状态字典中缺少键:"{
完全限定名}.")
否则:
continue
md = 元数据.
状态字典元数据[
完全限定名]
如果 (
isinstance(md, 张量存储元数据)
并且 getattr(
对象,
大小,
无)
是
不
无
并且 md.
大小 !=
对象.
尺寸()
):
raise ValueError(
f"已保存的大小不匹配"{md.
尺寸}
和当前:{
对象.
尺寸()}
对于{
完全限定名}",
)
由于 DTensor 支持子网格,因此添加额外的检查以确保_create_read_items()
仅当当前排名是相应 DTensor 的 mesh 部分时才被调用。
如果 isinstance(
对象, DTensor):
如果
对象.
设备网格.
获取坐标()
是
不
无:
请求 +=
_创建读取项(
完全限定名, md,
对象)
否则:
请求 +=
创建读取项(
完全限定名, md,
对象)
返回
加载计划(
请求)
def 创建默认全局加载计划(
所有计划:
列表[
加载计划
],
) -> 列表[
加载计划
]:
""
创建默认加载规划器使用的全局加载计划。
默认加载行为不涉及全局协调,此功能
目前不会改变本地计划。
"文档"
返回 all_plans
def create_default_local_save_plan(
状态字典:
字典[
字符串,
任何
],
是否为协调器:
布尔
) -> 保存计划 SavePlan:
""
创建由 DefaultSavePlanner 使用的``SavePlan``。
在非协调器节点上,此函数忽略张量和非张量对象,
只为 ShardedTensor 对象生成写操作。
在协调器节点上,为所有值生成写操作。
"文档"
请求 =
输入文本为空,请提供需要翻译的文本
为
完全限定名,
对象
在
状态字典.
项目():
由于 DTensor 支持子网格,添加额外的检查以确保 _create_write_items()
# 仅当当前节点是相应 DTensor 网格的一部分时才调用。
if isinstance(对象, DTensor):
if 对象.
设备网格.
获取坐标()
是
不
无:
请求 +=
创建写入项(
完全限定名,
对象)
否则:
对于平面张量和非张量值,添加所有请求
# 排名。协调员将决定是否根据键值进行去重
# 基于键值来决定值。
请求 +=
_创建写入项(
完全限定名,
对象)
返回
保存计划 SavePlan(
请求)
def 创建默认全局保存计划(
所有计划:
列表[
保存计划 SavePlan
],
重新编写索引提示:
布尔 = True,
) -> 元组[
列表[
保存计划 SavePlan
],
元数据
]:
""
创建 DefaultSavePlanner 使用的全局计划和元数据。
元数据通过连接所有``WriteItem``从提供的计划中的元数据生成。
唯一的全局规划更改是在所有``MetadataIndex``对象中更新索引提示。
当``rewrite_index_hints``为 True 时。
"文档"
md: 字典[
字符串,
存储类型] = {}
新计划 =
输入文本为空,请提供需要翻译的文本
为
计划
在
所有计划:
新项目 =
输入文本为空,请提供需要翻译的文本
为
项目
在
计划.
项目:
如果
不
项目.
类型 ==
写入项目类型.
碎片:
断言
项目.
索引.
完全限定名
不
在 md
如果
项目.
类型 ==
写入项类型.
字节输入输出:
md[项目.
索引.
完全限定名] =
字节存储元数据()
新项目.
追加(
项目)
否则:
断言
项目.
张量数据
是
不
无
张量_md =
角色(
张量存储元数据,
md.setdefault(
项目.
索引.
完全限定名,
张量存储元数据(
属性=
项目.
张量数据.
属性,
尺寸=
项目.
张量数据.
尺寸,
数据块们=
[]
),
),
)
新项目 =
项目
如果
重新编写索引提示:
新索引 = dataclasses.
替换(
项目.
索引,
索引=len(
张量文档.
数据块们)
)
新条目 = dataclasses.
替换(
项目,
索引=
新索引)
新条目们.
追加(
新条目)
断言
项目.
张量数据.
数据块
是
不 None, f
""
无法为没有边界的张量创建 MD。
全限定名:{
项目.
索引.
完全限定名}
"文档"
张量_md.
数据块们.
追加(
项目.
张量数据.
数据块)
新计划.
追加(dataclasses.
替换(
计划,
项目=
新项目))
返回 (
新计划,
元数据(md))
def _创建默认本地元数据(
状态字典: STATE_DICT_TYPE) ->
元数据:
“如果使用了 DefaultSavePlanner 来检查点 state_dict,则返回“元数据”。”
计划 =
_仅创建默认元数据计划(state_dict)
_, md = create_default_global_save_plan[plan])
返回 md
def _复选框重叠检查(box0:
块存储元数据, box1:
块存储元数据) ->
布尔:
检查两个盒子是否重叠。元组为(偏移量,长度)。
对于每个分片维度,检查一个分片是否位于另一个分片上
与该维度相关的第二个分片结束。例如,对于二维分片,我们会检查一个分片是否在另一个分片之上或在其左侧。
# shard, we would check if one shard is above or on the
# other shard.
维度数 =
长度(
盒子 0.
偏移量)
for i 在
范围(
维数):
如果
盒子 0.
偏移量[i]
大于等于
盒子 1.
偏移量[i] +
盒子 1.
尺寸[i
]:
返回
假
如果
盒子 1.
偏移量[i] >=
盒子 0.
偏移量[i] +
盒子 0.
尺寸[i
]:
返回
假
返回
真实
def 检查复选框边界(
外框大小: torch.
尺寸,
内框:
块存储元数据
) -> 布尔:
为 i
在
范围(
长度(
外箱尺寸)):
if 内箱.
偏移量[i] < 0:
返回
假
if 内箱.
尺寸[i] < 0:
返回
假
if 内箱.
偏移量[i] +
内部盒.
尺寸[i] >
外部盒尺寸[i
]:
返回
假
返回
真实
def _验证全局计划(
全局计划:
列表[
保存计划 SavePlan
],
元数据:
元数据) ->
布尔:
一切正常 =
真实
为
键,
值
在
元数据.
状态字典元数据.
项目():
if isinstance(值,
字节数据存储元数据):
continue
if 长度(
值.
尺寸) == 0:
continue
数据块体积 = 0
为 chunk_idx, chunk0
在
列举(
值.
数据块们):
计算体积
if 不
_检查框边界(
值.
尺寸, chunk0):
日志记录器.
警告(
""
键:%s 超出范围块:
张量大小:%s 块:%s
""",
键,
值.
尺寸,
chunk0,
)
一切顺利 =
假
块体积 +=
减少(
操作符.
多,
块 0.
尺寸, 1)
检查重叠
为 chunk1
在
值.
数据块们[chunk_idx + 1
]
if _check_box_overlap(chunk0, chunk1):
日志记录器.
警告(
"键:%s
具有重叠的块:%s %s",
键, chunk0, chunk1
)
一切顺利 =
假
检查组合块是否覆盖整个张量
张量体积 =
减少(
操作符.
多,
值.
尺寸, 1)
如果
块体积 !=
张量体积:
日志记录器.
警告(
""
键:%s 无效填充 张量体积:
%s 块体积:%s
""",
键,
张量体积,
块体积,
)
都很好 =
假
返回
都很好