快捷键

torch.distributed.checkpoint.state_dict_saver 的源代码

# mypy: 允许未类型化装饰器
# mypy: 允许未类型化定义
导入 检查
导入 操作系统
导入 警告
来自 concurrent.futures 导入 未来
来自 枚举 导入 枚举
来自 打字 导入 角色, 可选, 联合
来自 typing_extensions 导入 已弃用

导入 火炬
导入 torch.distributed  dist
来自 torch.distributed._state_dict_utils 导入 _copy_state_dict, _create_cpu_state_dict
来自 torch.distributed.checkpoint._async_executor 导入 (  # noqa: TC001
    _AsyncCheckpointExecutor,
)
来自 torch.distributed.checkpoint._async_process_executor 导入 (
    _ProcessBasedAsyncCheckpointExecutor,
)
来自 torch.distributed.checkpoint._async_thread_executor 导入 (
    _ThreadBasedAsyncCheckpointExecutor,
)
来自 torch.distributed.checkpoint._storage_utils 导入 _存储设置
来自 torch.distributed.checkpoint.default_planner 导入 默认保存计划器
来自 torch.distributed.checkpoint 的记录器 导入 _dcp_method 记录器
来自 torch.distributed.checkpoint.metadata 导入 元数据, STATE_DICT_TYPE
来自 torch.distributed.checkpoint.planner 导入 保存计划, 保存计划器
来自 torch.distributed.checkpoint.staging 导入 异步阶段器
来自 torch.distributed.checkpoint.stateful 导入 状态化的
来自 torch.distributed.checkpoint.storage 导入 存储写入器
来自 torch.distributed.distributed_c10d 导入 _获取默认组

来自 .工具 导入 _api_bc_check, _分发包装器, _配置文件


全部 = [保存状态字典, 保存, 异步保存, 异步检查点器类型]


[文档]类 AsyncCheckpointerType(Enum): 异步检查点类型枚举 THREAD = "线程" PROCESS = "进程"
[文档]@已弃用( "``save_state_dict`` 已弃用,将在未来版本中删除。" "请使用 `save` 代替。" category=FutureWarning ) def 保存状态字典( state_dict: 状态字典类型, storage_writer: 存储写入器, process_group: Optional[dist.ProcessGroup] = None, coordinator_rank: 整数 = 0, no_dist: 布尔 = False, planner: Optional[SavePlanner] = None, ) -> Metadata: "此方法已过时。请切换到 'save'。”" storage_writer.reset() # TODO: 在此处测试返回 'save'。” with _profile(): return _save_state_dict( state_dict, storage_writer, process_group, 协调器等级, 无分布, 规划器, )
[文档]@_dcp_method_logger(记录异常=True) # type: ignore[arg-type] @_api_bc_check 定义 保存( state_dict: 状态字典类型, *, 检查点 ID: 联盟[字符串, 操作系统.PathLike, ] = , 存储写入器: 可选[存储写入器] = , 规划器: 可选[保存规划器] = , 进程组: 可选[距离.流程组] = , 无分布式: 布尔类型 = 错误, ) -> 元数据: "" 以 SPMD 风格保存分布式模型。 这个函数与 `torch.save()` 不同,因为它只处理每个秩的本地碎片。 `ShardedTensor` 和 `DTensor` 通过每个秩只保存其本地碎片来处理。 对于每个具有 `state_dict` 和 `load_state_dict` 的 `Stateful` 对象, 保存将调用 `state_dict` 然后进行序列化。 .. 警告:: 无法保证 PyTorch 版本之间的向后兼容性 为保存状态字典。 .. 警告:: 如果使用 `process_group` 参数,请确保只有其 ranks 调用 `save_state_dict` 并确保状态字典中的所有数据都属于它。 .. 注意:: 当保存 FSDP 的`ShardingStrategy.HYBRID_SHARD`的检查点时,应该只有一个分片组调用`save_state_dict`,并且相应的进程组需要被传入。 如果没有进程组可用,此函数假定意图是保存。 如果没有进程组可用,此函数假定意图是保存。 .. 注意:: 如果没有进程组可用,此函数假定意图是保存。 本地进程中的 state_dict。 .. 注意: 假设 Rank 0 是协调器 rank。 参数: state_dict (Dict[str, Any]): 要保存的状态字典。 checkpoint_id (Union[str, os.PathLike, None]): 此检查点实例的 ID。checkpoint_id 的含义取决于存储方式。 它可以是文件夹或文件的路径。 如果存储是键值存储,它也可以是一个键。 (默认:无) storage_writer(可选[StorageWriter]): 存储写入器实例用于执行写入。如果这不是 指定的,DCP 将自动根据作者进行推断 检查点 ID。如果检查点 ID 也为 None,将抛出异常。 (默认:``None``) 规划器(Optional[SavePlanner]): SavePlanner 的实例。如果没有指定,将使用默认值。 规划器将被使用。(默认:``None``) process_group(可选[ProcessGroup]): 用于跨等级同步的 ProcessGroup。 (默认:``None``) no_dist (bool): 如果为 ``True``,则此函数将假定意图是加载 一个不使用跨秩同步的检查点。 (默认:`False`) 返回: 元数据:保存的检查点的元数据对象。 示例: >>> # xdoctest: +SKIP >>> my_model = MyModule() >>> state_dict = {"model": my_model} >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter( ... "/checkpoint/1" ... ) >>> torch.distributed.checkpoint.save( >>> state_dict=state_dict, >>> storage_writer=fs_storage_writer, >>> ) .. 注意:: save_state_dict 使用集体操作来协调跨 rank 的写入。 基于 NCCL 的过程组,内部张量表示 对象必须在通信之前移动到 GPU 设备上。 在这种情况下,所使用的设备由 `torch.cuda.current_device()` 给出 用户有责任确保这一点被设置正确 每个等级都有一个独立的 GPU,通过 `torch.cuda.set_device()` 设置。 "文档" 火炬._C._log_api_usage_once("torch.distributed.checkpoint.save") no_dist = no_dist 或者 ( 距离.是否可用()) 或者 ( 距离.已初始化()) 如果 无分布式: warnings.warn( "torch.distributed 已被禁用、不可用或未初始化,假设意图是在单个进程中保存。" ) _配置文件(): 存储写入器 = 角色( 存储写入器, 存储设置(存储写入器, 检查点 ID, 读取器=错误) ) 返回 保存状态字典( state_dict=状态到状态字典(state_dict), 存储写入器=存储写入器, 进程组=进程组, 无分布式=无错误, 规划器=规划器, )
[文档]@_dcp_method_logger(记录异常=True) 定义 异步保存( state_dict: 状态字典类型, *, 检查点 ID: 联盟[字符串, 操作系统.PathLike, ] = , 存储写入器: 可选[存储写入器] = , 规划器: 可选[保存规划器] = , 进程组: 可选[距离.流程组] = , 异步检查点类型: AsyncCheckpointerType = AsyncCheckpointerType.线程, ) -> 未来: 异步版本的 `save`。此代码首先将 `state_dict` 解耦到 暂存存储(默认为 CPU 内存),然后在单独的线程中调用`save`。 .. 警告:: 此功能为实验性,可能随时更改。 参数: 状态字典(Dict[str, Any]):要保存的状态字典。 checkpoint_id (Union[str, os.PathLike, None]): 此检查点实例的 ID。checkpoint_id 的含义取决于存储方式。 它可以是文件夹或文件的路径。 如果存储是键值存储,它也可以是一个键。 (默认:``None``) storage_writer (可选[StorageWriter]): StorageWriter 实例,用于执行'stage'和'save'。如果 这未指定,DCP 将自动根据存储器推断写入器 检查点 ID。如果检查点 ID 也为 None,将抛出异常。 (默认:``None``) 规划器(Optional[SavePlanner]): SavePlanner 的实例。如果没有指定,将使用默认值。 规划器将被使用。(默认:``None``) process_group(可选[ProcessGroup]): 用于跨等级同步的 ProcessGroup。 (默认:``None``) 返回: 未来:保存操作返回的结果元数据对象。 示例: >>> # xdoctest: +SKIP >>> my_model = MyModule() >>> state_dict = {"model": my_model} >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter( ... "/checkpoint/1" ... ) >>> checkpoint_future = torch.distributed.checkpoint.async_save( >>> state_dict=state_dict, >>> storage_writer=fs_storage_writer, >>> ) ... >>> # ... 进行一些工作 ... ... >>> checkpoint_future.result() "文档" 火炬._C._log_api_usage_once(torch.distributed.checkpoint.async_save) 如果 距离.是否可用() 距离.已初始化(): pg = 进程组 或者 获取默认组() 断言 ( 火炬.设备("cpu") pg._device_types # 类型: 忽略[attr-defined] ), ( "异步保存必须启用 CPU 后端;尝试使用 'cpu:gloo,cuda:nccl' 初始化进程组" ) 存储写入器 = 角色( 存储写入器, _存储设置(存储写入器, 检查点 ID, 读取器=错误) ) 状态字典 = _状态到状态字典(state_dict) 如果 isinstance(存储写入器, AsyncStager): 阶段状态字典 = 存储写入器.阶段(state_dict) else: 为未实现 AsyncStager 的 storage_writers 提供 bwc 支持 阶段化状态字典 = _create_cpu_state_dict(state_dict) _copy_state_dict(state_dict, 阶段化状态字典, 类型检查=错误) 执行器: 异步检查点执行器 = ( 基于进程的异步检查点执行器() 如果 异步检查点类型 == 异步检查点类型枚举.处理 否则 _基于线程的异步检查点执行器() ) f: 未来 = 执行器.执行保存( 阶段化状态字典, 检查点 ID=检查点 ID, 存储编写器=存储编写器, 规划器=规划器, 进程组=进程组, ) 如果 ( isinstance(存储编写器, AsyncStager) 存储编写器.执行后应同步 ): 存储写入器.同步预发布() 返回 f
定义 _将状态转换为状态字典(state_dict: 状态字典类型) -> 状态字典类型: 创建`state_dict`的浅拷贝,其中`state_dict`为每个 Stateful 对象调用。 stateful_state_dict = {} , 元素 state_dict.项目(): stateful_state_dict[] = ( 元素.state_dict() 如果 isinstance(元素, Stateful) 否则 元素 ) 返回 状态字典 定义 保存状态字典( state_dict: 状态字典类型, 存储写入器: 存储写入器, 进程组: 可选[距离.流程组] = , 协调器排名: 整型 = 0, 无分布式: 布尔类型 = 错误, 规划器: 可选[保存规划器] = , ) -> 元数据: 火炬._C._log_api_usage_once("torch.distributed.checkpoint.save_state_dict") distW = _DistWrapper(进程组, no_dist, 协调器排名) 如果 规划器 : 规划器 = 默认保存规划器() 断言 规划器 全局元数据 = 检查点参数 = {} 如果 (检查点 ID := getattr(存储写入器, "checkpoint_id", )) : ckpt_kwargs["checkpoint_id"] = ckpt_id ckpt_kwargs["进程组"] = distW. @_dcp_method_logger(**ckpt_kwargs) 定义 local_step(): 断言 规划器 存储元信息 = 存储写入器.存储元数据() 如果 "存储元数据" 检查.签名(规划器.设置计划器).参数: warnings.warn( "已更新 SavePlanner.set_up_planner 的函数定义" "以包含 storage_meta 参数。请更新您的实现" 包含此参数。 ) 规划器.设置计划器(state_dict, distW.is_coordinator) # 类型:忽略[调用参数,参数类型] else: 规划器.设置计划器( state_dict=state_dict, 存储元数据=存储元数据, is_coordinator=distW.is_coordinator, ) 存储写入器.设置存储写入器(distW.is_coordinator) 本地计划 = 规划器.创建本地计划() 本地计划 = 存储写入器.准备本地计划(本地计划) 返回 本地计划 @_dcp_method_logger(**ckpt_kwargs) 定义 global_step(所有本地计划): 非局部 全局元数据 断言 规划器 所有本地计划, 全局元数据 = 规划器.创建全局计划(所有本地计划) 所有本地计划 = 存储写入器.准备全局计划(所有本地计划) 返回 所有本地计划 central_plan: 保存计划 = distW.减少分散("plan", local_step, global_step) @_dcp_method_logger(**ckpt_kwargs) 定义 写数据(): 断言 规划器 final_local_plan = 规划器.完成计划(central_plan) 所有写入 = 存储写入器.写数据(最终本地计划, 规划器) 所有写入.等待() 返回 所有写入.() @_dcp_method_logger(**ckpt_kwargs) 定义 完成检查点(所有结果): 断言 全局元数据 存储编写器.完成(元数据=全局元数据, 结果=所有结果) 返回 全局元数据 返回 distW.全量归约("写", 写数据, 完成检查点)

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源