torch.distributed.checkpoint.planner 的源代码
导入 abc
导入
输入/输出
导入
操作符
from dataclasses 导入
数据类
from 枚举
导入
自动,
枚举
from functools 导入 reduce
from 打字
导入
任何,
可选,
联合
导入
火炬
from torch.distributed.checkpoint.metadata 导入 (
ChunkStorageMetadata,
元数据,
元数据索引,
状态字典类型,
存储元数据,
张量属性,
)
全部 = [
"写入项目类型",
"加载项目类型",
"张量写入数据",
"写入项目",
"读取项目",
"保存计划",
"加载计划",
"保存规划器",
负载规划器,
]
类
写入项目类型(
枚举):
张量 =
自动()
数据块 =
自动()
字节输入输出 =
自动()
类
加载项目类型(
枚举):
张量 =
自动()
字节输入输出 =
自动()
@dataclass(冻结=True)
类
张量写入数据:
数据块:
块存储元数据
属性:
张量属性
尺寸:
PyTorch.
大小
[docs]@dataclass(frozen=True)
类 WriteItem:
"""数据类,用于存储需要写入存储的信息。"""
index: MetadataIndex
类型:WriteItemType
# 如果是张量写入,则存在值
tensor_data: Optional[TensorWriteData] = None
[文档] def tensor_storage_size(self) -> Optional[int]:
"空字符串"
计算底层张量的存储大小,如果不是张量写入,则为 None。
返回:
可选[int] 存储大小,如果有底层张量,则为以字节为单位的存储大小。
"""
如果 self.tensor_data 为 None:
返回 None
numels = reduce(operator.mul, self.tensor_data.size, 1)
dtype_size = torch._utils._element_size(self.tensor_data.properties.dtype)
return numels * dtype_size
[文档]@dataclass(frozen=True)
class ReadItem:
# 读取项目
# 类型:加载项目类型
# 在 state_dict 中索引
dest_index: 元数据索引
目标张量中的偏移量
dest_offsets: torch.Size
# 检查点中的索引
storage_index: 元数据索引
# 偏移量到检查点数据
# 存储偏移量:torch.Size
# 要复制的超立方体的大小
# 长度:torch.Size
[文档]@dataclass(frozen=True)
class 保存计划:
items: list[写入项]
storage_data: 任意 = None
planner_data: Any = None
# 这用于指示应该使用缓存的计划来写入数据。
# 使用缓存的计划来写入数据。
usable: bool = True
[文档]@dataclass
class LoadPlan:
items: list[ReadItem]
storage_data: Any = None
planner_data: Any = None
[文档]
类
保存规划器(abc.ABC):
""
定义 save_state_dict 使用的协议的抽象类。
SavePlanners 是具有状态的对象,可以用来自定义整个保存过程。
SavePlanner 充当 state_dict 的访问代理,因此对它的任何转换都将对整个进程可见。
任何对它的转换都将对整个进程可见。
规划器子类在 save_state_dict 过程中可以预期以下调用顺序:
1) set_up_planner - 在所有 rank 上调用。
标记检查点保存的开始。
2) 在所有进程中调用 create_local_plan。
处理状态字典并生成一个将要发送进行全局规划的 `SavePlan`。
3) 在协调器进程中调用 create_global_plan。
从所有等级获取 SavePlan 并做出任何全局决策。
4) finish_plan - 在所有等级上调用。
这给每个等级一个调整全局规划决策的机会。
5) resolve_data - 在每个等级上多次调用
在`state_dict`中查找存储层的值以进行写入。
建议用户扩展 DefaultSavePlanner 而不是直接扩展此接口,因为大多数更改都可以通过单个方法的更改来表示。
常见的扩展模式有 3 种:
常见的扩展模式有 3 种:
重写 state_dict。这是扩展保存过程最简单的方法,因为它不需要理解 SavePlan 的工作原理:
不需要了解 SavePlan 的复杂性:
>>> # xdoctest: +SKIP("undefined vars")
>>> class RenamePlanner(DefaultSavePlanner):
>>> def set_up_planner(
>>> self,
>>> state_dict: 状态字典类型,
>>> storage_meta: 可选[存储元数据],
>>> is_coordinator: 布尔值,
>>> ) -> None:
>>> # 将所有键名前缀设置为 `foo_`
>>> super().set_up_planner({"foo_" + k: v for k, v in state_dict.items()}, storage_meta, is_coordinator)
修改本地计划和查找,同时进行。这在精细控制数据持久化方式时很有用
>>> # xdoctest: +SKIP("undefined vars")
>>> class FP16Planner(DefaultSavePlanner):
>>> def create_local_plan(self):
>>> plan = super().create_local_plan()
>>> for p in plan:
>>> 如果 p.tensor_data 不为 None:
>>> p.tensor_data.properties.dtype = torch.float16
>>> 返回 plan
...
>>> def resolve_data(self, write_item):
>>> item = super().resolve_data(write_item)
>>> return item if write_item.type == WriteItemType.BYTE_IO else item.to(torch.float16)
使用全局规划步骤来做出每个 rank 无法单独做出的中心决策
>>> # xdoctest: +SKIP("undefined vars")
>>> from itertools import zip_longest
>>> 从 dataclasses 导入 replace
>>> class DDPLoadBalancingPlanner(DefaultSavePlanner):
>>> # 这使用默认的本地计划行为,所有非分片写入都在 rank 0
>>> # 此示例不处理 ShardedTensors
>>> def create_global_plan(self, all_plans):
>>> iters = [iter(all_plans[0].items)] * len(all_plans)
>>> items_per_rank = [
>>> [item for item in items if item is not None]
>>> for items in zip(*zip_longest(*iters), strict=True)
>>> ]
>>> all_plans = [
>>> replace(plan, items=items)
>>> 对于 plan 和 items,使用 zip 函数进行组合,strict 参数设置为 True
>>> ]
>>> 返回 super().create_global_plan(all_plans)
最后,一些规划器需要在检查点中保存额外的元数据,这是
通过每个等级贡献其本地计划中的数据项来完成
全球规划器将它们汇总:
>>> # xdoctest: +SKIP("undefined vars")
>>> 类 SaveExtraDataPlanner(默认保存规划器):
>>> def create_local_plan(self) -> 保存计划:
>>> plan = super().create_local_plan()
>>> return replace(plan, planner_data="per-rank-data")
...
>>> def create_global_plan(self, all_plans: List[SavePlan]) -> Tuple[List[SavePlan], Metadata]:
>>> global_plan, metadata = super().create_global_plan(all_plans)
>>> merged_data = [p.planner_data for p in global_plan]
>>> metadata = replace(metadata, planner_data=merged_data)
>>> return global_plan, metadata
"文档"
将当前排名通过 `create_local_plan` API 计算出的保存计划保存
缓存到本地排名
_缓存的保存计划:
字典[
字符串,
保存计划 SavePlan] = {}
当前排名的最终保存计划
这是由 `create_local_plan` API 创建的计划合并而成的。
以及给定排名的 `create_global_plan` 的结果。
这是 `finish_plan` API 计算出的最终计划。
该计划随后被发送到 `write_data`。
本地缓存
_缓存的最终保存计划:
字典[
字符串,
保存计划 SavePlan] = {}
所有排名的本地计划的集合。
这是`create_global_plan` API 的输入。
# 缓存于协调器节点。
_所有计划已缓存:
字典[
字符串,
列表[
保存计划 SavePlan]] = {}
# 由 `create_global_plan` API 计算的全局检查点计划。
# 缓存于协调器节点。
_cached_global_plan: 字典[
字符串,
列表[
保存计划 SavePlan]] = {}
[文档] @abc.abstractmethod
def set_up_planner(
self,
state_dict: 状态字典类型,
storage_meta: Optional[存储元数据] = None,
is_coordinator: 布尔 = False,
) -> None:
“”
初始化此规划器以保存 `state_dict`。
实现应将这些值保存下来,因为它们在保存过程中不会提供。
这将在所有进程中调用。
"""
[文档] @abc.abstractmethod
def 创建本地计划(self) -> 保存计划:
"""
计算当前排名的保存计划。
这将被汇总并传递给 create_global_plan。
计划器特定的数据可以通过 SavePlan::planner_data 传递。
这将在所有排名上被调用。
"""
[文档] @abc.abstractmethod
def 创建全局计划(
self, 所有计划: list[SavePlan]
() -> tuple[list[SavePlan], Metadata]
"""
计算全局检查点计划并返回每个 rank 的本地计划。
这只在协调器 rank 上调用。
"""
[文档] @abc.abstractmethod
def finish_plan(self, new_plan: SavePlan) -> SavePlan:
"""
合并由 `create_local_plan` 创建的计划和 `create_global_plan` 的结果。
这将在所有进程中调用。
"""
[文档] @abc.abstractmethod
def resolve_data(self, write_item: WriteItem) -> Union[torch.Tensor, io.BytesIO]:
"""
将 ``write_item`` 从 ``state_dict`` 转换并准备存储,确保幂等性和线程安全。
在 ``state_dict`` 中查找与 ``write_item`` 关联的对象,并应用任何
在存储层消费之前进行转换(如序列化)。
在每个 rank 上多次调用,至少在最终的 SavePlan 中的每个 WriteItem 上调用一次。
此方法应该是幂等的且线程安全的。StorageWriter 实现可以自由地按需调用它。
它们可以随时调用它,次数不限。
任何分配内存的转换都应该在调用该方法时懒加载,以减少检查点所需的峰值内存。
当返回张量时,它们可以在任何设备或格式上,它们也可以是视图。
这是存储层的责任,找出如何保存它们。
它是存储层的责任,找出如何保存它们。
```python
# 输入文本
input_text = '"""'
# 翻译函数(此处为示例,实际翻译功能需调用真实的翻译 API)
def translate_to_simplified_chinese(text):
# 假设的翻译结果
return text
# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
[文档]
类
加载规划器:
""
定义了用于 load_state_dict 的协议,用于规划加载过程的抽象类。
LoadPlanner 是具有状态的对象,可以用来自定义整个加载过程。
LoadPlanner 作为 state_dict 的访问代理,因此对它的任何转换都会生效。
将对整个流程可见。
在加载状态字典期间,规划器子类可以预期以下调用顺序:
1) set_up_planner - 在所有进程中调用。
信号表示开始加载检查点。
2) 在所有进程中调用 create_local_plan。
处理状态字典并生成一个 `LoadPlan`,该计划将被发送进行全局规划。
3) 在协调器进程中调用 create_global_plan。
从所有进程接收 LoadPlan 并做出任何全局决策。
4) load_bytes - 在每个 rank 上被多次调用
这只会在 state_dict 中的每个非张量值上调用一次。
5) resolve_tensor 和 commit_tensor - 在每个 rank 上被多次调用
它们会在 state_dict 中的每个 Tensor 值上成对调用。
用户建议扩展 DefaultLoadPlanner 而不是直接扩展此接口
大多数更改都可以通过单个方法的更改来表示。
扩展通常有两种模式:
重写 state_dict。这是扩展加载过程最简单的方法,因为它
不需要理解 LoadPlan 的工作细节。我们需要
保持对原始 state_dict 的引用,以便在原地加载时进行操作
我们需要能够原地执行它
>>> # xdoctest: +SKIP("undefined vars")
>>> 类 RenamePlanner(DefaultLoadPlanner):
>>> def set_up_planner(
>>> self,
>>> state_dict: 状态字典类型,
>>> metadata: 元数据,
>>> is_coordinator: 布尔类型,
>>> ) -> None:
>>> self.original_state_dict = state_dict
>>> state_dict = {"foo_" + k: v for k, v in state_dict.items()}
...
>>> if self.flatten_sharded_tensors:
>>> state_dict = _flatten_sharded_tensors(state_dict)
...
>>> if self.flatten_state_dict:
>>> state_dict, self.mappings = flatten_state_dict(state_dict)
...
>>> self.state_dict = state_dict
>>> self.metadata = metadata
>>> self.is_coordinator = is_coordinator
...
>>> def load_bytes(self, read_item, value):
>>> # 删除 "foo_" 前缀
>>> self.original_state_dict[read_item.dest_index.fqn[4:]] = torch.load(value, weights_only=False)
修改 resolve_tensor 和 commit_tensor 以处理加载时转换。
>>> # xdoctest: +SKIP("undefined vars")
>>> class MetaModelMaterialize(DefaultSavePlanner):
>>> def resolve_tensor(self, read_item):
>>> tensor = super().resolve_tensor(read_item)
>>> return torch.empty_like(tensor, device="cpu")
...
>>> def commit_tensor(self, read_item, tensor):
>>> self.state_dict[read_item.dest_index.fqn] = tensor
"文档"
[文档] @abc.abstractmethod
def set_up_planner(
self,
state_dict: 状态字典类型,
metadata: 可选[元数据] = None,
is_coordinator: 布尔 = False,
) -> None:
"``"
初始化此实例以将数据加载到 `state_dict` 中。
. 注意:这将在每个 rank 上调用。
"``"
[文档] @abc.abstractmethod
def create_local_plan(self) -> LoadPlan:
"""
根据由 set_up_planner 提供的 state_dict 和元数据创建一个 LoadPlan。
注意。这将在每个等级上被调用。
“”
[文档] @abc.abstractmethod
def create_global_plan(self, global_plan: list[LoadPlan]) -> list[LoadPlan]:
""
计算全局负载计划并返回每个进程的负载计划。
. 注意:仅在协调进程上调用此操作
""
[文档] @abc.abstractmethod
def finish_plan(self, central_plan: LoadPlan) -> LoadPlan:
“接受协调器的计划并返回最终的 LoadPlan。”
[文档] @abc.abstractmethod
def load_bytes(self, read_item: ReadItem, value: io.BytesIO) -> None:
"""
加载由 ``read_item`` 和 ``value`` 描述的项目。
此方法预计将就地修改底层 state_dict 状态字典。
``value``的内容由用于生成正在加载的检查点的 SavePlanner 定义
正在加载的检查点的检查点内容
"""
[文档] def resolve_bytes(self, read_item: ReadItem) -> io.BytesIO:
"""
返回用于由 StorageReader 加载 `read_item` 的 BytesIO。
BytesIO 应与底层 state_dict 中的一个别名,因为 StorageReader 将替换其内容。
"""
抛出未实现异常("LoadPlanner.resolve_bytes 尚未实现")
[文档] @abc.abstractmethod
def resolve_tensor(self, read_item: ReadItem) -> torch.Tensor:
""
返回由 `read_item` 描述的张量,供 StorageReader 使用以加载 `read_item`。
该张量应与底层 state_dict 中的一个别名,因为 StorageReader 将替换其内容。
如果由于任何原因无法实现这一点,规划器可以使用 `commit_tensor` 方法来复制数据。
回到 state_dict 中的那个。
"""
[文档] @abc.abstractmethod
def commit_tensor(self, read_item: ReadItem, tensor: torch.Tensor) -> None:
“”
一旦 StorageReader 将数据加载到`tensor`中,就调用一次。
提供的 tensor 与调用`resolve_tensor`返回的 tensor 相同。
只有当 LoadPlanner 需要在处理`tensor`之前进行后处理时,此方法才是必需的。
将其复制回 state_dict 中的那个。
张量的内容将遵循其设备同步模型。
"""