快捷键

torch.distributed.checkpoint.format_utils 源代码

# mypy: 允许未类型化定义
导入 argparse
导入 操作系统
from 枚举 导入 枚举
from 打字 导入 角色, 可选, 联合

导入 火炬
导入 torch.distributed  dist
from torch.distributed._shard._utils 导入 窄化张量通过索引
from torch.distributed.checkpoint 导入 文件系统读取器, 文件系统写入器
from torch.distributed.checkpoint._nested_dict 导入 展平状态字典
from torch.distributed.checkpoint.default_planner 导入 (
    空状态字典加载规划器,
    默认加载规划器,
)
from torch.distributed.checkpoint.metadata 导入 (
    元数据,
    状态字典类型,
    存储类型,
    张量属性,
    张量存储元数据,
)
from torch.distributed.checkpoint.planner 导入 载入项目类型, 加载计划, 载入规划器
from torch.distributed.checkpoint.planner_helpers 导入 _create_chunk_list
from torch.distributed.checkpoint.state_dict_loader 导入 _load_state_dict
from torch.distributed.checkpoint.state_dict_saver 导入 _save_state_dict
from torch.distributed.checkpoint.storage 导入 存储读取器
from torch.futures 导入 未来


全部 = [
    dcp_to_torch_save,
    torch_save_to_dcp,
    "BroadcastingTorchSaveReader",
    "DynamicMetaLoadPlanner",
]


[文档] BroadcastingTorchSaveReader(StorageReader): "" 用于读取 Torch 保存文件的 StorageReader。此读取器将读取整个检查点 在协调器 rank 上,然后将每个张量广播并分片到所有 rank 。注意:旨在与 DynamicMetaLoadPlanner 一起使用 .. 警告:: 当前实现仅支持加载张量 >>> # xdoctest: +SKIP("undefined vars") >>> sd = {"mode": model} >>> dcp.load( >>> sd, >>> storage_reader=BroadcastingTorchSaveReader(), >>> planner=DynamicMetaLoadPlanner(), >>> checkpoint_id="path_to_model.pt" >>> ) "文档" def __init__( , checkpoint_id: 可选[联盟[字符串, 操作系统.PathLike]] = , 协调器等级: 整型 = 0, ) -> : .检查点 ID = 检查点 ID .协调器等级 = 协调器等级
[文档] def read_metadata(self) -> Metadata: 扩展默认的 StorageReader 以支持构建元数据文件 # 元数据在 planner.set_up_planner 中构建,因为我们实际上并没有从 磁盘 return Metadata(state_dict_metadata={})
[文档] def 读取数据(, 规划: 加载计划, 规划器: LoadPlanner) -> 未来[]: "" 在协调器 rank 上读取 torch 保存的数据,之后进行广播 这会产生通信成本,但避免了需要加载 每个 rank 上的整个检查点,希望防止内存溢出问题 "文档" 规划器 = 角色(默认加载规划器, 规划器) 数据在协调器节点上读取,之后进行广播 这会产生通信成本,但避免了需要加载 每个 rank 上的完整检查点,希望防止内存溢出问题 # TODO:在每个主机上读取,而不是只读取协调器 if .是否为协调器: 断言 .检查点 ID torch 状态字典 = torch.加载( .检查点 ID, 地图位置="cpu", 仅权重= ) if 规划器.展平状态字典: PyTorch 状态字典, _ = 展平状态字典(torch_state_dict) 否则: torch_state_dict = req plan.项目: if req.类型 == LoadItemType.BYTE_IO: raise 运行时错误( f"非张量值识别在"{req.存储索引.完全限定名}. f"目前"{类型().__name__}仅支持加载张量。" ) # 从协调器 rank 广播张量 if .is_coordinator: PG 设备 = 距离.分布式_c10d._get_pg_default_device() 张量 = torch_state_dict[req.存储索引.完全限定名].(pg 设备) 否则: 张量 = torch.空值类似(规划器.状态字典[req.存储索引.完全限定名]\) 距离.广播(张量, =.协调器排名, async_op=假的) 张量 = 通过索引缩小张量(张量, req.存储偏移量, req.长度) 目标张量 = 规划器.解析张量(req).detach() 断言 目标张量.尺寸() == 张量.尺寸(), ( f"需求{req.存储索引}大小不匹配," f"{目标张量.尺寸()}{张量.尺寸()}" ) 目标张量.复制_(张量) 规划器.提交张量(req, 目标张量) fut: 未来 = 未来() fut.设置结果() 返回 fut
[文档] def set_up_storage_reader(self, metadata: Metadata, is_coordinator: bool) -> None: """存储读取器的实现""" self.is_coordinator = is_coordinator if self.is_coordinator: assert dist.get_rank() == self.coordinator_rank assert self.checkpoint_id is not None
[文档] def prepare_local_plan(self, plan: LoadPlan) -> LoadPlan: """存储读取器方法的实现""" 返回计划
[文档] def 准备全局计划(self, global_plan: list[LoadPlan]) -> list[LoadPlan]: """存储读取器方法的实现""" 返回全局计划
[文档] def reset(self, checkpoint_id: Union[str, os.PathLike, None] = None) -> None: """存储读取器的实现""" self.checkpoint_id = checkpoint_id
[文档] @classmethod def validate_checkpoint_id(cls, checkpoint_id: Union[str, os.PathLike]) -> bool: """实现 StorageReader 方法的实现""" return os.path.isfile(checkpoint_id)
[文档]class DynamicMetaLoadPlanner(DefaultLoadPlanner): """ 默认加载规划器的扩展,根据传入的状态字典创建一个新的元数据对象, 避免了需要从磁盘读取元数据的需求。这在读取没有元数据文件的格式时很有用, 比如 Torch 保存文件。 . 注意:建议与 BroadcastingTorchSaveReader 一起使用 .. 警告: 当前实现仅支持加载张量。 >>> # xdoctest: +SKIP("未定义的变量") >>> sd = {"mode": 模型} >>> dcp.load( >>> sd, >>> storage_reader=BroadcastingTorchSaveReader(), >>> 规划器=DynamicMetaLoadPlanner(), >>> checkpoint_id="path_to_model.pt" >>> ) """
[文档] def set_up_planner( self, state_dict: STATE_DICT_TYPE, metadata: Optional[Metadata] = None, is_coordinator: 布尔值 = False, ) -> None: """设置规划器,通过从状态字典创建 Metadata 对象扩展默认行为""" super().set_up_planner(state_dict, metadata, is_coordinator) state_dict_metadata: dict[str, STORAGE_TYPES] = {} for key, tensor in self.state_dict.items(): if not torch.is_tensor(tensor): raise RuntimeError( 在 {key} 处识别到非张量值。 目前 {type(self).__name__} 只支持加载张量。 ) state_dict_metadata[key] = TensorStorageMetadata( Tensor 属性(dtype=tensor.dtype) tensor 的大小() _create_chunk_list(tensor)() ) self.metadata = Metadata(state_dict_metadata=state_dict_metadata)
[文档]def dcp_to_torch_save( dcp_checkpoint_dir: Union[str, os.PathLike], torch_save_path: Union[str, os.PathLike], ): """ 给定一个包含 DCP 检查点的目录,此函数将将其转换为 PyTorch 保存文件。 Args: dcp_checkpoint_dir: 包含 DCP 检查点的目录。 torch_save_path: 存储转换后的 Torch 保存文件的文件名。 ..警告:: 避免内存溢出,建议仅在单个 rank 上运行此函数。 """ sd: STATE_DICT_TYPE = {} _加载状态字典( sd, storage_reader=FileSystemReader(dcp_checkpoint_dir), planner=_EmptyStateDictLoadPlanner(), no_dist=True, ) torch.save(sd, torch_save_path)
[文档]def torch_save_to_dcp( torch_save_path: Union[str, os.PathLike], dcp_checkpoint_dir: Union[str, os.PathLike], ): """ 给定 torch 保存文件的位置,将其转换为 DCP 检查点。 Args: torch_save_path: Torch 保存文件的文件名。 dcp_checkpoint_dir: 存储 DCP 检查点的目录。 ..警告:: 避免内存溢出,建议仅在单个 rank 上运行此函数。 """ state_dict = torch.load(torch_save_path, weights_only=False) 由于预期任何通过此方式加载的内容都不需要状态行为,因此我们这里不需要状态行为。 torch.load 不会包含状态对象。 _save_state_dict( state_dict, storage_writer=FileSystemWriter(dcp_checkpoint_dir), no_dist=True )
如果 __name__ == "__main__":
格式模式(枚举): 火炬转 DCP = torch_to_dcp DCP 转火炬 = dcp_to_torch 解析命令行参数 解析器 = argparse.ArgumentParser() 解析器.添加参数( 模式, 类型=字符串, 帮助="转换模式", 选择=[m. for m 格式模式], 默认=格式模式.火炬转 DCP, ) 解析器.添加参数("源", 类型=字符串, 帮助="源模型路径") 解析器.添加参数("目标", 类型=字符串, 帮助="目标模型路径") args = 解析器.解析参数() 打印( f"将检查点从{参数.}{参数.目标}使用方法:'{参数.模式} ) 检查点缺失警告 = ( f在此处未找到检查点{参数.}. 跳过转换。 ) if 参数.模式 == 格式模式.火炬转 DCP.: if 操作系统.路径.判断是否为文件(参数.): 火炬保存为 DCP(参数., 参数.目标) 否则: 打印(检查点缺失警告) elif 参数.模式 == 格式模式.DCP_TO_TORCH.: 如果 os.路径.isdir(参数.): dcp_to_torch_save(参数., 参数.目标) 否则: 打印(检查点缺失警告) 否则: raise ValueError(f"未知转换模式:"{参数.模式}")

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源