快捷键

torch.distributed.checkpoint.state_dict_loader 的源代码

# mypy: 允许未类型化装饰器
# mypy: 允许未类型化定义
导入 操作系统
导入 警告
来自 打字 导入 任何, 角色, 可选, 联合
来自 typing_extensions 导入 已弃用

导入 火炬
导入 torch.distributed  dist
来自 torch.distributed.checkpoint.default_planner 导入 空状态字典加载规划器
来自 torch.distributed.checkpoint 的记录器 导入 _dcp_method 记录器
来自 torch.distributed.checkpoint.stateful 导入 状态化的

来自 ._存储工具 导入 _存储设置
来自 .默认规划器 导入 默认加载规划器
来自 .规划器 导入 装载计划, 载入规划器
来自 .存储 导入 存储读取器
来自 .工具 导入 _api_bc_check, _DistWrapper, _profile


全部 = ["load_state_dict", 加载]


[文档]@已弃用( "``load_state_dict`` 已弃用,将在未来的版本中删除。" "请使用 `load`。" category=FutureWarning ) def 加载状态字典( state_dict: 字典[str, 任意类型], storage_reader: 存储读取器, process_group: Optional[dist.ProcessGroup] = None, coordinator_rank: int = 0, no_dist: 布尔 = False, planner: Optional[LoadPlanner] = None, ) -> None: """此方法已弃用。请切换到 'load'。""" storage_reader.reset() with _profile(): # TODO: 在这里测试返回 `load` 是否可行。 return _load_state_dict( state_dict, storage_reader, process_group, coordinator_rank, no_dist, 规划器, )
[文档]@_dcp_method_logger(记录异常=True) @_api_bc_check 定义 加载( state_dict: 字典[字符串, 任何], *, 检查点 ID: 联盟[字符串, 操作系统.PathLike, ] = , 存储读取器: 可选[StorageReader] = , 规划器: 可选[LoadPlanner] = , 进程组: 可选[距离.流程组] = , 无区分: 布尔类型 = 错误, ) -> : "" 以 SPMD 风格将检查点加载到分布式状态字典中。 每个 rank 必须提供给此的`state_dict`具有相同的键。 API. 键值不匹配可能导致挂起或错误。如果不确定,可以使用 `utils._assert_same_keys` API 进行校验(但可能会产生通信 成本)。 每个等级将尝试读取最少的必要数据 满足请求的 `state_dict`。当加载 :class:`ShardedTensor` 或 :class:`DTensor` 实例时,每个秩只读取其本地分片的数据。 对于每个具有 `state_dict` 和 `load_state_dict` 方法的 ``Stateful`` 对象, 加载将首先调用 ``state_dict``,然后尝试反序列化。 在反序列化完成后,一次性加载 `load_state_dict`。 对于每个非“Stateful”对象,加载将反序列化对象,然后替换 将其存储在`state_dict`中,与反序列化对象一起。 .. 警告:: 所有在 `state_dict` 中的张量都必须分配在其 在调用此函数之前,请指定目标设备。 所有非张量数据均使用 `torch.load()` 加载并在原地修改。 在 state_dict 上。 .. 警告:: 用户必须在根模块上调用 `load_state_dict` 以确保加载。 后处理和非张量数据正确传播。 .. 注意: 如果没有初始化进程组,此函数将假定意图 将检查点加载到本地进程。这可以很有用,因为 本地推理的情况,以及使用常规张量(与分片张量不同) 或分片张量) .. 注意: 假设秩 0 是协调器秩。 参数: state_dict (Dict[str, Any]): 要加载检查点到的状态字典。 checkpoint_id (Union[str, os.PathLike, None]): 检查点实例的 ID。checkpoint_id 的含义取决于存储方式。它可以是一个文件夹或文件的路径。 它可以是文件夹或文件的路径。 它也可以是键,如果存储是键值存储的话。 (默认:``None``) storage_reader(可选[StorageReader]): 用于执行读取的 StorageWriter 实例。如果这不是 指定,DCP 将自动根据读取器推断 检查点 ID。如果检查点 ID 也为 None,将引发异常 (默认:``None``) 规划器(可选[LoadPlanner]): LoadPlanner 的实例。如果没有指定,将使用默认 规划器。(默认:``None``) process_group (Optional[ProcessGroup]): 用于跨等级同步的 ProcessGroup。 (默认:``None``) no_dist (布尔值):如果 ``True``,此函数将假定意图是加载 一个不使用跨秩同步的检查点。(默认:``False``) 返回: 无。 示例 >>> # xdoctest: +SKIP >>> my_model = MyModule() >>> 优化器 = Adagrad(my_model.parameters()) >>> 模型状态字典 = my_model.state_dict() >>> fs_storage_reader = torch.distributed.checkpoint.FileSystemReader( ... "/checkpoint/1" ... ) >>> torch.distributed.checkpoint.load_state_dict( >>> state_dict=model_state_dict, >>> storage_reader=fs_storage_reader, >>> ) >>> # module.load_state_dict() 函数可能包含自定义步骤 >>> # 需要刷新状态字典,必须调用它 >>> # 确保正确行为。 >>> my_model.load_state_dict(model_state_dict) .. 注意:: load_state_dict 使用集体操作来协调跨进程的读取。 对于基于 NCCL 的进程组,对象的内部张量表示必须在通信之前移动到 GPU 设备上。 在这种情况下,使用的设备由 `torch.cuda.current_device()` 提供, 并且用户有责任确保这一点,以便每个 对象的内部张量表示必须在通信之前移动到 GPU 设备上。 该排名有一个独立的 GPU,通过`torch.cuda.set_device()`设置。 "文档" no_dist = no_dist 或者 ( 距离.是否可用()) 或者 ( 距离.已初始化()) 如果 no_dist: warnings.warn( "torch.distributed 被禁用、不可用或未初始化,假设意图是在单个进程中加载。" ) _profile(): storage_reader = 角色( StorageReader, _storage_setup(存储读取器, 检查点 ID, 读取器=True) ) 所有排名必须在其提供的 `state_dict` 中具有相同的键。 有关详细信息,请参阅文档。 确保所有排名加载值的顺序相同 在同一顺序中 = 排序(state_dict.()) 状态 ful_sd = {} key : 如果 key state_dict: continue 元素 = state_dict[] 状态 ful_sd[] = ( 元素.state_dict() 如果 isinstance(元素, 状态化) 否则 元素 ) _加载状态字典( state_dict=状态化_sd, 存储读取器=存储读取器, 进程组=进程组, 无区分=无区分, 规划器=规划器, ) key : 如果 key state_dict: continue 元素 = state_dict[] 如果 isinstance(元素, 有状态的): 如果状态字典是一个有状态的对象, DCP 在原始状态字典中进行原地加载。 元素.加载状态字典(statetful_sd[]\) else: 否则,用加载的状态字典替换状态字典。 state_dict[] = 状态 ful_sd[]
定义 _加载状态字典( state_dict: 字典[字符串, 任何], 存储读取器: StorageReader, 进程组: 可选[距离.流程组] = , 协调器等级: 整型 = 0, 无分布式: 布尔类型 = 错误, 规划器: 可选[LoadPlanner] = , ) -> : 火炬._C._log_api_usage_once("torch.distributed.checkpoint.load_state_dict") distW = _DistWrapper(进程组, no_dist, 协调器等级) 如果 规划器 : 规划器 = 默认加载规划器() 模型参数 = {} 如果 (模型 ID := getattr(存储读取器, 检查点 ID, )) : 模型参数["检查点 ID"] = 检查点标识符 模型参数["进程组"] = distW. @_dcp_method_logger(**ckpt_kwargs) 定义 local_step(): 断言 规划器 元数据 = 存储读取器.读取元数据() 规划器.设置计划器(state_dict, 元数据, distW.是否为协调器) 存储读取器.设置存储读取器(元数据, distW.是否为协调器) 本地计划 = 规划器.创建本地计划() 本地计划 = 存储读取器.准备本地计划(本地计划) 返回 本地计划 @_dcp_method_logger(**检查点参数) 定义 全局步数(所有本地计划): 断言 规划器 所有本地计划 = 规划器.创建全局计划(所有本地计划) 所有本地计划 = 存储读取器.准备全局计划(所有本地计划) 返回 所有本地计划 中央计划: 装载计划 = distW.减少分散("plan", local_step, global_step) @_dcp_method_logger(**ckpt_kwargs) 定义 读取数据(): 断言 规划器 final_local_plan = 规划器.完成计划(central_plan) all_reads = 存储读取器.读取数据(最终本地计划, 规划器) 所有读取.等待() 返回 _ = distW.全局聚合(阅读, 读取数据) 定义 从键中加载状态字典( : 可选[联盟[集合[字符串], 字符串]] = , *, 检查点 ID: 联盟[字符串, 操作系统.PathLike, ] = , 存储读取器: 可选[StorageReader] = , 进程组: 可选[距离.流程组] = , ) -> 字典[字符串, 任何] "" 仅从检查点加载指定的键,如果没有指定键,则加载整个 检查点将被加载。注意,此方法将检查点完全加载到 当前进程且未分发。 .. 警告:: .. 警告:: 所有非张量数据均使用 `torch.load()` 加载 .. 注意: 与常规模式相反,此函数不接收状态字典作为输入 并且不就地加载。相反,直接初始化并从文件中读取新的状态字典 .. 注意: 如果没有初始化进程组,此函数将假定意图 将检查点加载到本地进程。这可以很有用,因为 本地推理的情况,以及使用常规张量(与 DTensor 相对) 或 ShardedTensor) .. 注意: 假设 Rank 0 是协调器进程。 参数: keys (Optional[Union[set[str], str]]): 加载在此集合中指定的任何密钥。如果没有指定密钥,则加载整个检查点 已加载。 checkpoint_id (Union[str, os.PathLike, None]): 此检查点实例的 ID。checkpoint_id 的含义取决于存储。它可以是一个文件夹或文件的路径。 它可以是文件夹或文件的路径。 它也可以是键,如果存储是键值存储的话。 (默认:``None``) storage_reader(可选[StorageReader]): 用于执行读取的 StorageWriter 实例。如果这不是 指定,DCP 将自动根据读取器推断 检查点 ID。如果检查点 ID 也为 None,将引发异常 (默认:``None``) 进程组(Optional[ProcessGroup]): 用于跨等级同步的进程组。 (默认:``None``) 返回: 从指定键获取的状态字典 "文档" 火炬._C._log_api_usage_once( "torch.distributed.checkpoint._load_state_dict_from_keys" ) 无_dist = (距离.是否可用() 距离.已初始化()) 如果 无_dist: warnings.warn( "torch.distributed 不可用或未初始化,假设意图是在单个进程中加载。" ) 存储读取器 = 角色( StorageReader, 存储设置(存储读取器, 检查点 ID, 读取器=True) ) 如果 isinstance(, 字符串): = {} SD 卡: 字典[字符串, 任何] = {} 加载状态字典( state_dict=sd, 存储读取器=存储读取器, 进程组=进程组, 无障碍=无障碍, 规划器=空状态字典加载规划器(= 或者 集合()), ) 返回 sd

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源