# mypy: 允许未类型化装饰器
# mypy: 允许未类型化定义
导入
检查
导入
操作系统
导入
警告
来自 concurrent.futures
导入
未来
来自
枚举
导入
枚举
来自
打字
导入
角色,
可选,
联合
来自 typing_extensions
导入
已弃用
导入
火炬
导入 torch.distributed
是 dist
来自 torch.distributed._state_dict_utils
导入 _copy_state_dict, _create_cpu_state_dict
来自 torch.distributed.checkpoint._async_executor
导入 ( # noqa: TC001
_AsyncCheckpointExecutor,
)
来自 torch.distributed.checkpoint._async_process_executor
导入 (
_ProcessBasedAsyncCheckpointExecutor,
)
来自 torch.distributed.checkpoint._async_thread_executor
导入 (
_ThreadBasedAsyncCheckpointExecutor,
)
来自 torch.distributed.checkpoint._storage_utils
导入
_存储设置
来自 torch.distributed.checkpoint.default_planner
导入
默认保存计划器
来自
torch.distributed.checkpoint 的记录器
导入
_dcp_method 记录器
来自 torch.distributed.checkpoint.metadata
导入
元数据, STATE_DICT_TYPE
来自 torch.distributed.checkpoint.planner
导入
保存计划,
保存计划器
来自 torch.distributed.checkpoint.staging
导入
异步阶段器
来自 torch.distributed.checkpoint.stateful
导入
状态化的
来自 torch.distributed.checkpoint.storage
导入
存储写入器
来自 torch.distributed.distributed_c10d
导入
_获取默认组
来自
.工具
导入 _api_bc_check,
_分发包装器,
_配置文件
全部 = [
保存状态字典,
保存,
异步保存,
异步检查点器类型]
[文档]类 AsyncCheckpointerType(Enum):
异步检查点类型枚举
THREAD = "线程"
PROCESS = "进程"
[文档]@已弃用(
"``save_state_dict`` 已弃用,将在未来版本中删除。"
"请使用 `save` 代替。"
category=FutureWarning
)
def 保存状态字典(
state_dict: 状态字典类型,
storage_writer: 存储写入器,
process_group: Optional[dist.ProcessGroup] = None,
coordinator_rank: 整数 = 0,
no_dist: 布尔 = False,
planner: Optional[SavePlanner] = None,
) -> Metadata:
"此方法已过时。请切换到 'save'。”"
storage_writer.reset()
# TODO: 在此处测试返回 'save'。”
with _profile():
return _save_state_dict(
state_dict,
storage_writer,
process_group,
协调器等级,
无分布,
规划器,
)
[文档]@_dcp_method_logger(
记录异常=True) # type: ignore[arg-type]
@_api_bc_check
定义
保存(
state_dict: 状态字典类型,
*,
检查点 ID:
联盟[
字符串,
操作系统.PathLike,
无] =
无,
存储写入器:
可选[
存储写入器] =
无,
规划器:
可选[
保存规划器] =
无,
进程组:
可选[
距离.
流程组] =
无,
无分布式:
布尔类型 =
错误,
) -> 元数据:
""
以 SPMD 风格保存分布式模型。
这个函数与 `torch.save()` 不同,因为它只处理每个秩的本地碎片。
`ShardedTensor` 和 `DTensor` 通过每个秩只保存其本地碎片来处理。
对于每个具有 `state_dict` 和 `load_state_dict` 的 `Stateful` 对象,
保存将调用 `state_dict` 然后进行序列化。
.. 警告::
无法保证 PyTorch 版本之间的向后兼容性
为保存状态字典。
.. 警告::
如果使用 `process_group` 参数,请确保只有其 ranks
调用 `save_state_dict` 并确保状态字典中的所有数据都属于它。
.. 注意::
当保存 FSDP 的`ShardingStrategy.HYBRID_SHARD`的检查点时,应该只有一个分片组调用`save_state_dict`,并且相应的进程组需要被传入。
如果没有进程组可用,此函数假定意图是保存。
如果没有进程组可用,此函数假定意图是保存。
.. 注意::
如果没有进程组可用,此函数假定意图是保存。
本地进程中的 state_dict。
.. 注意:
假设 Rank 0 是协调器 rank。
参数:
state_dict (Dict[str, Any]): 要保存的状态字典。
checkpoint_id (Union[str, os.PathLike, None]):
此检查点实例的 ID。checkpoint_id 的含义取决于存储方式。
它可以是文件夹或文件的路径。
如果存储是键值存储,它也可以是一个键。
(默认:无)
storage_writer(可选[StorageWriter]):
存储写入器实例用于执行写入。如果这不是
指定的,DCP 将自动根据作者进行推断
检查点 ID。如果检查点 ID 也为 None,将抛出异常。
(默认:``None``)
规划器(Optional[SavePlanner]):
SavePlanner 的实例。如果没有指定,将使用默认值。
规划器将被使用。(默认:``None``)
process_group(可选[ProcessGroup]):
用于跨等级同步的 ProcessGroup。
(默认:``None``)
no_dist (bool):
如果为 ``True``,则此函数将假定意图是加载
一个不使用跨秩同步的检查点。
(默认:`False`)
返回:
元数据:保存的检查点的元数据对象。
示例:
>>> # xdoctest: +SKIP
>>> my_model = MyModule()
>>> state_dict = {"model": my_model}
>>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
... "/checkpoint/1"
... )
>>> torch.distributed.checkpoint.save(
>>> state_dict=state_dict,
>>> storage_writer=fs_storage_writer,
>>> )
.. 注意::
save_state_dict 使用集体操作来协调跨 rank 的写入。
基于 NCCL 的过程组,内部张量表示
对象必须在通信之前移动到 GPU 设备上。
在这种情况下,所使用的设备由 `torch.cuda.current_device()` 给出
用户有责任确保这一点被设置正确
每个等级都有一个独立的 GPU,通过 `torch.cuda.set_device()` 设置。
"文档"
火炬._C._log_api_usage_once("torch.distributed.checkpoint.save")
no_dist = no_dist 或者 (
非
距离.
是否可用())
或者 (
非
距离.
已初始化())
如果
无分布式:
warnings.warn(
"torch.distributed 已被禁用、不可用或未初始化,假设意图是在单个进程中保存。"
)
与
_配置文件():
存储写入器 =
角色(
存储写入器,
存储设置(
存储写入器,
检查点 ID,
读取器=
错误)
)
返回
保存状态字典(
state_dict=状态到状态字典(state_dict),
存储写入器=
存储写入器,
进程组=
进程组,
无分布式=
无错误,
规划器=
规划器,
)
[文档]@_dcp_method_logger(
记录异常=True)
定义
异步保存(
state_dict: 状态字典类型,
*,
检查点 ID:
联盟[
字符串,
操作系统.PathLike,
无] =
无,
存储写入器:
可选[
存储写入器] =
无,
规划器:
可选[
保存规划器] =
无,
进程组:
可选[
距离.
流程组] =
无,
异步检查点类型: AsyncCheckpointerType = AsyncCheckpointerType.
线程,
) -> 未来:
异步版本的 `save`。此代码首先将 `state_dict` 解耦到
暂存存储(默认为 CPU 内存),然后在单独的线程中调用`save`。
.. 警告::
此功能为实验性,可能随时更改。
参数:
状态字典(Dict[str, Any]):要保存的状态字典。
checkpoint_id (Union[str, os.PathLike, None]):
此检查点实例的 ID。checkpoint_id 的含义取决于存储方式。
它可以是文件夹或文件的路径。
如果存储是键值存储,它也可以是一个键。
(默认:``None``)
storage_writer (可选[StorageWriter]):
StorageWriter 实例,用于执行'stage'和'save'。如果
这未指定,DCP 将自动根据存储器推断写入器
检查点 ID。如果检查点 ID 也为 None,将抛出异常。
(默认:``None``)
规划器(Optional[SavePlanner]):
SavePlanner 的实例。如果没有指定,将使用默认值。
规划器将被使用。(默认:``None``)
process_group(可选[ProcessGroup]):
用于跨等级同步的 ProcessGroup。
(默认:``None``)
返回:
未来:保存操作返回的结果元数据对象。
示例:
>>> # xdoctest: +SKIP
>>> my_model = MyModule()
>>> state_dict = {"model": my_model}
>>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
... "/checkpoint/1"
... )
>>> checkpoint_future = torch.distributed.checkpoint.async_save(
>>> state_dict=state_dict,
>>> storage_writer=fs_storage_writer,
>>> )
...
>>> # ... 进行一些工作 ...
...
>>> checkpoint_future.result()
"文档"
火炬._C._log_api_usage_once(
torch.distributed.checkpoint.async_save)
如果
距离.
是否可用()
和
距离.
已初始化():
pg = 进程组
或者
获取默认组()
断言 (
火炬.
设备("cpu")
在 pg._device_types
# 类型: 忽略[attr-defined]
), (
"异步保存必须启用 CPU 后端;尝试使用 'cpu:gloo,cuda:nccl' 初始化进程组"
)
存储写入器 =
角色(
存储写入器,
_存储设置(
存储写入器,
检查点 ID,
读取器=
错误)
)
状态字典 =
_状态到状态字典(state_dict)
如果 isinstance(
存储写入器, AsyncStager):
阶段状态字典 =
存储写入器.
阶段(state_dict)
else: 为未实现 AsyncStager 的 storage_writers 提供 bwc 支持
阶段化状态字典 = _create_cpu_state_dict(state_dict)
_copy_state_dict(state_dict, 阶段化状态字典,
类型检查=
错误)
执行器:
异步检查点执行器 = (
基于进程的异步检查点执行器()
如果
异步检查点类型 ==
异步检查点类型枚举.
处理
否则
_基于线程的异步检查点执行器()
)
f: 未来 =
执行器.
执行保存(
阶段化状态字典,
检查点 ID=
检查点 ID,
存储编写器=
存储编写器,
规划器=
规划器,
进程组=
进程组,
)
如果 (
isinstance(存储编写器, AsyncStager)
和
存储编写器.
执行后应同步
):
存储写入器.
同步预发布()
返回 f
定义
_将状态转换为状态字典(state_dict:
状态字典类型) ->
状态字典类型:
创建`state_dict`的浅拷贝,其中`state_dict`为每个 Stateful 对象调用。
stateful_state_dict = {}
为
键,
元素
在 state_dict.
项目():
stateful_state_dict[键] = (
元素.state_dict()
如果 isinstance(
元素, Stateful)
否则
元素
)
返回
状态字典
定义
保存状态字典(
state_dict: 状态字典类型,
存储写入器:
存储写入器,
进程组:
可选[
距离.
流程组] =
无,
协调器排名:
整型 = 0,
无分布式:
布尔类型 =
错误,
规划器:
可选[
保存规划器] =
无,
) -> 元数据:
火炬._C._log_api_usage_once("torch.distributed.checkpoint.save_state_dict")
distW = _DistWrapper(进程组,
非 no_dist,
协调器排名)
如果
规划器
是
无:
规划器 =
默认保存规划器()
断言
规划器
是
非
无
全局元数据 =
无
检查点参数 = {}
如果 (
检查点 ID := getattr(
存储写入器, "checkpoint_id",
无))
是
非
无:
ckpt_kwargs["checkpoint_id"] = ckpt_id
ckpt_kwargs["进程组"] = distW.
组
@_dcp_method_logger(**ckpt_kwargs)
定义 local_step():
断言
规划器
是
非
无
存储元信息 =
存储写入器.
存储元数据()
如果
"存储元数据"
非
在
检查.
签名(
规划器.
设置计划器).
参数:
warnings.warn(
"已更新 SavePlanner.set_up_planner 的函数定义"
"以包含 storage_meta 参数。请更新您的实现"
包含此参数。
)
规划器.
设置计划器(state_dict, distW.is_coordinator)
# 类型:忽略[调用参数,参数类型]
else:
规划器.
设置计划器(
state_dict=state_dict,
存储元数据=
存储元数据,
is_coordinator=distW.is_coordinator,
)
存储写入器.
设置存储写入器(distW.is_coordinator)
本地计划 =
规划器.
创建本地计划()
本地计划 =
存储写入器.
准备本地计划(
本地计划)
返回
本地计划
@_dcp_method_logger(**ckpt_kwargs)
定义 global_step(
所有本地计划):
非局部
全局元数据
断言
规划器
是
非
无
所有本地计划,
全局元数据 =
规划器.
创建全局计划(
所有本地计划)
所有本地计划 =
存储写入器.
准备全局计划(
所有本地计划)
返回
所有本地计划
central_plan: 保存计划 = distW.
减少分散("plan", local_step, global_step)
@_dcp_method_logger(**ckpt_kwargs)
定义
写数据():
断言
规划器
是
非
无
final_local_plan = 规划器.
完成计划(central_plan)
所有写入 =
存储写入器.
写数据(
最终本地计划,
规划器)
所有写入.
等待()
返回
所有写入.
值()
@_dcp_method_logger(**ckpt_kwargs)
定义
完成检查点(
所有结果):
断言
全局元数据
是
非
无
存储编写器.
完成(
元数据=
全局元数据,
结果=
所有结果)
返回
全局元数据
返回 distW.
全量归约(
"写",
写数据,
完成检查点)