# mypy: 允许未类型化装饰器
# mypy: 允许未类型化定义
导入
操作系统
导入
警告
来自
打字
导入
任何,
角色,
可选,
联合
来自 typing_extensions
导入
已弃用
导入
火炬
导入 torch.distributed
是 dist
来自 torch.distributed.checkpoint.default_planner
导入
空状态字典加载规划器
来自
torch.distributed.checkpoint 的记录器
导入
_dcp_method 记录器
来自 torch.distributed.checkpoint.stateful
导入
状态化的
来自
._存储工具
导入
_存储设置
来自
.默认规划器
导入
默认加载规划器
来自
.规划器
导入
装载计划,
载入规划器
来自
.存储
导入
存储读取器
来自
.工具
导入 _api_bc_check, _DistWrapper, _profile
全部 = ["load_state_dict",
加载]
[文档]@已弃用(
"``load_state_dict`` 已弃用,将在未来的版本中删除。"
"请使用 `load`。"
category=FutureWarning
)
def 加载状态字典(
state_dict: 字典[str, 任意类型],
storage_reader: 存储读取器,
process_group: Optional[dist.ProcessGroup] = None,
coordinator_rank: int = 0,
no_dist: 布尔 = False,
planner: Optional[LoadPlanner] = None,
) -> None:
"""此方法已弃用。请切换到 'load'。"""
storage_reader.reset()
with _profile():
# TODO: 在这里测试返回 `load` 是否可行。
return _load_state_dict(
state_dict,
storage_reader,
process_group,
coordinator_rank,
no_dist,
规划器,
)
[文档]@_dcp_method_logger(
记录异常=True)
@_api_bc_check
定义
加载(
state_dict: 字典[
字符串,
任何
],
*,
检查点 ID:
联盟[
字符串,
操作系统.PathLike,
无] =
无,
存储读取器:
可选[StorageReader] =
无,
规划器:
可选[LoadPlanner] =
无,
进程组:
可选[
距离.
流程组] =
无,
无区分:
布尔类型 =
错误,
) -> 无:
""
以 SPMD 风格将检查点加载到分布式状态字典中。
每个 rank 必须提供给此的`state_dict`具有相同的键。
API. 键值不匹配可能导致挂起或错误。如果不确定,可以使用
`utils._assert_same_keys` API 进行校验(但可能会产生通信
成本)。
每个等级将尝试读取最少的必要数据
满足请求的 `state_dict`。当加载 :class:`ShardedTensor`
或 :class:`DTensor` 实例时,每个秩只读取其本地分片的数据。
对于每个具有 `state_dict` 和 `load_state_dict` 方法的 ``Stateful`` 对象,
加载将首先调用 ``state_dict``,然后尝试反序列化。
在反序列化完成后,一次性加载 `load_state_dict`。
对于每个非“Stateful”对象,加载将反序列化对象,然后替换
将其存储在`state_dict`中,与反序列化对象一起。
.. 警告::
所有在 `state_dict` 中的张量都必须分配在其
在调用此函数之前,请指定目标设备。
所有非张量数据均使用 `torch.load()` 加载并在原地修改。
在 state_dict 上。
.. 警告::
用户必须在根模块上调用 `load_state_dict` 以确保加载。
后处理和非张量数据正确传播。
.. 注意:
如果没有初始化进程组,此函数将假定意图
将检查点加载到本地进程。这可以很有用,因为
本地推理的情况,以及使用常规张量(与分片张量不同)
或分片张量)
.. 注意:
假设秩 0 是协调器秩。
参数:
state_dict (Dict[str, Any]): 要加载检查点到的状态字典。
checkpoint_id (Union[str, os.PathLike, None]):
检查点实例的 ID。checkpoint_id 的含义取决于存储方式。它可以是一个文件夹或文件的路径。
它可以是文件夹或文件的路径。
它也可以是键,如果存储是键值存储的话。
(默认:``None``)
storage_reader(可选[StorageReader]):
用于执行读取的 StorageWriter 实例。如果这不是
指定,DCP 将自动根据读取器推断
检查点 ID。如果检查点 ID 也为 None,将引发异常
(默认:``None``)
规划器(可选[LoadPlanner]):
LoadPlanner 的实例。如果没有指定,将使用默认
规划器。(默认:``None``)
process_group (Optional[ProcessGroup]):
用于跨等级同步的 ProcessGroup。
(默认:``None``)
no_dist (布尔值):如果 ``True``,此函数将假定意图是加载
一个不使用跨秩同步的检查点。(默认:``False``)
返回:
无。
示例
>>> # xdoctest: +SKIP
>>> my_model = MyModule()
>>> 优化器 = Adagrad(my_model.parameters())
>>> 模型状态字典 = my_model.state_dict()
>>> fs_storage_reader = torch.distributed.checkpoint.FileSystemReader(
... "/checkpoint/1"
... )
>>> torch.distributed.checkpoint.load_state_dict(
>>> state_dict=model_state_dict,
>>> storage_reader=fs_storage_reader,
>>> )
>>> # module.load_state_dict() 函数可能包含自定义步骤
>>> # 需要刷新状态字典,必须调用它
>>> # 确保正确行为。
>>> my_model.load_state_dict(model_state_dict)
.. 注意::
load_state_dict 使用集体操作来协调跨进程的读取。
对于基于 NCCL 的进程组,对象的内部张量表示必须在通信之前移动到 GPU 设备上。
在这种情况下,使用的设备由 `torch.cuda.current_device()` 提供,
并且用户有责任确保这一点,以便每个
对象的内部张量表示必须在通信之前移动到 GPU 设备上。
该排名有一个独立的 GPU,通过`torch.cuda.set_device()`设置。
"文档"
no_dist = no_dist 或者 (
非
距离.
是否可用())
或者 (
非
距离.
已初始化())
如果 no_dist:
warnings.warn(
"torch.distributed 被禁用、不可用或未初始化,假设意图是在单个进程中加载。"
)
与 _profile():
storage_reader = 角色(
StorageReader, _storage_setup(存储读取器,
检查点 ID,
读取器=True)
)
所有排名必须在其提供的 `state_dict` 中具有相同的键。
有关详细信息,请参阅文档。
确保所有排名加载值的顺序相同
在同一顺序中
键 =
排序(state_dict.
键())
状态 ful_sd = {}
为 key
在
键:
如果 key
非
在 state_dict:
continue
元素 = state_dict[
键]
状态 ful_sd[
键] = (
元素.state_dict()
如果 isinstance(
元素,
状态化)
否则
元素
)
_加载状态字典(
state_dict=状态化_sd,
存储读取器=
存储读取器,
进程组=
进程组,
无区分=
无区分,
规划器=
规划器,
)
为 key
在
键:
如果 key
非
在 state_dict:
continue
元素 = state_dict[
键]
如果 isinstance(
元素,
有状态的):
如果状态字典是一个有状态的对象,
DCP 在原始状态字典中进行原地加载。
元素.
加载状态字典(statetful_sd[
键
]\)
else:
否则,用加载的状态字典替换状态字典。
state_dict[键] =
状态 ful_sd[
键]
定义
_加载状态字典(
state_dict: 字典[
字符串,
任何
],
存储读取器: StorageReader,
进程组:
可选[
距离.
流程组] =
无,
协调器等级:
整型 = 0,
无分布式:
布尔类型 =
错误,
规划器:
可选[LoadPlanner] =
无,
) -> 无:
火炬._C._log_api_usage_once("torch.distributed.checkpoint.load_state_dict")
distW = _DistWrapper(进程组,
非 no_dist,
协调器等级)
如果
规划器
是
无:
规划器 =
默认加载规划器()
模型参数 = {}
如果 (
模型 ID := getattr(
存储读取器,
检查点 ID,
无))
是
非
无:
模型参数[
"检查点 ID"] =
检查点标识符
模型参数[
"进程组"] = distW.
组
@_dcp_method_logger(**ckpt_kwargs)
定义 local_step():
断言
规划器
是
非
无
元数据 =
存储读取器.
读取元数据()
规划器.
设置计划器(state_dict,
元数据, distW.
是否为协调器)
存储读取器.
设置存储读取器(
元数据, distW.
是否为协调器)
本地计划 =
规划器.
创建本地计划()
本地计划 =
存储读取器.
准备本地计划(
本地计划)
返回
本地计划
@_dcp_method_logger(**检查点参数)
定义
全局步数(
所有本地计划):
断言
规划器
是
非
无
所有本地计划 =
规划器.
创建全局计划(
所有本地计划)
所有本地计划 =
存储读取器.
准备全局计划(
所有本地计划)
返回
所有本地计划
中央计划:
装载计划 = distW.
减少分散("plan", local_step, global_step)
@_dcp_method_logger(**ckpt_kwargs)
定义
读取数据():
断言
规划器
是
非
无
final_local_plan = 规划器.
完成计划(central_plan)
all_reads = 存储读取器.
读取数据(
最终本地计划,
规划器)
所有读取.
等待()
返回
无
_ = distW.全局聚合(
阅读,
读取数据)
定义
从键中加载状态字典(
键:
可选[
联盟[
集合[
字符串
],
字符串]] =
无,
*,
检查点 ID:
联盟[
字符串,
操作系统.PathLike,
无] =
无,
存储读取器:
可选[StorageReader] =
无,
进程组:
可选[
距离.
流程组] =
无,
) -> 字典[
字符串,
任何
]
""
仅从检查点加载指定的键,如果没有指定键,则加载整个
检查点将被加载。注意,此方法将检查点完全加载到
当前进程且未分发。
.. 警告::
.. 警告::
所有非张量数据均使用 `torch.load()` 加载
.. 注意:
与常规模式相反,此函数不接收状态字典作为输入
并且不就地加载。相反,直接初始化并从文件中读取新的状态字典
。
.. 注意:
如果没有初始化进程组,此函数将假定意图
将检查点加载到本地进程。这可以很有用,因为
本地推理的情况,以及使用常规张量(与 DTensor 相对)
或 ShardedTensor)
.. 注意:
假设 Rank 0 是协调器进程。
参数:
keys (Optional[Union[set[str], str]]):
加载在此集合中指定的任何密钥。如果没有指定密钥,则加载整个检查点
已加载。
checkpoint_id (Union[str, os.PathLike, None]):
此检查点实例的 ID。checkpoint_id 的含义取决于存储。它可以是一个文件夹或文件的路径。
它可以是文件夹或文件的路径。
它也可以是键,如果存储是键值存储的话。
(默认:``None``)
storage_reader(可选[StorageReader]):
用于执行读取的 StorageWriter 实例。如果这不是
指定,DCP 将自动根据读取器推断
检查点 ID。如果检查点 ID 也为 None,将引发异常
(默认:``None``)
进程组(Optional[ProcessGroup]):
用于跨等级同步的进程组。
(默认:``None``)
返回:
从指定键获取的状态字典
"文档"
火炬._C._log_api_usage_once(
"torch.distributed.checkpoint._load_state_dict_from_keys"
)
无_dist =
非 (
距离.
是否可用()
和
距离.
已初始化())
如果
无_dist:
warnings.warn(
"torch.distributed 不可用或未初始化,假设意图是在单个进程中加载。"
)
存储读取器 =
角色(
StorageReader, 存储设置(
存储读取器,
检查点 ID,
读取器=True)
)
如果 isinstance(
键,
字符串):
键 = {
键}
SD 卡:
字典[
字符串,
任何] = {}
加载状态字典(
state_dict=sd,
存储读取器=
存储读取器,
进程组=
进程组,
无障碍=
无障碍,
规划器=
空状态字典加载规划器(
键=
键
或者
集合()),
)
返回 sd