torch.distributed.checkpoint.staging 的源代码
来自
打字
导入
可选,
可运行检查
来自 typing_extensions
导入
协议
来自 torch.distributed._state_dict_utils
导入 _copy_state_dict, _create_cpu_state_dict
来自 torch.distributed.checkpoint.metadata
导入 STATE_DICT_TYPE
全部 = ["AsyncStager", "BlockingAsyncStager"]
[文档]@runtime_checkable
类 AsyncStager(
协议):
""
此协议旨在为 dcp.async_save 提供定制和可扩展性,使用户
在执行常规 dcp.save 路径之前并行化如何定制数据准备过程。
预期的操作顺序(具体定义在`torch.distributed.state_dict_saver.async_save`中)
如下:
1. AsyncStager.stage_data(state_dict):
这次调用给 AsyncStager 提供了“阶段”的机会
状态字典。在此背景下,分阶段的期望和目的是创建一个“训练安全”
状态字典的表示,意味着在分阶段完成后对模块数据的任何更新
不应反映在此方法返回的状态字典中。例如,在默认情况下
如果整个状态字典在 CPU RAM 上创建了一个副本并返回到这里,允许用户
继续训练而不会改变正在序列化的数据。
2. 在阶段返回的状态_dict 上并行调用 dcp.save。此调用负责
用于序列化 state_dict 并将其写入存储。
3. 如果 AsyncStager.should_synchronize_after_execute 为 True,则此方法将在执行后立即被调用
序列化线程启动并在从 dcp.async_save 返回之前。如果此设置为 False,
该假设是用户已为同步目的定义了一个自定义同步点
优化训练循环中的保存延迟(例如,通过重叠预存操作)
前向/反向传递),用户负责在适当的时候调用 `AsyncStager.synchronize_staging`
。
"文档"
默认为 True,因为通常情况下是同步进行阶段
_synchronize_after_execute: 布尔类型 =
真实
@property
定义
执行后同步(
我) ->
布尔:
""
是否在执行阶段后同步。
"文档"
返回
我.
_执行后同步
[文档] def stage(self, state_dict: STATE_DICT_TYPE) -> STATE_DICT_TYPE:
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
返回`state_dict`的“分阶段”副本。对分阶段副本的期望是它
从阶段调用完成后发生的任何更新中免疫。
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
raise NotImplementedError(
f"{self.__class__.__name__}必须实现 stage 方法"
)
[文档] def synchronize_staging(self) -> None:
""
如果`stage`以某种方式是异步的,则应调用此方法以确保分阶段完成,并且可以安全地开始修改原始的`state_dict`
是的,在修改原始`state_dict`之前,请确保分阶段操作已经完成
""
[文档]
类 BlockingAsyncStager(AsyncStager):
""
AsyncStager 的一个实现,该实现将 state_dict 状态字典放置在 CPU RAM 中,并在复制完成前进行阻塞。
此实现还提供了一种使用固定内存优化阶段延迟的选项。
注意:在这种情况下,synchronize_staging 是一个空操作。
"文档"
默认为 True,因为通常情况下是同步部署。
_synchronize_after_execute: 布尔类型 =
假
定义
初始化(
我,
缓存已部署状态字典:
布尔类型 =
错误,
类型检查:
布尔类型 =
错误,
):
""
初始化 BlockingAsyncStager。
参数:
cache_staged_state_dict: 是否缓存已分阶段的状态字典。此选项会降低分阶段延迟
,但会增加内存使用。此外,如果此参数设置为 True,则预期
阶段器被维护并重复用于多个 dcp.async_save 调用。默认为 False。
type_check:在 cpu_offload 期间是否执行类型检查。默认为 False。
"文档"
我.
缓存阶段状态字典 =
缓存阶段状态字典
我.
类型检查 =
类型检查
我.
状态字典缓存:
可选[
状态字典类型] =
无
[文档] def stage(self, 状态字典: STATE_DICT_TYPE) -> 状态字典: TYPE:
""
返回`state_dict`在 CPU 上的副本。
""
如果没有 self.cache_staged_state_dict:
staged_state_dict = _create_cpu_state_dict(state_dict)
_copy_state_dict(state_dict, staged_state_dict, type_check=self.type_check)
return staged_state_dict
if self.state_dict_cache is None:
self.state_dict_cache = _create_cpu_state_dict(state_dict, pin_memory=True)
return _copy_state_dict(state_dict, self.state_dict_cache)
[文档] def synchronize_staging(self) -> None:
"""
无操作函数,因为预发布阶段是阻塞的。
"""