快捷键

torch.distributed.checkpoint.staging 的源代码

来自 打字 导入 可选, 可运行检查
来自 typing_extensions 导入 协议

来自 torch.distributed._state_dict_utils 导入 _copy_state_dict, _create_cpu_state_dict
来自 torch.distributed.checkpoint.metadata 导入 STATE_DICT_TYPE


全部 = ["AsyncStager", "BlockingAsyncStager"]


[文档]@runtime_checkable AsyncStager(协议): "" 此协议旨在为 dcp.async_save 提供定制和可扩展性,使用户 在执行常规 dcp.save 路径之前并行化如何定制数据准备过程。 预期的操作顺序(具体定义在`torch.distributed.state_dict_saver.async_save`中) 如下: 1. AsyncStager.stage_data(state_dict): 这次调用给 AsyncStager 提供了“阶段”的机会 状态字典。在此背景下,分阶段的期望和目的是创建一个“训练安全” 状态字典的表示,意味着在分阶段完成后对模块数据的任何更新 不应反映在此方法返回的状态字典中。例如,在默认情况下 如果整个状态字典在 CPU RAM 上创建了一个副本并返回到这里,允许用户 继续训练而不会改变正在序列化的数据。 2. 在阶段返回的状态_dict 上并行调用 dcp.save。此调用负责 用于序列化 state_dict 并将其写入存储。 3. 如果 AsyncStager.should_synchronize_after_execute 为 True,则此方法将在执行后立即被调用 序列化线程启动并在从 dcp.async_save 返回之前。如果此设置为 False, 该假设是用户已为同步目的定义了一个自定义同步点 优化训练循环中的保存延迟(例如,通过重叠预存操作) 前向/反向传递),用户负责在适当的时候调用 `AsyncStager.synchronize_staging` "文档" 默认为 True,因为通常情况下是同步进行阶段 _synchronize_after_execute: 布尔类型 = 真实 @property 定义 执行后同步() -> 布尔: "" 是否在执行阶段后同步。 "文档" 返回 ._执行后同步
[文档] def stage(self, state_dict: STATE_DICT_TYPE) -> STATE_DICT_TYPE: ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` 返回`state_dict`的“分阶段”副本。对分阶段副本的期望是它 从阶段调用完成后发生的任何更新中免疫。 ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` raise NotImplementedError( f"{self.__class__.__name__}必须实现 stage 方法" )
[文档] def synchronize_staging(self) -> None: "" 如果`stage`以某种方式是异步的,则应调用此方法以确保分阶段完成,并且可以安全地开始修改原始的`state_dict` 是的,在修改原始`state_dict`之前,请确保分阶段操作已经完成 ""
[文档] BlockingAsyncStager(AsyncStager): "" AsyncStager 的一个实现,该实现将 state_dict 状态字典放置在 CPU RAM 中,并在复制完成前进行阻塞。 此实现还提供了一种使用固定内存优化阶段延迟的选项。 注意:在这种情况下,synchronize_staging 是一个空操作。 "文档" 默认为 True,因为通常情况下是同步部署。 _synchronize_after_execute: 布尔类型 = 定义 初始化( , 缓存已部署状态字典: 布尔类型 = 错误, 类型检查: 布尔类型 = 错误, ): "" 初始化 BlockingAsyncStager。 参数: cache_staged_state_dict: 是否缓存已分阶段的状态字典。此选项会降低分阶段延迟 ,但会增加内存使用。此外,如果此参数设置为 True,则预期 阶段器被维护并重复用于多个 dcp.async_save 调用。默认为 False。 type_check:在 cpu_offload 期间是否执行类型检查。默认为 False。 "文档" .缓存阶段状态字典 = 缓存阶段状态字典 .类型检查 = 类型检查 .状态字典缓存: 可选[状态字典类型] =
[文档] def stage(self, 状态字典: STATE_DICT_TYPE) -> 状态字典: TYPE: "" 返回`state_dict`在 CPU 上的副本。 "" 如果没有 self.cache_staged_state_dict: staged_state_dict = _create_cpu_state_dict(state_dict) _copy_state_dict(state_dict, staged_state_dict, type_check=self.type_check) return staged_state_dict if self.state_dict_cache is None: self.state_dict_cache = _create_cpu_state_dict(state_dict, pin_memory=True) return _copy_state_dict(state_dict, self.state_dict_cache)
[文档] def synchronize_staging(self) -> None: """ 无操作函数,因为预发布阶段是阻塞的。 """

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源