# mypy: 允许未类型化定义
导入
集合
导入 dataclasses
导入
输入/输出
导入
操作符
导入
操作系统
导入
酸菜
导入
队列
导入
线程
导入 uuid
导入
警告
从 abc
导入 ABC,
抽象方法
从 collections.abc
导入
生成器,
迭代器,
迭代器,
序列
从 contextlib
导入 contextmanager
从 dataclasses
导入
数据类
从
输入/输出
导入
不支持的操作
从 pathlib
导入 Path
从
打字
导入
任何,
可调用,
角色, IO,
可选,
联合
Python 3.12 中引入作为 collections.abc.Buffer
从 typing_extensions
导入
缓冲区
导入
火炬
从
火炬
导入
张量
从 torch._utils
导入
获取可用设备类型,
获取设备模块
从 torch.distributed._shard._utils
导入
窄化张量通过索引
从 torch.distributed.checkpoint._extension
导入 (
扩展注册表,
流转换扩展,
)
从 torch.distributed.checkpoint.metadata
导入
元数据,
状态字典类型,
存储元数据
从 torch.distributed.checkpoint.planner
导入 (
装载项目类型,
装载计划,
装载规划器,
读取项目,
保存计划,
保存规划器,
编写项目,
编写项目类型,
)
from torch.distributed.checkpoint.staging 导入
阻塞异步分阶段器
from torch.distributed.checkpoint.storage 导入 (
存储读取器,
存储写入器,
写入结果,
)
from torch.distributed.checkpoint.utils 导入 _create_file_view
from torch.futures 导入
未来
全部 = [
文件系统写入器,
文件系统读取器,
文件系统,
文件系统基类]
_metadata_fn: 字符串 = ".metadata"
@dataclass
类 _StorageInfo:
"这是每条记录的存储信息。"
相对路径:
字符串
偏移量:
整型
长度:
整型
转换描述符:
可选[
序列[
字符串]] =
无
def __getstate__(我):
返回 {k: v
为 k, v
在
我.
字典.
项目() if v
是
不 None}
@dataclass
类
_存储前缀:
前缀:
字符串
默认后缀 = ".distcp"
def _generate_uuid() -> 字符串:
返回
字符串(uuid.uuid4())
类 _TensorLoader(ABC):
@abstractmethod
def 添加(
我,
尺寸:
整数,
对象:
对象) -> None:
通过
@abstractmethod
def start_loading(我) -> None:
通过
@abstractmethod
def 值(
我) ->
迭代器[
元组[
PyTorch.
张量,
对象
]]
通过
类
_串行 CPU 加载器(
_张量加载器):
def __init__(我, resolve_fun:
可调用) -> None:
我.resolve_fun = resolve_fun
我.
项目:
列表[
元组[
整数,
对象]] =
输入文本为空,请提供需要翻译的文本
def 添加(
我,
尺寸:
整数,
对象:
对象) -> None:
我.
项目.
追加((
尺寸,
对象))
def start_loading(我) -> None:
通过
def 值(
我) ->
迭代器[
元组[
PyTorch.
张量,
对象
]]
为 _,
对象
在
我.
项目:
张量 =
我.resolve_fun(
对象).detach()
张量 =
张量.cpu()
如果
张量.
存储().
尺寸() !=
张量.
元素数量():
张量 =
张量.
克隆()
产生 (
张量,
对象,
)
类
_重叠 CPU 加载器(
_张量加载器):
def __init__(
我,
解析函数:
可调用,
流:
可选[
火把.
流] =
无,
在飞行阈值:
整型 = 1_000_000,
) -> 无:
我.
解析函数 =
解析函数
我.
项目:
列表[
元组[
整数,
对象]] =
输入文本为空,请提供需要翻译的文本
我.
飞行阈值 =
飞行阈值
我.
飞行数据 = 0
我.
当前项目:
集合.
双端队列 =
集合.
双端队列()
我.
索引 = 0
我.
开始 =
假
我.
设备类型 = (
流.
设备类型 if
流
否则
_获取可用设备类型()
)
我.
设备模块 =
_获取设备模块(
我.
设备类型)
我.
流 =
角色(
PyTorch.cuda.
流,
流
或者
我.
设备模块.current_stream()
)
如果
我.
流 !=
我.
设备模块.current_stream():
我.
流.
等待流(
我.
设备模块.current_stream())
@property
def 完成(
我) ->
布尔:
返回
我.
索引 >= len(
我.
项目)
def 排空(
我) ->
列表[
元组[
PyTorch.
张量,
对象
]]
已排空 =
输入文本为空,请提供需要翻译的文本
如果
我.
飞行中的数据 >=
我.
在飞行阈值:
我.
流.
同步()
当
我.
在飞行数据 >=
我.
在飞行阈值:
val = 我.
当前项目.popleft()
我.
飞行数据 -= val[0].
元素数量() * val[0].
元素大小()
疲惫.
追加(val)
返回
疲惫
def _补水(
我) ->
无:
与
我.
设备模块.
流(
我.
流):
当
不
我.
_完成
并且
我.
飞行数据 <
我.
飞行阈值:
_, 对象 =
我.
项目[
我.
索引]
我.
索引 += 1
张量 =
我.
解析函数(
对象).detach()
如果
张量.
设备.
类型 ==
我.
设备类型:
张量 =
张量.
到(
设备="cpu",
非阻塞=True)
elif 张量.
设备 ==
PyTorch.
设备("cpu"):
如果 (
张量.
未类型化存储().
尺寸()
!= 张量.
元素数量() *
张量.
元素大小
):
# 这将强制张量既连续又具有最小存储空间
张量 =
张量.
克隆()
我.
当前项.
追加(
(
张量,
对象,
)
)
我.
在飞行数据 +=
张量.
元素数量() *
张量.
元素大小()
def 完成(
我) ->
迭代器[
元组[
PyTorch.
张量,
对象
]]
断言
我.
完成
如果 len(
我.
当前项目) > 0:
我.
流.
同步()
返回
我.
当前项目
def 添加(
我,
尺寸:
整数,
对象:
对象) -> None:
如果
我.
开始:
raise 运行时错误(
"加载开始后不能添加项目")
我.
项目.
追加((
尺寸,
对象))
def 开始加载(
我) -> None:
如果
我.
已开始:
返回
我.
已开始 =
真实
我.
项目.
排序(
键=
操作符.itemgetter(0))
我.
_加满()
def 值(
我) ->
迭代器[
元组[
PyTorch.
张量,
对象
]]
我.
开始加载()
当
不
我.
_完成:
耗尽 =
我.
_排空()
我._refill()
yield from drained
yield from 我._finish()
类 _StorageWriterTransforms:
""
这是实验性的,可能会移动到其他地方
未来。它在这里是为了在我们仍在进行时最小化更改。
学习和收集反馈。
"文档"
def __init__(
我,
扩展:
可选[
序列[
流转换扩展]] =
无
) -> None:
""
如果 extensions 参数为 None,这意味着实现
应提供它选择的任何默认值。一个空的
序列表示不应使用任何扩展。在此
时间,默认扩展序列为空。
"文档"
我.
扩展 = ()
如果
扩展
是
无
否则
扩展
def 转换保存流(
我,
写入项:
写入项,
原始流:
输入/输出.IOBase
) -> 元组[IO[
字节
],
列表[
字符串
]]
为了避免文件描述符泄露,转换器的关闭必须
# 将级联到包装流,但由于此函数可以
# 追加到原始流,我们无法关闭实际流。
所以,我们用这个来包裹原始流
关闭()使其成为空操作,并且一旦所有文件关闭
# 被附加。
类
不关闭写入器(
输入/输出.
IO 基础):
def __init__(我,
原始:
输入/输出.
IO 基础):
我.
原始 =
原始
def 可写入(
我) ->
布尔:
返回
真实
def 写(
我, b:
缓冲区) ->
整数:
返回
我.
原始.
写(b)
def 关闭(
我):
我.
清空()
我.
原始.
清空()
但不闭合。
转换为 =
角色(IO[
字节
], NoCloseWriter(
原始流))
为
示例
在
我.
扩展:
转换为 =
ext: ex.
转换为(
转换为)
返回 (
转换为, [
ext: ex.
获取描述符()
为
示例
在
反转(
我.
扩展
)]])
def _项目大小(
项目:
写入项目) ->
整数:
大小 = 1
断言
项目.
张量数据
是
不
无
# 无法使用 math.prod,因为 PT 需要支持较旧的 Python
为 s
在
项目.
张量数据.
尺寸:
大小 *= s
dtype = 项目.
张量数据.
属性.dtype
返回
大小 *
PyTorch.
_工具.
元素大小(
数据类型)
def 按大小和类型分割(
箱数:
整数,
项目:
列表[
写入项
]\) ->
列表[
列表[
写入项
]]
如果
箱数 == 1:
返回 [
项目]
bytes_w = [wi 为 wi
在
项目
如果 wi.
类型 ==
写入项类型.
字节输入输出]
tensor_w = [wi 为
为
在
项目
如果
为.
类型 !=
写入项类型.
字节输入输出]
桶:
列表[
列表[
写入项]] =
空列表
为 _
在
范围(
箱数)]
桶大小 = [0
为 _
在
范围(
箱数)]
矩阵_w.
排序(
键=_item_size, reverse=True)
为 i, wi
在
列举(bytes_w):
buckets[i % 箱数].
追加(wi)
为 wi
在
张量_w:
# TODO 替换为头文件
索引 =
最小(
列举(
桶大小),
键=
操作符.itemgetter(1))[0]
桶[
索引].
追加(wi)
桶大小[
索引] += _item_size(wi)
返回 buckets
def _write_item(
转换:
存储写入转换,
流:
输入/输出.
IO 基础,
数据:
联盟[
输入/输出.BytesIO,
PyTorch.
张量
],
写入项: WriteItem,
存储键:
字符串,
安全张量:
布尔 =
假的,
) -> 写入结果:
偏移 =
流.
告诉()
(转换为,
转换描述符) =
转换.
转换保存流(
写入项,
流
)
如果
编写项.
类型 ==
写入类型.
字节输入输出:
断言 isinstance(
数据,
输入/输出.BytesIO)
转换为.
写(
数据.
获取缓冲区())
else:
断言 isinstance(
数据,
PyTorch.
张量)
断言
数据.
设备 ==
PyTorch.
设备("cpu")
如果
否。
安全张量:
PyTorch.
保存(
数据,
转换为)
转换为.
关闭()
如果
否。
安全张量
或者 isinstance(
数据,
输入/输出.BytesIO):
长度 =
流.
告诉() -
偏移
else:
长度 =
数据.
元素数量() *
数据.
元素大小()
# 为了与早期版本保持一致,请将此字段留空。
# 如果没有扩展,请将此字段留空以保持元数据。
信息转换描述 = (
无
如果 len(
转换描述符) == 0
否则
转换描述符
)
返回
写入结果(
索引=
写入条目.
索引,
字节数大小=
长度,
存储数据=
_存储信息(
存储键,
偏移量,
长度,
转换描述符=
信息转换描述符,
),
)
def 从队列中写入文件(
创建流:
可调用,
文件队列:
队列.
队列,
结果队列:
队列.
队列,
规划器:
保存规划器,
转换:
_存储写入转换,
在飞行阈值:
整数,
使用_fsync:
布尔,
线程数:
整数,
安全张量:
布尔,
) -> None:
尝试:
当 True:
文件名,
存储键,
写入项 =
文件队列.
非阻塞获取()
加载器: _TensorLoader
自定义后端名称 =
PyTorch._C._get_privateuse1_backend_name()
自定义设备模块 = getattr(
PyTorch,
自定义后端名称, None)
# TODO: 使用 OverlappingCpuLoader 在多个线程中创建会带来显著的
# 性能下降,观察到与 CUDA 流同步有关。我们
# 应该尝试修复这个问题,并在所有线程情况下使用_OverlappingCpuLoader
如果 (
thread_count == 1
并且 (
PyTorch.cuda.
是否可用()
或者 (
自定义设备模块
并且
自定义设备模块.
是否可用())
)
并且 inflight_threshhold > 0
):
加载器 =
_重叠 CPU 加载器(
规划器.
解析数据,
在空中阈值=
在空中阈值,
)
否则:
加载器 =
_串行 CPU 加载器(
规划器.
解析数据,
)
矩阵_w = [wi
为 wi
在
写入项
如果 wi.
类型 !=
写入项类型.
字节输入输出]
为
写入项目
在
张量_w:
加载器.
添加(
_项目大小(
编写项目),
编写项目)
加载器.
开始加载()
字节数_w = [wi
为 wi
在
写入项
如果 wi.
类型 ==
写入项目类型.
字节输入输出]
写入结果 =
输入文本为空,请提供需要翻译的文本
与
创建流(
文件名,
wb)
是
流:
为
写入项目
在 bytes_w:
数据 =
规划器.resolve_data(write_item)
write_results.追加(
写条目(
转换,
流,
数据,
写条目,
存储键,
安全张量,
)
)
张量字典 = {}
为
张量,
编写项目
在
加载器.
值():
断言
张量.
是 CPU
编写结果.
追加(
__编写项目(
转换,
流,
张量,
编写项目,
存储键,
安全张量,
)
)
张量字典[
编写项目.
索引.
完全限定名] =
张量
如果
安全张量:
来自 safetensors.torch
导入
保存
# 类型:忽略[导入未找到]
流.
写(
保存(
张量字典))
如果
使用_fsync:
尝试:
操作系统.fsync(
流.
文件描述符())
除了 (
属性错误,
不支持的操作):
操作系统.sync()
流.
关闭()
result_queue.放置(write_results)
除了
队列.
空的:
通过
类
文件系统基类(ABC):
@contextmanager
@abstractmethod
def 创建流(
我,
路径:
联盟[
字符串,
操作系统.PathLike
],
模式:
字符串
) -> 生成器[
输入/输出.
IO 基础, None, None
] ...
@abstractmethod
def 连接路径(
我,
路径:
联盟[
字符串,
操作系统.PathLike
],
后缀:
字符串
) -> 联盟[
字符串,
操作系统.PathLike
] ...
@abstractmethod
def 重命名(
我,
路径:
联盟[
字符串,
操作系统.PathLike
],
新路径:
联盟[
字符串,
操作系统.PathLike]
) -> 无: ...
@abstractmethod
def 初始化路径(
我,
路径:
联盟[
字符串,
操作系统.PathLike
]\) ->
联盟[
字符串,
操作系统.PathLike
] ...
@abstractmethod
def 建立目录(
我,
路径:
联盟[
字符串,
操作系统.PathLike
]\) ->
无: ...
@classmethod
@abstractmethod
定义
验证校验点 ID(
类,
检查点 ID:
联盟[
字符串,
操作系统.PathLike
]\) ->
布尔: ...
@abstractmethod
定义
存在(
我,
路径:
联盟[
字符串,
操作系统.PathLike
]\) ->
布尔: ...
@abstractmethod
定义
删除文件(
我,
路径:
联盟[
字符串,
操作系统.PathLike
]\) ->
无: ...
类
文件系统(
文件系统基类):
@contextmanager
定义
创建流(
我,
路径:
联盟[
字符串,
操作系统.PathLike
],
模式:
字符串
) -> 生成器[
输入/输出.
IO 基础,
无,
无
]
如果
否。 isinstance(
路径,
路径):
路径 =
路径(
路径)
与
路径.
打开(
模式)
是
流:
产生
角色(
输入/输出.
IO 基础,
流)
定义
拼接路径(
我,
路径:
联盟[
字符串,
操作系统.PathLike
],
后缀:
字符串
) -> 联盟[
字符串,
操作系统.PathLike
]
如果
否。 isinstance(
路径,
路径):
路径 =
路径(
路径)
返回
路径 /
后缀
定义
初始化路径(
我,
路径:
联盟[
字符串,
操作系统.PathLike
]\) ->
联盟[
字符串,
操作系统.PathLike
]
如果
否。 isinstance(
路径,
路径):
路径 =
路径(
路径)
返回
路径
定义
重命名(
自身,
路径:
联盟[
字符串, os.PathLike
],
新路径:
联盟[
字符串, os.PathLike]
) -> 无:
如果
不 isinstance(
路径,
路径):
路径 =
路径(
路径)
路径.
重命名(
角色(
路径,
新路径))
def 建立目录(
我,
路径:
联盟[
字符串, os.PathLike]) ->
无:
如果
不 isinstance(
路径,
路径):
路径 =
路径(
路径)
路径.
建立目录(parents=True, exist_ok=True)
@classmethod
def 验证校验点 ID(
类,
校验点 ID:
联盟[
字符串,
操作系统.PathLike
]\) ->
布尔:
if isinstance(校验点 ID,
路径):
返回
真实
if ://
在
字符串(
检查点 ID):
返回
假
为 p
在
路径(
检查点 ID).parents:
如果 p.
存在()
和
操作系统.
访问(
字符串(p),
操作系统.
写操作权限):
返回
真实
返回
假
def 存在(
我,
路径:
联盟[
字符串,
操作系统.PathLike
]\) ->
布尔:
如果
否。 isinstance(
路径,
路径):
路径 =
路径(
路径)
返回
路径.
存在()
定义
删除文件(
我,
路径:
联盟[
字符串,
操作系统.PathLike
]\) ->
无:
如果
否。 isinstance(
路径,
路径):
路径 =
路径(
路径)
路径.
解链()
类
文件系统写入器(
存储写入器):
""
基本实现使用文件 I/O 的 StorageWriter。
本实现做出以下假设和简化:
*检查点路径是一个空目录或不存在。
*文件创建是原子的
检查点由每个写入请求的一个文件组成,
以及一个包含序列化元数据的 `.metadata` 文件。
"文档"
定义 __init__(
我,
路径:
联盟[
字符串,
操作系统.PathLike
],
单个文件每等级:
布尔类型 = True,
同步文件:
布尔类型 = True,
线程数:
整型 = 1,
每线程预取副本:
整型 = 10_000_000,
覆盖:
布尔类型 = True,
_扩展:
可选[
序列[
流转换扩展]] =
无,
*参数:
任何,
**kwargs: 任何,
) -> 无:
""
初始化指向 `path` 的写入器。
参数:
检查点将被写入的目录路径。
single_file_per_rank: 每个 rank 生成一个文件,而不是每个 tensor/blob 生成一个文件。默认为 True。
sync_files: 强制将文件同步到永久存储。默认为 True。
thread_count: 用于写入的 IO 线程数。默认为 1。
在保存前从 GPU 预复制多少字节。默认为 10MB。
是否允许覆盖现有检查点。默认为 True。
_extensions: 应用到输出流的扩展(实验性)
注意。如果禁用 sync_files,则在失败的情况下无法保证检查点的一致性。
"文档"
超级().__init__()
我.
文件系统 =
文件系统()
我.
路径 =
我.
文件系统.
初始化路径(
路径)
我.
单个文件每等级 =
单个文件每等级
我.
同步文件 =
同步文件
我.
线程数 =
线程数
我.
每线程预取 =
每线程预取
我.
保存 ID =
生成 UUID()
我.
覆盖 =
覆盖
我.
转换 =
存储写入转换(
扩展)
定义
重置(
我,
检查点 ID:
联盟[
字符串,
操作系统.PathLike,
无] =
无) ->
无:
如果
检查点 ID:
我.
路径 =
我.
文件系统.
初始化路径(
检查点 ID)
我.
保存 ID =
生成 UUID()
定义
设置存储写入器(
我,
是否为协调器:
布尔) ->
无:
通过
定义
准备本地计划(
我,
计划:
保存计划 SavePlan) ->
保存计划 SavePlan:
我.
文件系统.
建立目录(
我.
路径)
如果
我.
文件系统.
存在(
我.
元数据路径):
如果
我.
覆盖:
warnings.warn(
f"检测到现有检查点,在"{
我.
元数据路径}
,由于"{
我.
覆盖"=}
。
"在 PyTorch 2.5 之前的版本中,`overwrite`默认为 False。将此变量设置为 True 以"
"保留此功能或当发现现有检查点时引发错误。"
)
else:
raise 运行时错误(f
"检查点已存在且"{
我.
覆盖=}.")
返回
规划
定义
准备全局计划(
我,
计划:
列表[
保存计划 SavePlan
]\) ->
列表[
保存计划 SavePlan
]
新计划 = [
dataclasses.替换(
计划,
存储数据=
存储前缀(f
"__"{i}
_))
为 i,
规划
在
列举(
计划)
]
返回
新计划
定义
写数据(
我,
计划:
保存计划 SavePlan,
规划器:
保存计划,
) -> 未来[
列表[
写结果
]]
存储计划:
存储前缀 =
计划.
存储数据
文件数量 = 0
定义
生成文件():
非局部
文件数量
文件名 = f"{
存储计划.
前缀}{
文件数量}{
默认后缀}"
文件数量 += 1
返回
文件名
文件队列:
队列.
队列 =
队列.
队列()
如果
自身.
单文件每排名:
为
桶
在
按大小和类型分割(
自身.
线程数,
计划.
项目):
文件名 =
生成文件()
路径 =
自身.
文件系统.
连接路径(
自身.
路径,
文件名)
文件队列.
放置((
路径,
文件名,
桶))
else:
为
项目
在
计划.
项目:
文件名 =
生成文件()
路径 =
自身.
文件系统.
拼接路径(
自身.
路径,
文件名)
文件队列.
放置((
路径,
文件名, [
项目]))
返回
自身.
_写入数据(
规划器,
文件队列)
定义
写入数据(
自身,
规划器:
保存规划器,
文件队列:
队列.
队列,
安全张量:
布尔类型 =
错误,
) -> 未来[
列表[
写入结果
]]
结果队列:
队列.
队列 =
队列.
队列()
线程 =
输入文本为空,请提供需要翻译的文本
为 _
在
范围(1,
自身.
线程数):
t = 线程.Thread(
目标=
从队列中写入文件,
参数=(
自身.
文件系统.
创建流,
文件队列,
结果队列,
规划器,
自身.
转换,
自身.
每线程预取副本,
自身.
同步文件,
自身.
线程数,
安全张量,
),
)
t.开始()
线程.
追加(t)
从队列中写入文件(
创建流=
自身.
文件系统.
创建流,
文件队列=
文件队列,
结果队列=
结果队列,
规划器=
规划器,
转换=
自身.
转换,
在飞行阈值=
自身.
每线程预取副本,
使用_fsync=
自身.
同步文件,
线程数=
自身.
线程数,
安全张量=
安全张量,
)
为 t
在
线程:
t.连接()
res = 输入文本为空,请提供需要翻译的文本
try:
当 True:
res += 结果队列.
非阻塞获取()
除了
队列.
空的:
fut: 未来[
列表[
写入结果]] =
未来()
fut.设置结果(
资源)
返回 fut
定义
完成(
自身,
元数据:
元数据,
结果:
列表[
列表[
写入结果]]) ->
无:
存储 md = {}
为
写入列表
在
结果:
存储 md.
更新({wr.
索引: wr.
存储数据
为 wr
在
写入列表})
元数据.
存储数据 =
存储元数据
元数据.
存储元信息 =
我.
存储元数据()
临时路径 =
角色(
路径,
我.
文件系统.
连接路径(
我.
路径, f"{_metadata_fn}.tmp"))
与
我.
文件系统.
创建流(
临时路径,
wb)
是
元数据文件:
pickle.导出(
元数据,
元数据文件)
如果
我.
同步文件:
try:
操作系统.
文件同步(
元数据文件.
文件描述符())
除了 (
属性错误,
不支持的操作):
操作系统.
同步()
# 删除,如果存在其他检查点。
如果
自身.
文件系统.
存在(
自身.
元数据路径):
自身.
文件系统.
删除文件(
自身.
元数据路径)
自身.
文件系统.
重命名(
临时路径,
自身.
元数据路径)
定义
存储元数据(
自身) ->
可选[
存储元数据
]
返回
存储元数据(
检查点 ID=
我.
检查点 ID,
保存 ID=
我.
保存 ID)
@property
定义
元数据路径(
我) ->
联盟[
字符串,
操作系统.PathLike
]
返回
角色(
路径,
我.
文件系统.
连接路径(
我.
路径, _metadata_fn))
@property
定义
检查点 ID(
我) ->
联盟[
字符串,
操作系统.PathLike
]
""
返回将要用于保存检查点的检查点 ID。
"文档"
返回
我.
路径
@classmethod
定义
验证校验点 ID(
类,
检查点 ID:
联盟[
字符串,
操作系统.PathLike
]\) ->
布尔:
返回
文件系统.
验证校验点 ID(
检查点 ID)
类
_存储读取转换:
""
这只是实验性的,很可能会移动到其他地方。
未来。它在这里是为了在学习和收集反馈的过程中尽量减少改动。
它在这里是为了在学习和收集反馈的过程中尽量减少改动。
"文档"
定义
初始化(
我,
扩展注册表:
可选[
扩展注册表] =
无) ->
无:
自身.
扩展注册表 = (
扩展注册表()
如果
扩展注册表
是
无
否则
扩展注册表
)
定义
转换加载流(
自身,
读取项:
读取项目,
转换描述符:
序列[
字符串
],
原始流: IO[
字节
],
) -> IO[字节
]
扩展 =
我.
扩展注册表.
从描述符列表(
转换描述符)
转换自 =
原始流
为
示例
在
扩展:
如果 isinstance(
ext: ex,
流式转换扩展):
从...转换 =
ext: ex.
从...转换(
从...转换)
返回
转换自
[文档]
类
文件系统读取器(
存储读取器):
定义 __init__(
我,
路径:
联盟[
字符串,
操作系统.PathLike
],
_扩展注册表:
可选[
扩展注册表] =
无,
# 实验
) -> 无:
超级().__init__()
我.
文件系统 =
文件系统()
我.
路径 =
我.
文件系统.
初始化路径(
路径)
我.
存储数据:
字典[
任何,
任何] = {}
我.
加载 ID =
生成 UUID()
我.
转换 =
存储读取转换(
扩展注册表)
定义
切片文件(
我,
文件, sinfo:
_存储信息) -> IO[
字节
]
返回
角色(IO[
字节
],
_创建文件视图(
文件, sinfo.
偏移量, sinfo.
长度))
定义
重置(
我,
检查点 ID:
联盟[
字符串,
操作系统.PathLike,
无] =
无) ->
无:
我.
存储数据 = {}
如果
检查点 ID:
我.
路径 =
我.
文件系统.
初始化路径(
检查点 ID)
我.
加载 ID =
生成 UUID()
定义
读取数据(
我, plan:
装载计划,
规划器:
装载规划器) ->
未来[
无
]
按文件分组请求
按文件:
字典[
字符串,
列表[
读取项目]] = {}
为
读取项
在 plan.
项目:
项目描述:
_存储信息 =
我.
存储数据[
阅读条目.
存储索引]
路径 =
项目描述.
相对路径
每个文件.setdefault(
路径,
空列表.
追加(
阅读条目)
为
相对路径,
需求
在
每个文件.
项目():
新路径 =
我.
文件系统.
拼接路径(
我.
路径,
相对路径)
与
我.
文件系统.
创建流(
新路径,
rb)
是
流:
# TODO 排序偏移量并缓存读取
为 req
在
需求:
item_md = 我.storage_data[req.
存储索引]
file_slice = 我.
切片文件(
流,
项目_md)
转换_from =
我.
转换.
转换加载流(
req,
此字段在旧版本中不存在
# 实现提供回退机制。
项目元数据.
转换描述符
或者 (),
文件切片,
)
如果 req.
类型 ==
装载项目类型.
字节输入输出:
读取字节 =
输入/输出.BytesIO(
从转换.
阅读(-1))
读取字节数.
搜索(0)
规划器.
加载字节数(req,
读取字节数)
else:
如果
转换自.
可寻址的():
可寻址 =
转换自
else:
因为 torch.load 需要可寻址的输入,所以读取转换
流式传输并存储输出(如有需要)
可寻址 =
输入/输出.BytesIO(
转换自.
阅读(-1))
可寻址的.
搜索(0)
张量 =
角色(
张量,
火炬.
加载(
可寻址的,
地图位置="cpu",
仅权重=True,
),
)
张量 =
通过索引缩小张量(
张量, req.
存储偏移量, req.lengths
)
目标张量 =
规划器.
解析张量(req).detach()
断言
目标张量.
尺寸() ==
张量.
尺寸(), (
f"需求{req.
存储索引}
不匹配大小{
目标张量.
尺寸()}
与{
张量.
尺寸()}"
)
目标张量.
复制_(
张量)
规划器.
提交张量(req,
目标张量)
fut: 未来 =
未来()
fut.设置结果(
无)
返回 fut
在 StorageReader 中实现抽象函数
定义
读取元数据(
我) ->
元数据:
路径 =
我.
文件系统.
连接路径(
我.
路径,
".元数据")
与
我.
文件系统.
创建流(
路径,
rb)
是
元数据文件:
元数据 = pickle.
加载(
元数据文件)
如果 getattr(
元数据,
存储元数据,
无)
是
无:
元数据.
存储元数据 =
存储元数据()
元数据.
存储元数据.
加载 ID =
我.
加载 ID
返回
元数据
定义
设置存储读取器(
我,
元数据:
元数据,
是否为协调器:
布尔) ->
无:
我.
存储数据 =
元数据.
存储数据
断言
我.
存储数据
是
否。
无
定义
准备本地计划(
我, plan:
装载计划) ->
装载计划:
返回
规划
定义
准备全局计划(
我,
计划:
列表[
装载计划
]\) ->
列表[
装载计划
]
返回
计划
@property
定义
检查点 ID(
我) ->
联盟[
字符串,
操作系统.PathLike
]
""
返回用于加载检查点的检查点 ID。
"文档"
返回
我.
路径
@classmethod
定义
验证校验点 ID(
类,
检查点 ID:
联盟[
字符串,
操作系统.PathLike
]\) ->
布尔:
返回
文件系统.
验证校验点 ID(
检查点 ID)
[文档]
类
文件系统写入器(
文件系统写入器,
阻塞异步分步器):
""
基本实现使用文件 I/O 的 StorageWriter。
本实现做出以下假设和简化:
检查点路径是一个空目录或不存在目录。
文件创建是原子的
检查点由每个写请求的一个文件组成
以及一个包含序列化元数据的 `.metadata` 文件。
"文档"
定义
初始化(
我,
路径:
联盟[
字符串,
操作系统.PathLike
],
单文件每秩:
布尔类型 = True,
同步文件:
布尔类型 = True,
线程数:
整型 = 1,
每线程预取:
整型 = 10_000_000,
缓存阶段状态字典:
布尔类型 =
错误,
覆盖:
布尔类型 = True,
_扩展:
可选[
序列[
流转换扩展]] =
无,
) -> 无:
""
初始化指向 `path` 的写入器。
参数:
检查点将被写入的目录路径。
single_file_per_rank: 每个 rank 生成一个文件,而不是每个 tensor/blob 生成一个文件。默认为 True。
sync_files: 强制将文件同步到永久存储。默认为 True。
thread_count: 用于写入的 IO 线程数。默认为 1。
每线程预复制:在保存前从 GPU 复制多少字节。默认 10MB。
缓存阶段状态字典:是否缓存阶段状态字典。此选项降低阶段延迟
以增加内存使用为代价。此外,如果将此参数设置为 True,则预期
该 stager 被维护并重复用于多个 dcp.async_save 调用。默认为 False。
覆盖:是否允许覆盖现有检查点。默认为 True。
_extensions:应用于输出流的扩展(实验性)
注意:如果禁用 sync_files,则在失败的情况下无法保证检查点的一致性。
"文档"
_FileSystemWriter.初始化(
我,
路径=
路径,
单个文件每等级=
单个文件每等级,
同步文件=
同步文件,
线程数=
线程数,
每线程预取=
每线程预取,
覆盖=
覆盖,
扩展=
扩展,
)
阻塞异步分阶段器.
初始化(
我,
缓存分阶段状态字典=
缓存分阶段状态字典,
)
[文档] def stage(self, state_dict: 状态字典类型) -> 状态字典类型:
"""AsyncStager.stage 的覆盖"""
# 在异步情况下,状态字典已经在 CPU 上,所以保持这个
# 缓冲区没有意义
self.per_thread_copy_ahead = 0
返回 super().stage(state_dict)