# mypy: 允许未类型化定义
导入 argparse
导入
操作系统
from 枚举
导入
枚举
from 打字
导入
角色,
可选,
联合
导入
火炬
导入 torch.distributed
是 dist
from torch.distributed._shard._utils 导入
窄化张量通过索引
from torch.distributed.checkpoint 导入
文件系统读取器,
文件系统写入器
from torch.distributed.checkpoint._nested_dict 导入
展平状态字典
from torch.distributed.checkpoint.default_planner 导入 (
空状态字典加载规划器,
默认加载规划器,
)
from torch.distributed.checkpoint.metadata 导入 (
元数据,
状态字典类型,
存储类型,
张量属性,
张量存储元数据,
)
from torch.distributed.checkpoint.planner 导入
载入项目类型,
加载计划,
载入规划器
from torch.distributed.checkpoint.planner_helpers 导入 _create_chunk_list
from torch.distributed.checkpoint.state_dict_loader 导入 _load_state_dict
from torch.distributed.checkpoint.state_dict_saver 导入 _save_state_dict
from torch.distributed.checkpoint.storage 导入
存储读取器
from torch.futures 导入
未来
全部 = [
dcp_to_torch_save,
torch_save_to_dcp,
"BroadcastingTorchSaveReader",
"DynamicMetaLoadPlanner",
]
[文档]
类 BroadcastingTorchSaveReader(StorageReader):
""
用于读取 Torch 保存文件的 StorageReader。此读取器将读取整个检查点
在协调器 rank 上,然后将每个张量广播并分片到所有 rank
。注意:旨在与 DynamicMetaLoadPlanner 一起使用
.. 警告::
当前实现仅支持加载张量
>>> # xdoctest: +SKIP("undefined vars")
>>> sd = {"mode": model}
>>> dcp.load(
>>> sd,
>>> storage_reader=BroadcastingTorchSaveReader(),
>>> planner=DynamicMetaLoadPlanner(),
>>> checkpoint_id="path_to_model.pt"
>>> )
"文档"
def __init__(
我,
checkpoint_id: 可选[
联盟[
字符串,
操作系统.PathLike]] =
无,
协调器等级:
整型 = 0,
) -> 无:
我.
检查点 ID =
检查点 ID
我.
协调器等级 =
协调器等级
[文档] def read_metadata(self) -> Metadata:
扩展默认的 StorageReader 以支持构建元数据文件
# 元数据在 planner.set_up_planner 中构建,因为我们实际上并没有从
磁盘
return Metadata(state_dict_metadata={})
[文档] def
读取数据(
我,
规划:
加载计划,
规划器: LoadPlanner) ->
未来[
无
]:
""
在协调器 rank 上读取 torch 保存的数据,之后进行广播
这会产生通信成本,但避免了需要加载
每个 rank 上的整个检查点,希望防止内存溢出问题
"文档"
规划器 =
角色(
默认加载规划器,
规划器)
数据在协调器节点上读取,之后进行广播
这会产生通信成本,但避免了需要加载
每个 rank 上的完整检查点,希望防止内存溢出问题
# TODO:在每个主机上读取,而不是只读取协调器
if 我.
是否为协调器:
断言
我.
检查点 ID
是
不
无
torch 状态字典 = torch.
加载(
我.
检查点 ID,
地图位置="cpu",
仅权重=
假
)
if 规划器.
展平状态字典:
PyTorch 状态字典, _ =
展平状态字典(torch_state_dict)
否则:
torch_state_dict = 无
为 req
在 plan.
项目:
if req.类型 == LoadItemType.BYTE_IO:
raise 运行时错误(
f"非张量值识别在"{req.
存储索引.
完全限定名}
.
f"目前"{
类型(
我).__name__}
仅支持加载张量。"
)
# 从协调器 rank 广播张量
if 我.is_coordinator:
PG 设备 =
距离.
分布式_c10d._get_pg_default_device()
张量 = torch_state_dict[req.
存储索引.
完全限定名].
到(
pg 设备)
否则:
张量 = torch.
空值类似(
规划器.
状态字典[req.
存储索引.
完全限定名
]\)
距离.
广播(
张量,
源=
我.
协调器排名, async_op=
假的)
张量 =
通过索引缩小张量(
张量, req.
存储偏移量, req.
长度)
目标张量 =
规划器.
解析张量(req).detach()
断言
目标张量.
尺寸() ==
张量.
尺寸(), (
f"需求{req.
存储索引}
大小不匹配,"
f"{目标张量.
尺寸()}
与{
张量.
尺寸()}"
)
目标张量.
复制_(
张量)
规划器.
提交张量(req,
目标张量)
fut: 未来 =
未来()
fut.设置结果(
无)
返回 fut
[文档] def set_up_storage_reader(self, metadata: Metadata, is_coordinator: bool) -> None:
"""存储读取器的实现"""
self.is_coordinator = is_coordinator
if self.is_coordinator:
assert dist.get_rank() == self.coordinator_rank
assert self.checkpoint_id is not None
[文档] def prepare_local_plan(self, plan: LoadPlan) -> LoadPlan:
"""存储读取器方法的实现"""
返回计划
[文档] def 准备全局计划(self, global_plan: list[LoadPlan]) -> list[LoadPlan]:
"""存储读取器方法的实现"""
返回全局计划
[文档] def reset(self, checkpoint_id: Union[str, os.PathLike, None] = None) -> None:
"""存储读取器的实现"""
self.checkpoint_id = checkpoint_id
[文档] @classmethod
def validate_checkpoint_id(cls, checkpoint_id: Union[str, os.PathLike]) -> bool:
"""实现 StorageReader 方法的实现"""
return os.path.isfile(checkpoint_id)
[文档]def dcp_to_torch_save(
dcp_checkpoint_dir: Union[str, os.PathLike],
torch_save_path: Union[str, os.PathLike],
):
"""
给定一个包含 DCP 检查点的目录,此函数将将其转换为
PyTorch 保存文件。
Args:
dcp_checkpoint_dir: 包含 DCP 检查点的目录。
torch_save_path: 存储转换后的 Torch 保存文件的文件名。
..警告::
避免内存溢出,建议仅在单个 rank 上运行此函数。
"""
sd: STATE_DICT_TYPE = {}
_加载状态字典(
sd,
storage_reader=FileSystemReader(dcp_checkpoint_dir),
planner=_EmptyStateDictLoadPlanner(),
no_dist=True,
)
torch.save(sd, torch_save_path)
[文档]def torch_save_to_dcp(
torch_save_path: Union[str, os.PathLike],
dcp_checkpoint_dir: Union[str, os.PathLike],
):
"""
给定 torch 保存文件的位置,将其转换为 DCP 检查点。
Args:
torch_save_path: Torch 保存文件的文件名。
dcp_checkpoint_dir: 存储 DCP 检查点的目录。
..警告::
避免内存溢出,建议仅在单个 rank 上运行此函数。
"""
state_dict = torch.load(torch_save_path, weights_only=False)
由于预期任何通过此方式加载的内容都不需要状态行为,因此我们这里不需要状态行为。
torch.load 不会包含状态对象。
_save_state_dict(
state_dict, storage_writer=FileSystemWriter(dcp_checkpoint_dir), no_dist=True
)
如果 __name__ == "__main__":
类
格式模式(
枚举):
火炬转 DCP =
torch_to_dcp
DCP 转火炬 =
dcp_to_torch
解析命令行参数
解析器 = argparse.ArgumentParser()
解析器.
添加参数(
模式,
类型=
字符串,
帮助=
"转换模式",
选择=[m.
值 for m
在
格式模式
],
默认=
格式模式.
火炬转 DCP,
)
解析器.
添加参数(
"源",
类型=
字符串,
帮助=
"源模型路径")
解析器.
添加参数(
"目标",
类型=
字符串,
帮助=
"目标模型路径")
args = 解析器.
解析参数()
打印(
f"将检查点从{
参数.
源}
到{
参数.
目标}
使用方法:'{
参数.
模式}
)
检查点缺失警告 = (
f在此处未找到检查点{
参数.
源}
. 跳过转换。
)
if 参数.
模式 ==
格式模式.
火炬转 DCP.
值:
if 操作系统.
路径.
判断是否为文件(
参数.
源):
火炬保存为 DCP(
参数.
源,
参数.
目标)
否则:
打印(
检查点缺失警告)
elif 参数.
模式 ==
格式模式.DCP_TO_TORCH.
值:
如果 os.
路径.isdir(
参数.
源):
dcp_to_torch_save(参数.
源,
参数.
目标)
否则:
打印(
检查点缺失警告)
否则:
raise ValueError(f"未知转换模式:"{
参数.
模式}")