快捷键

torch.distributed.checkpoint.filesystem 的源代码

# mypy: 允许未类型化定义
导入 集合
导入 dataclasses
导入 输入/输出
导入 操作符
导入 操作系统
导入 酸菜
导入 队列
导入 线程
导入 uuid
导入 警告
 abc 导入 ABC, 抽象方法
 collections.abc 导入 生成器, 迭代器, 迭代器, 序列
 contextlib 导入 contextmanager
 dataclasses 导入 数据类
 输入/输出 导入 不支持的操作
 pathlib 导入 Path
 打字 导入 任何, 可调用, 角色, IO, 可选, 联合

Python 3.12 中引入作为 collections.abc.Buffer
 typing_extensions 导入 缓冲区

导入 火炬
 火炬 导入 张量
 torch._utils 导入 获取可用设备类型, 获取设备模块
 torch.distributed._shard._utils 导入 窄化张量通过索引
 torch.distributed.checkpoint._extension 导入 (
    扩展注册表,
    流转换扩展,
)
 torch.distributed.checkpoint.metadata 导入 元数据, 状态字典类型, 存储元数据
 torch.distributed.checkpoint.planner 导入 (
    装载项目类型,
    装载计划,
    装载规划器,
    读取项目,
    保存计划,
    保存规划器,
    编写项目,
    编写项目类型,
)
from torch.distributed.checkpoint.staging 导入 阻塞异步分阶段器
from torch.distributed.checkpoint.storage 导入 (
    存储读取器,
    存储写入器,
    写入结果,
)
from torch.distributed.checkpoint.utils 导入 _create_file_view
from torch.futures 导入 未来


全部 = [文件系统写入器, 文件系统读取器, 文件系统, 文件系统基类]

_metadata_fn: 字符串 = ".metadata"


@dataclass
 _StorageInfo:
    "这是每条记录的存储信息。"

    相对路径: 字符串
    偏移量: 整型
    长度: 整型
    转换描述符: 可选[序列[字符串]] = 

    def __getstate__():
        返回 {k: v  k, v  .字典.项目() if v   None}


@dataclass
 _存储前缀:
    前缀: 字符串


默认后缀 = ".distcp"


def _generate_uuid() -> 字符串:
    返回 字符串(uuid.uuid4())


 _TensorLoader(ABC):
    @abstractmethod
    def 添加(, 尺寸: 整数, 对象: 对象) -> None:
        通过

    @abstractmethod
    def start_loading() -> None:
        通过

    @abstractmethod
    def () -> 迭代器[元组[PyTorch.张量, 对象]]
        通过


 _串行 CPU 加载器(_张量加载器):
    def __init__(, resolve_fun: 可调用) -> None:
        .resolve_fun = resolve_fun
        .项目: 列表[元组[整数, 对象]] = 输入文本为空,请提供需要翻译的文本

    def 添加(, 尺寸: 整数, 对象: 对象) -> None:
        .项目.追加((尺寸, 对象))

    def start_loading() -> None:
        通过

    def () -> 迭代器[元组[PyTorch.张量, 对象]]
         _, 对象  .项目:
            张量 = .resolve_fun(对象).detach()
            张量 = 张量.cpu()
            如果 张量.存储().尺寸() != 张量.元素数量():
                张量 = 张量.克隆()
            产生 (
                张量,
                对象,
            )


 _重叠 CPU 加载器(_张量加载器):
    def __init__(
        ,
        解析函数: 可调用,
        : 可选[火把.] = ,
        在飞行阈值: 整型 = 1_000_000,
    ) -> :
        .解析函数 = 解析函数
        .项目: 列表[元组[整数, 对象]] = 输入文本为空,请提供需要翻译的文本
        .飞行阈值 = 飞行阈值
        .飞行数据 = 0
        .当前项目: 集合.双端队列 = 集合.双端队列()
        .索引 = 0
        .开始 = 
        .设备类型 = (
            .设备类型 if  否则 _获取可用设备类型()
        )
        .设备模块 = _获取设备模块(.设备类型)
        . = 角色(
            PyTorch.cuda.,  或者 .设备模块.current_stream()
        )
        如果 . != .设备模块.current_stream():
            ..等待流(.设备模块.current_stream())

    @property
    def 完成() -> 布尔:
        返回 .索引 >= len(.项目)

    def 排空() -> 列表[元组[PyTorch.张量, 对象]]
        已排空 = 输入文本为空,请提供需要翻译的文本
        如果 .飞行中的数据 >= .在飞行阈值:
            ..同步()
         .在飞行数据 >= .在飞行阈值:
            val = .当前项目.popleft()
            .飞行数据 -= val[0].元素数量() * val[0].元素大小()
            疲惫.追加(val)
        返回 疲惫

    def _补水() -> :
         .设备模块.(.):
              ._完成 并且 .飞行数据 < .飞行阈值:
                _, 对象 = .项目[.索引]
                .索引 += 1
                张量 = .解析函数(对象).detach()
                如果 张量.设备.类型 == .设备类型:
                    张量 = 张量.(设备="cpu", 非阻塞=True)
                elif 张量.设备 == PyTorch.设备("cpu"):
                    如果 (
                        张量.未类型化存储().尺寸()
                        != 张量.元素数量() * 张量.元素大小
                    ):
                        # 这将强制张量既连续又具有最小存储空间
                        张量 = 张量.克隆()

                .当前项.追加(
                    (
                        张量,
                        对象,
                    )
                )
                .在飞行数据 += 张量.元素数量() * 张量.元素大小()

    def 完成() -> 迭代器[元组[PyTorch.张量, 对象]]
        断言 .完成
        如果 len(.当前项目) > 0:
            ..同步()
        返回 .当前项目

    def 添加(, 尺寸: 整数, 对象: 对象) -> None:
        如果 .开始:
            raise 运行时错误("加载开始后不能添加项目")
        .项目.追加((尺寸, 对象))

    def 开始加载() -> None:
        如果 .已开始:
            返回
        .已开始 = 真实
        .项目.排序(=操作符.itemgetter(0))
        ._加满()

    def () -> 迭代器[元组[PyTorch.张量, 对象]]
        .开始加载()
          ._完成:
            耗尽 = ._排空()
            ._refill()
            yield from drained

        yield from ._finish()


 _StorageWriterTransforms:
    ""
这是实验性的,可能会移动到其他地方
未来。它在这里是为了在我们仍在进行时最小化更改。
学习和收集反馈。
"文档"

    def __init__(
        , 扩展: 可选[序列[流转换扩展]] = 
    ) -> None:
        ""
如果 extensions 参数为 None,这意味着实现
应提供它选择的任何默认值。一个空的
序列表示不应使用任何扩展。在此
时间,默认扩展序列为空。
"文档"
        .扩展 = () 如果 扩展   否则 扩展

    def 转换保存流(
        , 写入项: 写入项, 原始流: 输入/输出.IOBase
    ) -> 元组[IO[字节], 列表[字符串]]
        为了避免文件描述符泄露,转换器的关闭必须
        # 将级联到包装流,但由于此函数可以
        # 追加到原始流,我们无法关闭实际流。
        所以,我们用这个来包裹原始流
        关闭()使其成为空操作,并且一旦所有文件关闭
        # 被附加。

         不关闭写入器(输入/输出.IO 基础):
            def __init__(, 原始: 输入/输出.IO 基础):
                .原始 = 原始

            def 可写入() -> 布尔:
                返回 真实

            def (, b: 缓冲区) -> 整数:
                返回 .原始.(b)

            def 关闭():
                .清空()
                .原始.清空()
                但不闭合。

        转换为 = 角色(IO[字节], NoCloseWriter(原始流))

         示例  .扩展:
            转换为 = ext: ex.转换为(转换为)

        返回 (转换为, [ext: ex.获取描述符()  示例  反转(.扩展)]])


def _项目大小(项目: 写入项目) -> 整数:
    大小 = 1
    断言 项目.张量数据   
    # 无法使用 math.prod,因为 PT 需要支持较旧的 Python
     s  项目.张量数据.尺寸:
        大小 *= s

    dtype = 项目.张量数据.属性.dtype
    返回 大小 * PyTorch._工具.元素大小(数据类型)


def 按大小和类型分割(箱数: 整数, 项目: 列表[写入项]\) -> 列表[列表[写入项]]
    如果 箱数 == 1:
        返回 [项目]

    bytes_w = [wi  wi  项目 如果 wi.类型 == 写入项类型.字节输入输出]
    tensor_w = [wi    项目 如果 .类型 != 写入项类型.字节输入输出]

    : 列表[列表[写入项]] = 空列表  _  范围(箱数)]
    桶大小 = [0  _  范围(箱数)]

    矩阵_w.排序(=_item_size, reverse=True)

     i, wi  列举(bytes_w):
        buckets[i % 箱数].追加(wi)

     wi  张量_w:
        # TODO 替换为头文件
        索引 = 最小(列举(桶大小), =操作符.itemgetter(1))[0]
        [索引].追加(wi)
        桶大小[索引] += _item_size(wi)

    返回 buckets


def _write_item(
    转换: 存储写入转换,
    : 输入/输出.IO 基础,
    数据: 联盟[输入/输出.BytesIO, PyTorch.张量],
    写入项: WriteItem,
    存储键: 字符串,
    安全张量: 布尔 = 假的,
) -> 写入结果:
    偏移 = .告诉()

    (转换为, 转换描述符) = 转换.转换保存流(
        写入项, 
    )

    如果 编写项.类型 == 写入类型.字节输入输出:
        断言 isinstance(数据, 输入/输出.BytesIO)
        转换为.(数据.获取缓冲区())
    else:
        断言 isinstance(数据, PyTorch.张量)
        断言 数据.设备 == PyTorch.设备("cpu")
        如果 否。 安全张量:
            PyTorch.保存(数据, 转换为)

    转换为.关闭()

    如果 否。 安全张量 或者 isinstance(数据, 输入/输出.BytesIO):
        长度 = .告诉() - 偏移
    else:
        长度 = 数据.元素数量() * 数据.元素大小()

    # 为了与早期版本保持一致,请将此字段留空。
    # 如果没有扩展,请将此字段留空以保持元数据。
    信息转换描述 = (
         如果 len(转换描述符) == 0 否则 转换描述符
    )

    返回 写入结果(
        索引=写入条目.索引,
        字节数大小=长度,
        存储数据=_存储信息(
            存储键,
            偏移量,
            长度,
            转换描述符=信息转换描述符,
        ),
    )


def 从队列中写入文件(
    创建流: 可调用,
    文件队列: 队列.队列,
    结果队列: 队列.队列,
    规划器: 保存规划器,
    转换: _存储写入转换,
    在飞行阈值: 整数,
    使用_fsync: 布尔,
    线程数: 整数,
    安全张量: 布尔,
) -> None:
    尝试:
         True:
            文件名, 存储键, 写入项 = 文件队列.非阻塞获取()
            加载器: _TensorLoader

            自定义后端名称 = PyTorch._C._get_privateuse1_backend_name()
            自定义设备模块 = getattr(PyTorch, 自定义后端名称, None)

            # TODO: 使用 OverlappingCpuLoader 在多个线程中创建会带来显著的
            # 性能下降,观察到与 CUDA 流同步有关。我们
            # 应该尝试修复这个问题,并在所有线程情况下使用_OverlappingCpuLoader
            如果 (
                thread_count == 1
                并且 (
                    PyTorch.cuda.是否可用()
                    或者 (自定义设备模块 并且 自定义设备模块.是否可用())
                )
                并且 inflight_threshhold > 0
            ):
                加载器 = _重叠 CPU 加载器(
                    规划器.解析数据,
                    在空中阈值=在空中阈值,
                )
            否则:
                加载器 = _串行 CPU 加载器(
                    规划器.解析数据,
                )

            矩阵_w = [wi  wi  写入项 如果 wi.类型 != 写入项类型.字节输入输出]
             写入项目  张量_w:
                加载器.添加(_项目大小(编写项目), 编写项目)
            加载器.开始加载()

            字节数_w = [wi  wi  写入项 如果 wi.类型 == 写入项目类型.字节输入输出]
            写入结果 = 输入文本为空,请提供需要翻译的文本

             创建流(文件名, wb)  :
                 写入项目  bytes_w:
                    数据 = 规划器.resolve_data(write_item)
                    write_results.追加(
                        写条目(
                            转换,
                            ,
                            数据,
                            写条目,
                            存储键,
                            安全张量,
                        )
                    )

                张量字典 = {}
                 张量, 编写项目  加载器.():
                    断言 张量.是 CPU
                    编写结果.追加(
                        __编写项目(
                            转换,
                            ,
                            张量,
                            编写项目,
                            存储键,
                            安全张量,
                        )
                    )
                    张量字典[编写项目.索引.完全限定名] = 张量

                如果 安全张量:
                    来自 safetensors.torch 导入 保存  # 类型:忽略[导入未找到]

                    .(保存(张量字典))

                如果 使用_fsync:
                    尝试:
                        操作系统.fsync(.文件描述符())
                    除了 (属性错误, 不支持的操作):
                        操作系统.sync()
                .关闭()
            result_queue.放置(write_results)
    除了 队列.空的:
        通过


 文件系统基类(ABC):
    @contextmanager
    @abstractmethod
    def 创建流(
        , 路径: 联盟[字符串, 操作系统.PathLike], 模式: 字符串
    ) -> 生成器[输入/输出.IO 基础, None, None] ...

    @abstractmethod
    def 连接路径(
        , 路径: 联盟[字符串, 操作系统.PathLike], 后缀: 字符串
    ) -> 联盟[字符串, 操作系统.PathLike] ...

    @abstractmethod
    def 重命名(
        , 路径: 联盟[字符串, 操作系统.PathLike], 新路径: 联盟[字符串, 操作系统.PathLike]
    ) -> : ...

    @abstractmethod
    def 初始化路径(, 路径: 联盟[字符串, 操作系统.PathLike]\) -> 联盟[字符串, 操作系统.PathLike] ...

    @abstractmethod
    def 建立目录(, 路径: 联盟[字符串, 操作系统.PathLike]\) -> : ...

    @classmethod
    @abstractmethod
    定义 验证校验点 ID(, 检查点 ID: 联盟[字符串, 操作系统.PathLike]\) -> 布尔: ...

    @abstractmethod
    定义 存在(, 路径: 联盟[字符串, 操作系统.PathLike]\) -> 布尔: ...

    @abstractmethod
    定义 删除文件(, 路径: 联盟[字符串, 操作系统.PathLike]\) -> : ...


 文件系统(文件系统基类):
    @contextmanager
    定义 创建流(
        , 路径: 联盟[字符串, 操作系统.PathLike], 模式: 字符串
    ) -> 生成器[输入/输出.IO 基础, , ]
        如果 否。 isinstance(路径, 路径):
            路径 = 路径(路径)
         路径.打开(模式)  :
            产生 角色(输入/输出.IO 基础, )

    定义 拼接路径(
        , 路径: 联盟[字符串, 操作系统.PathLike], 后缀: 字符串
    ) -> 联盟[字符串, 操作系统.PathLike]
        如果 否。 isinstance(路径, 路径):
            路径 = 路径(路径)
        返回 路径 / 后缀

    定义 初始化路径(, 路径: 联盟[字符串, 操作系统.PathLike]\) -> 联盟[字符串, 操作系统.PathLike]
        如果 否。 isinstance(路径, 路径):
            路径 = 路径(路径)
        返回 路径

    定义 重命名(
        自身, 路径: 联盟[字符串, os.PathLike], 新路径: 联盟[字符串, os.PathLike]
    ) -> :
        如果  isinstance(路径, 路径):
            路径 = 路径(路径)

        路径.重命名(角色(路径, 新路径))

    def 建立目录(, 路径: 联盟[字符串, os.PathLike]) -> :
        如果  isinstance(路径, 路径):
            路径 = 路径(路径)
        路径.建立目录(parents=True, exist_ok=True)

    @classmethod
    def 验证校验点 ID(, 校验点 ID: 联盟[字符串, 操作系统.PathLike]\) -> 布尔:
        if isinstance(校验点 ID, 路径):
            返回 真实

        if ://  字符串(检查点 ID):
            返回 

         p  路径(检查点 ID).parents:
            如果 p.存在()  操作系统.访问(字符串(p), 操作系统.写操作权限):
                返回 真实

        返回 

    def 存在(, 路径: 联盟[字符串, 操作系统.PathLike]\) -> 布尔:
        如果 否。 isinstance(路径, 路径):
            路径 = 路径(路径)
        返回 路径.存在()

    定义 删除文件(, 路径: 联盟[字符串, 操作系统.PathLike]\) -> :
        如果 否。 isinstance(路径, 路径):
            路径 = 路径(路径)
        路径.解链()


 文件系统写入器(存储写入器):
    ""
基本实现使用文件 I/O 的 StorageWriter。

本实现做出以下假设和简化:

*检查点路径是一个空目录或不存在。
*文件创建是原子的

检查点由每个写入请求的一个文件组成,
以及一个包含序列化元数据的 `.metadata` 文件。

"文档"

    定义 __init__(
        ,
        路径: 联盟[字符串, 操作系统.PathLike],
        单个文件每等级: 布尔类型 = True,
        同步文件: 布尔类型 = True,
        线程数: 整型 = 1,
        每线程预取副本: 整型 = 10_000_000,
        覆盖: 布尔类型 = True,
        _扩展: 可选[序列[流转换扩展]] = ,
        *参数: 任何,
        **kwargs: 任何,
    ) -> :
        ""
初始化指向 `path` 的写入器。

参数:
检查点将被写入的目录路径。
single_file_per_rank: 每个 rank 生成一个文件,而不是每个 tensor/blob 生成一个文件。默认为 True。
sync_files: 强制将文件同步到永久存储。默认为 True。
thread_count: 用于写入的 IO 线程数。默认为 1。
在保存前从 GPU 预复制多少字节。默认为 10MB。
是否允许覆盖现有检查点。默认为 True。
_extensions: 应用到输出流的扩展(实验性)

注意。如果禁用 sync_files,则在失败的情况下无法保证检查点的一致性。
"文档"
        超级().__init__()
        .文件系统 = 文件系统()
        .路径 = .文件系统.初始化路径(路径)
        .单个文件每等级 = 单个文件每等级
        .同步文件 = 同步文件
        .线程数 = 线程数
        .每线程预取 = 每线程预取
        .保存 ID = 生成 UUID()
        .覆盖 = 覆盖
        .转换 = 存储写入转换(扩展)

    定义 重置(, 检查点 ID: 联盟[字符串, 操作系统.PathLike, ] = ) -> :
        如果 检查点 ID:
            .路径 = .文件系统.初始化路径(检查点 ID)
        .保存 ID = 生成 UUID()

    定义 设置存储写入器(, 是否为协调器: 布尔) -> :
        通过

    定义 准备本地计划(, 计划: 保存计划 SavePlan) -> 保存计划 SavePlan:
        .文件系统.建立目录(.路径)
        如果 .文件系统.存在(.元数据路径):
            如果 .覆盖:
                warnings.warn(
                    f"检测到现有检查点,在"{.元数据路径},由于"{.覆盖"=}
                    "在 PyTorch 2.5 之前的版本中,`overwrite`默认为 False。将此变量设置为 True 以"
                    "保留此功能或当发现现有检查点时引发错误。"
                )
            else:
                raise 运行时错误(f"检查点已存在且"{.覆盖=}.")

        返回 规划

    定义 准备全局计划(, 计划: 列表[保存计划 SavePlan]\) -> 列表[保存计划 SavePlan]
        新计划 = [
            dataclasses.替换(计划, 存储数据=存储前缀(f"__"{i}_))
             i, 规划  列举(计划)
        ]
        返回 新计划

    定义 写数据(
        ,
        计划: 保存计划 SavePlan,
        规划器: 保存计划,
    ) -> 未来[列表[写结果]]
        存储计划: 存储前缀 = 计划.存储数据
        文件数量 = 0

        定义 生成文件():
            非局部 文件数量
            文件名 = f"{存储计划.前缀}{文件数量}{默认后缀}"
            文件数量 += 1
            返回 文件名

        文件队列: 队列.队列 = 队列.队列()
        如果 自身.单文件每排名:
               按大小和类型分割(自身.线程数, 计划.项目):
                文件名 = 生成文件()
                路径 = 自身.文件系统.连接路径(自身.路径, 文件名)
                文件队列.放置((路径, 文件名, ))
        else:
             项目  计划.项目:
                文件名 = 生成文件()
                路径 = 自身.文件系统.拼接路径(自身.路径, 文件名)
                文件队列.放置((路径, 文件名, [项目]))

        返回 自身._写入数据(规划器, 文件队列)

    定义 写入数据(
        自身,
        规划器: 保存规划器,
        文件队列: 队列.队列,
        安全张量: 布尔类型 = 错误,
    ) -> 未来[列表[写入结果]]
        结果队列: 队列.队列 = 队列.队列()

        线程 = 输入文本为空,请提供需要翻译的文本
         _  范围(1, 自身.线程数):
            t = 线程.Thread(
                目标=从队列中写入文件,
                参数=(
                    自身.文件系统.创建流,
                    文件队列,
                    结果队列,
                    规划器,
                    自身.转换,
                    自身.每线程预取副本,
                    自身.同步文件,
                    自身.线程数,
                    安全张量,
                ),
            )
            t.开始()
            线程.追加(t)

        从队列中写入文件(
            创建流=自身.文件系统.创建流,
            文件队列=文件队列,
            结果队列=结果队列,
            规划器=规划器,
            转换=自身.转换,
            在飞行阈值=自身.每线程预取副本,
            使用_fsync=自身.同步文件,
            线程数=自身.线程数,
            安全张量=安全张量,
        )

         t  线程:
            t.连接()

        res = 输入文本为空,请提供需要翻译的文本
        try:
             True:
                res += 结果队列.非阻塞获取()
        除了 队列.空的:
            fut: 未来[列表[写入结果]] = 未来()
            fut.设置结果(资源)
            返回 fut

    定义 完成(自身, 元数据: 元数据, 结果: 列表[列表[写入结果]]) -> :
        存储 md = {}
         写入列表  结果:
            存储 md.更新({wr.索引: wr.存储数据  wr  写入列表})
        元数据.存储数据 = 存储元数据

        元数据.存储元信息 = .存储元数据()

        临时路径 = 角色(路径, .文件系统.连接路径(.路径, f"{_metadata_fn}.tmp"))
         .文件系统.创建流(临时路径, wb)  元数据文件:
            pickle.导出(元数据, 元数据文件)
            如果 .同步文件:
                try:
                    操作系统.文件同步(元数据文件.文件描述符())
                除了 (属性错误, 不支持的操作):
                    操作系统.同步()

        # 删除,如果存在其他检查点。
        如果 自身.文件系统.存在(自身.元数据路径):
            自身.文件系统.删除文件(自身.元数据路径)

        自身.文件系统.重命名(临时路径, 自身.元数据路径)

    定义 存储元数据(自身) -> 可选[存储元数据]
        返回 存储元数据(检查点 ID=.检查点 ID, 保存 ID=.保存 ID)

    @property
    定义 元数据路径() -> 联盟[字符串, 操作系统.PathLike]
        返回 角色(路径, .文件系统.连接路径(.路径, _metadata_fn))

    @property
    定义 检查点 ID() -> 联盟[字符串, 操作系统.PathLike]
        ""
返回将要用于保存检查点的检查点 ID。
"文档"
        返回 .路径

    @classmethod
    定义 验证校验点 ID(, 检查点 ID: 联盟[字符串, 操作系统.PathLike]\) -> 布尔:
        返回 文件系统.验证校验点 ID(检查点 ID)


 _存储读取转换:
    ""
这只是实验性的,很可能会移动到其他地方。
未来。它在这里是为了在学习和收集反馈的过程中尽量减少改动。
它在这里是为了在学习和收集反馈的过程中尽量减少改动。
"文档"

    定义 初始化(, 扩展注册表: 可选[扩展注册表] = ) -> :
        自身.扩展注册表 = (
            扩展注册表() 如果 扩展注册表   否则 扩展注册表
        )

    定义 转换加载流(
        自身,
        读取项: 读取项目,
        转换描述符: 序列[字符串],
        原始流: IO[字节],
    ) -> IO[字节]
        扩展 = .扩展注册表.从描述符列表(转换描述符)
        转换自 = 原始流
         示例  扩展:
            如果 isinstance(ext: ex, 流式转换扩展):
                从...转换 = ext: ex.从...转换(从...转换)
        返回 转换自


[文档] 文件系统读取器(存储读取器): 定义 __init__( , 路径: 联盟[字符串, 操作系统.PathLike], _扩展注册表: 可选[扩展注册表] = , # 实验 ) -> : 超级().__init__() .文件系统 = 文件系统() .路径 = .文件系统.初始化路径(路径) .存储数据: 字典[任何, 任何] = {} .加载 ID = 生成 UUID() .转换 = 存储读取转换(扩展注册表) 定义 切片文件(, 文件, sinfo: _存储信息) -> IO[字节] 返回 角色(IO[字节], _创建文件视图(文件, sinfo.偏移量, sinfo.长度)) 定义 重置(, 检查点 ID: 联盟[字符串, 操作系统.PathLike, ] = ) -> : .存储数据 = {} 如果 检查点 ID: .路径 = .文件系统.初始化路径(检查点 ID) .加载 ID = 生成 UUID() 定义 读取数据(, plan: 装载计划, 规划器: 装载规划器) -> 未来[] 按文件分组请求 按文件: 字典[字符串, 列表[读取项目]] = {} 读取项 plan.项目: 项目描述: _存储信息 = .存储数据[阅读条目.存储索引] 路径 = 项目描述.相对路径 每个文件.setdefault(路径, 空列表.追加(阅读条目) 相对路径, 需求 每个文件.项目(): 新路径 = .文件系统.拼接路径(.路径, 相对路径) .文件系统.创建流(新路径, rb) : # TODO 排序偏移量并缓存读取 req 需求: item_md = .storage_data[req.存储索引] file_slice = .切片文件(, 项目_md) 转换_from = .转换.转换加载流( req, 此字段在旧版本中不存在 # 实现提供回退机制。 项目元数据.转换描述符 或者 (), 文件切片, ) 如果 req.类型 == 装载项目类型.字节输入输出: 读取字节 = 输入/输出.BytesIO(从转换.阅读(-1)) 读取字节数.搜索(0) 规划器.加载字节数(req, 读取字节数) else: 如果 转换自.可寻址的(): 可寻址 = 转换自 else: 因为 torch.load 需要可寻址的输入,所以读取转换 流式传输并存储输出(如有需要) 可寻址 = 输入/输出.BytesIO(转换自.阅读(-1)) 可寻址的.搜索(0) 张量 = 角色( 张量, 火炬.加载( 可寻址的, 地图位置="cpu", 仅权重=True, ), ) 张量 = 通过索引缩小张量( 张量, req.存储偏移量, req.lengths ) 目标张量 = 规划器.解析张量(req).detach() 断言 目标张量.尺寸() == 张量.尺寸(), ( f"需求{req.存储索引}不匹配大小{目标张量.尺寸()}{张量.尺寸()}" ) 目标张量.复制_(张量) 规划器.提交张量(req, 目标张量) fut: 未来 = 未来() fut.设置结果() 返回 fut 在 StorageReader 中实现抽象函数 定义 读取元数据() -> 元数据: 路径 = .文件系统.连接路径(.路径, ".元数据") .文件系统.创建流(路径, rb) 元数据文件: 元数据 = pickle.加载(元数据文件) 如果 getattr(元数据, 存储元数据, ) : 元数据.存储元数据 = 存储元数据() 元数据.存储元数据.加载 ID = .加载 ID 返回 元数据 定义 设置存储读取器(, 元数据: 元数据, 是否为协调器: 布尔) -> : .存储数据 = 元数据.存储数据 断言 .存储数据 否。 定义 准备本地计划(, plan: 装载计划) -> 装载计划: 返回 规划 定义 准备全局计划(, 计划: 列表[装载计划]\) -> 列表[装载计划] 返回 计划 @property 定义 检查点 ID() -> 联盟[字符串, 操作系统.PathLike] "" 返回用于加载检查点的检查点 ID。 "文档" 返回 .路径 @classmethod 定义 验证校验点 ID(, 检查点 ID: 联盟[字符串, 操作系统.PathLike]\) -> 布尔: 返回 文件系统.验证校验点 ID(检查点 ID)
[文档] 文件系统写入器(文件系统写入器, 阻塞异步分步器): "" 基本实现使用文件 I/O 的 StorageWriter。 本实现做出以下假设和简化: 检查点路径是一个空目录或不存在目录。 文件创建是原子的 检查点由每个写请求的一个文件组成 以及一个包含序列化元数据的 `.metadata` 文件。 "文档" 定义 初始化( , 路径: 联盟[字符串, 操作系统.PathLike], 单文件每秩: 布尔类型 = True, 同步文件: 布尔类型 = True, 线程数: 整型 = 1, 每线程预取: 整型 = 10_000_000, 缓存阶段状态字典: 布尔类型 = 错误, 覆盖: 布尔类型 = True, _扩展: 可选[序列[流转换扩展]] = , ) -> : "" 初始化指向 `path` 的写入器。 参数: 检查点将被写入的目录路径。 single_file_per_rank: 每个 rank 生成一个文件,而不是每个 tensor/blob 生成一个文件。默认为 True。 sync_files: 强制将文件同步到永久存储。默认为 True。 thread_count: 用于写入的 IO 线程数。默认为 1。 每线程预复制:在保存前从 GPU 复制多少字节。默认 10MB。 缓存阶段状态字典:是否缓存阶段状态字典。此选项降低阶段延迟 以增加内存使用为代价。此外,如果将此参数设置为 True,则预期 该 stager 被维护并重复用于多个 dcp.async_save 调用。默认为 False。 覆盖:是否允许覆盖现有检查点。默认为 True。 _extensions:应用于输出流的扩展(实验性) 注意:如果禁用 sync_files,则在失败的情况下无法保证检查点的一致性。 "文档" _FileSystemWriter.初始化( , 路径=路径, 单个文件每等级=单个文件每等级, 同步文件=同步文件, 线程数=线程数, 每线程预取=每线程预取, 覆盖=覆盖, 扩展=扩展, ) 阻塞异步分阶段器.初始化( , 缓存分阶段状态字典=缓存分阶段状态字典, )
[文档] def stage(self, state_dict: 状态字典类型) -> 状态字典类型: """AsyncStager.stage 的覆盖""" # 在异步情况下,状态字典已经在 CPU 上,所以保持这个 # 缓冲区没有意义 self.per_thread_copy_ahead = 0 返回 super().stage(state_dict)

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源