快捷键

torch.distributed.checkpoint.planner 的源代码

导入 abc
导入 输入/输出
导入 操作符
from dataclasses 导入 数据类
from 枚举 导入 自动, 枚举
from functools 导入 reduce
from 打字 导入 任何, 可选, 联合

导入 火炬
from torch.distributed.checkpoint.metadata 导入 (
    ChunkStorageMetadata,
    元数据,
    元数据索引,
    状态字典类型,
    存储元数据,
    张量属性,
)


全部 = [
    "写入项目类型",
    "加载项目类型",
    "张量写入数据",
    "写入项目",
    "读取项目",
    "保存计划",
    "加载计划",
    "保存规划器",
    负载规划器,
]


 写入项目类型(枚举):
    张量 = 自动()
    数据块 = 自动()
    字节输入输出 = 自动()


 加载项目类型(枚举):
    张量 = 自动()
    字节输入输出 = 自动()


@dataclass(冻结=True)
 张量写入数据:
    数据块: 块存储元数据
    属性: 张量属性
    尺寸: PyTorch.大小


[docs]@dataclass(frozen=True) 类 WriteItem: """数据类,用于存储需要写入存储的信息。""" index: MetadataIndex 类型:WriteItemType # 如果是张量写入,则存在值 tensor_data: Optional[TensorWriteData] = None
[文档] def tensor_storage_size(self) -> Optional[int]: "空字符串" 计算底层张量的存储大小,如果不是张量写入,则为 None。 返回: 可选[int] 存储大小,如果有底层张量,则为以字节为单位的存储大小。 """ 如果 self.tensor_data 为 None: 返回 None numels = reduce(operator.mul, self.tensor_data.size, 1) dtype_size = torch._utils._element_size(self.tensor_data.properties.dtype) return numels * dtype_size
[文档]@dataclass(frozen=True) class ReadItem: # 读取项目 # 类型:加载项目类型 # 在 state_dict 中索引 dest_index: 元数据索引 目标张量中的偏移量 dest_offsets: torch.Size # 检查点中的索引 storage_index: 元数据索引 # 偏移量到检查点数据 # 存储偏移量:torch.Size # 要复制的超立方体的大小 # 长度:torch.Size
[文档]@dataclass(frozen=True) class 保存计划: items: list[写入项] storage_data: 任意 = None planner_data: Any = None # 这用于指示应该使用缓存的计划来写入数据。 # 使用缓存的计划来写入数据。 usable: bool = True
[文档]@dataclass class LoadPlan: items: list[ReadItem] storage_data: Any = None planner_data: Any = None
[文档] 保存规划器(abc.ABC): "" 定义 save_state_dict 使用的协议的抽象类。 SavePlanners 是具有状态的对象,可以用来自定义整个保存过程。 SavePlanner 充当 state_dict 的访问代理,因此对它的任何转换都将对整个进程可见。 任何对它的转换都将对整个进程可见。 规划器子类在 save_state_dict 过程中可以预期以下调用顺序: 1) set_up_planner - 在所有 rank 上调用。 标记检查点保存的开始。 2) 在所有进程中调用 create_local_plan。 处理状态字典并生成一个将要发送进行全局规划的 `SavePlan`。 3) 在协调器进程中调用 create_global_plan。 从所有等级获取 SavePlan 并做出任何全局决策。 4) finish_plan - 在所有等级上调用。 这给每个等级一个调整全局规划决策的机会。 5) resolve_data - 在每个等级上多次调用 在`state_dict`中查找存储层的值以进行写入。 建议用户扩展 DefaultSavePlanner 而不是直接扩展此接口,因为大多数更改都可以通过单个方法的更改来表示。 常见的扩展模式有 3 种: 常见的扩展模式有 3 种: 重写 state_dict。这是扩展保存过程最简单的方法,因为它不需要理解 SavePlan 的工作原理: 不需要了解 SavePlan 的复杂性: >>> # xdoctest: +SKIP("undefined vars") >>> class RenamePlanner(DefaultSavePlanner): >>> def set_up_planner( >>> self, >>> state_dict: 状态字典类型, >>> storage_meta: 可选[存储元数据], >>> is_coordinator: 布尔值, >>> ) -> None: >>> # 将所有键名前缀设置为 `foo_` >>> super().set_up_planner({"foo_" + k: v for k, v in state_dict.items()}, storage_meta, is_coordinator) 修改本地计划和查找,同时进行。这在精细控制数据持久化方式时很有用 >>> # xdoctest: +SKIP("undefined vars") >>> class FP16Planner(DefaultSavePlanner): >>> def create_local_plan(self): >>> plan = super().create_local_plan() >>> for p in plan: >>> 如果 p.tensor_data 不为 None: >>> p.tensor_data.properties.dtype = torch.float16 >>> 返回 plan ... >>> def resolve_data(self, write_item): >>> item = super().resolve_data(write_item) >>> return item if write_item.type == WriteItemType.BYTE_IO else item.to(torch.float16) 使用全局规划步骤来做出每个 rank 无法单独做出的中心决策 >>> # xdoctest: +SKIP("undefined vars") >>> from itertools import zip_longest >>> 从 dataclasses 导入 replace >>> class DDPLoadBalancingPlanner(DefaultSavePlanner): >>> # 这使用默认的本地计划行为,所有非分片写入都在 rank 0 >>> # 此示例不处理 ShardedTensors >>> def create_global_plan(self, all_plans): >>> iters = [iter(all_plans[0].items)] * len(all_plans) >>> items_per_rank = [ >>> [item for item in items if item is not None] >>> for items in zip(*zip_longest(*iters), strict=True) >>> ] >>> all_plans = [ >>> replace(plan, items=items) >>> 对于 plan 和 items,使用 zip 函数进行组合,strict 参数设置为 True >>> ] >>> 返回 super().create_global_plan(all_plans) 最后,一些规划器需要在检查点中保存额外的元数据,这是 通过每个等级贡献其本地计划中的数据项来完成 全球规划器将它们汇总: >>> # xdoctest: +SKIP("undefined vars") >>> 类 SaveExtraDataPlanner(默认保存规划器): >>> def create_local_plan(self) -> 保存计划: >>> plan = super().create_local_plan() >>> return replace(plan, planner_data="per-rank-data") ... >>> def create_global_plan(self, all_plans: List[SavePlan]) -> Tuple[List[SavePlan], Metadata]: >>> global_plan, metadata = super().create_global_plan(all_plans) >>> merged_data = [p.planner_data for p in global_plan] >>> metadata = replace(metadata, planner_data=merged_data) >>> return global_plan, metadata "文档" 将当前排名通过 `create_local_plan` API 计算出的保存计划保存 缓存到本地排名 _缓存的保存计划: 字典[字符串, 保存计划 SavePlan] = {} 当前排名的最终保存计划 这是由 `create_local_plan` API 创建的计划合并而成的。 以及给定排名的 `create_global_plan` 的结果。 这是 `finish_plan` API 计算出的最终计划。 该计划随后被发送到 `write_data`。 本地缓存 _缓存的最终保存计划: 字典[字符串, 保存计划 SavePlan] = {} 所有排名的本地计划的集合。 这是`create_global_plan` API 的输入。 # 缓存于协调器节点。 _所有计划已缓存: 字典[字符串, 列表[保存计划 SavePlan]] = {} # 由 `create_global_plan` API 计算的全局检查点计划。 # 缓存于协调器节点。 _cached_global_plan: 字典[字符串, 列表[保存计划 SavePlan]] = {}
[文档] @abc.abstractmethod def set_up_planner( self, state_dict: 状态字典类型, storage_meta: Optional[存储元数据] = None, is_coordinator: 布尔 = False, ) -> None: “” 初始化此规划器以保存 `state_dict`。 实现应将这些值保存下来,因为它们在保存过程中不会提供。 这将在所有进程中调用。 """
[文档] @abc.abstractmethod def 创建本地计划(self) -> 保存计划: """ 计算当前排名的保存计划。 这将被汇总并传递给 create_global_plan。 计划器特定的数据可以通过 SavePlan::planner_data 传递。 这将在所有排名上被调用。 """
[文档] @abc.abstractmethod def 创建全局计划( self, 所有计划: list[SavePlan] () -> tuple[list[SavePlan], Metadata] """ 计算全局检查点计划并返回每个 rank 的本地计划。 这只在协调器 rank 上调用。 """
[文档] @abc.abstractmethod def finish_plan(self, new_plan: SavePlan) -> SavePlan: """ 合并由 `create_local_plan` 创建的计划和 `create_global_plan` 的结果。 这将在所有进程中调用。 """
[文档] @abc.abstractmethod def resolve_data(self, write_item: WriteItem) -> Union[torch.Tensor, io.BytesIO]: """ 将 ``write_item`` 从 ``state_dict`` 转换并准备存储,确保幂等性和线程安全。 在 ``state_dict`` 中查找与 ``write_item`` 关联的对象,并应用任何 在存储层消费之前进行转换(如序列化)。 在每个 rank 上多次调用,至少在最终的 SavePlan 中的每个 WriteItem 上调用一次。 此方法应该是幂等的且线程安全的。StorageWriter 实现可以自由地按需调用它。 它们可以随时调用它,次数不限。 任何分配内存的转换都应该在调用该方法时懒加载,以减少检查点所需的峰值内存。 当返回张量时,它们可以在任何设备或格式上,它们也可以是视图。 这是存储层的责任,找出如何保存它们。 它是存储层的责任,找出如何保存它们。 ```python # 输入文本 input_text = '"""' # 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API) def translate_to_simplified_chinese(text): # 假设的翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ```
[文档] 加载规划器: "" 定义了用于 load_state_dict 的协议,用于规划加载过程的抽象类。 LoadPlanner 是具有状态的对象,可以用来自定义整个加载过程。 LoadPlanner 作为 state_dict 的访问代理,因此对它的任何转换都会生效。 将对整个流程可见。 在加载状态字典期间,规划器子类可以预期以下调用顺序: 1) set_up_planner - 在所有进程中调用。 信号表示开始加载检查点。 2) 在所有进程中调用 create_local_plan。 处理状态字典并生成一个 `LoadPlan`,该计划将被发送进行全局规划。 3) 在协调器进程中调用 create_global_plan。 从所有进程接收 LoadPlan 并做出任何全局决策。 4) load_bytes - 在每个 rank 上被多次调用 这只会在 state_dict 中的每个非张量值上调用一次。 5) resolve_tensor 和 commit_tensor - 在每个 rank 上被多次调用 它们会在 state_dict 中的每个 Tensor 值上成对调用。 用户建议扩展 DefaultLoadPlanner 而不是直接扩展此接口 大多数更改都可以通过单个方法的更改来表示。 扩展通常有两种模式: 重写 state_dict。这是扩展加载过程最简单的方法,因为它 不需要理解 LoadPlan 的工作细节。我们需要 保持对原始 state_dict 的引用,以便在原地加载时进行操作 我们需要能够原地执行它 >>> # xdoctest: +SKIP("undefined vars") >>> 类 RenamePlanner(DefaultLoadPlanner): >>> def set_up_planner( >>> self, >>> state_dict: 状态字典类型, >>> metadata: 元数据, >>> is_coordinator: 布尔类型, >>> ) -> None: >>> self.original_state_dict = state_dict >>> state_dict = {"foo_" + k: v for k, v in state_dict.items()} ... >>> if self.flatten_sharded_tensors: >>> state_dict = _flatten_sharded_tensors(state_dict) ... >>> if self.flatten_state_dict: >>> state_dict, self.mappings = flatten_state_dict(state_dict) ... >>> self.state_dict = state_dict >>> self.metadata = metadata >>> self.is_coordinator = is_coordinator ... >>> def load_bytes(self, read_item, value): >>> # 删除 "foo_" 前缀 >>> self.original_state_dict[read_item.dest_index.fqn[4:]] = torch.load(value, weights_only=False) 修改 resolve_tensor 和 commit_tensor 以处理加载时转换。 >>> # xdoctest: +SKIP("undefined vars") >>> class MetaModelMaterialize(DefaultSavePlanner): >>> def resolve_tensor(self, read_item): >>> tensor = super().resolve_tensor(read_item) >>> return torch.empty_like(tensor, device="cpu") ... >>> def commit_tensor(self, read_item, tensor): >>> self.state_dict[read_item.dest_index.fqn] = tensor "文档"
[文档] @abc.abstractmethod def set_up_planner( self, state_dict: 状态字典类型, metadata: 可选[元数据] = None, is_coordinator: 布尔 = False, ) -> None: "``" 初始化此实例以将数据加载到 `state_dict` 中。 . 注意:这将在每个 rank 上调用。 "``"
[文档] @abc.abstractmethod def create_local_plan(self) -> LoadPlan: """ 根据由 set_up_planner 提供的 state_dict 和元数据创建一个 LoadPlan。 注意。这将在每个等级上被调用。 “”
[文档] @abc.abstractmethod def create_global_plan(self, global_plan: list[LoadPlan]) -> list[LoadPlan]: "" 计算全局负载计划并返回每个进程的负载计划。 . 注意:仅在协调进程上调用此操作 ""
[文档] @abc.abstractmethod def finish_plan(self, central_plan: LoadPlan) -> LoadPlan: “接受协调器的计划并返回最终的 LoadPlan。”
[文档] @abc.abstractmethod def load_bytes(self, read_item: ReadItem, value: io.BytesIO) -> None: """ 加载由 ``read_item`` 和 ``value`` 描述的项目。 此方法预计将就地修改底层 state_dict 状态字典。 ``value``的内容由用于生成正在加载的检查点的 SavePlanner 定义 正在加载的检查点的检查点内容 """
[文档] def resolve_bytes(self, read_item: ReadItem) -> io.BytesIO: """ 返回用于由 StorageReader 加载 `read_item` 的 BytesIO。 BytesIO 应与底层 state_dict 中的一个别名,因为 StorageReader 将替换其内容。 """ 抛出未实现异常("LoadPlanner.resolve_bytes 尚未实现")
[文档] @abc.abstractmethod def resolve_tensor(self, read_item: ReadItem) -> torch.Tensor: "" 返回由 `read_item` 描述的张量,供 StorageReader 使用以加载 `read_item`。 该张量应与底层 state_dict 中的一个别名,因为 StorageReader 将替换其内容。 如果由于任何原因无法实现这一点,规划器可以使用 `commit_tensor` 方法来复制数据。 回到 state_dict 中的那个。 """
[文档] @abc.abstractmethod def commit_tensor(self, read_item: ReadItem, tensor: torch.Tensor) -> None: “” 一旦 StorageReader 将数据加载到`tensor`中,就调用一次。 提供的 tensor 与调用`resolve_tensor`返回的 tensor 相同。 只有当 LoadPlanner 需要在处理`tensor`之前进行后处理时,此方法才是必需的。 将其复制回 state_dict 中的那个。 张量的内容将遵循其设备同步模型。 """

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源