• 文档 >
  • 模块代码 >
  • torch >
  • torch.distributed >
  • torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook
快捷键

torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook 的源代码

# mypy: 允许未类型化定义
导入 记录日志
导入 数学
来自 集合 导入 defaultdict

导入 火炬
导入 torch.distributed 作为 dist
来自 torch.distributed 导入 distributed_c10d
来自 torch.utils._typing_utils 导入 not_none

来自 . 导入 默认钩子 作为 默认


__all__ = ["PowerSGD 状态", "powerSGD 钩子", "批处理 powerSGD 钩子"]

日志记录器 = 记录日志.获取日志记录器(__name__)


def _正交化(矩阵, epsilon=0):
    ""
在正交化一批矩阵时,决定使用 Gram-Schmidt 还是 QR 分解。

QR 分解不支持半精度,但通常在秩大于 2 时速度更快。
```python
# 假设输入文本为:
input_text = '"""'

# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
    # 这里应该调用真实的翻译 API 进行翻译
    # 由于示例中不使用真实的 API,以下为模拟翻译结果
    return text

# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
    断言 长度(矩阵.shape) == 3  矩阵.shape[2]  矩阵.shape[1]

    num_matrices = 矩阵.shape[0]
    排名 = 矩阵.shape[2]
    dtype = 矩阵.dtype
    如果 排名  2  dtype  [火炬.float16, 火炬.bfloat16]:
        _orthogonalize_gram_schmidt(矩阵, epsilon=epsilon)
    否则:
        火炬.线性代数.二维码(
            矩阵,
            输出=(
                矩阵,
                火炬.空的(
                    矩阵数量, 排名, 排名, 设备=矩阵.设备, 数据类型=dtype
                ),
            ),
        )


def _正交化_gram_schmidt(矩阵, epsilon=0):
    ""
应用 Gram-Schmidt 过程对一批矩阵进行正交化。

如果 epsilon 为 0,则此操作等价于`torch.qr(matrices, out=(matrices, _))`,
```python
# 假设输入文本为:
input_text = '"""'

# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
    # 这里应该调用真实的翻译 API 进行翻译
    # 由于示例中不使用真实的 API,以下为模拟翻译结果
    return text

# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```
    列数 = 矩阵.shape[2]
     i  范围(列数):
        标准化第 i 列。
         = 矩阵[:, :, i : i + 1]
        如果这里不加 epsilon,可能会因为梯度消失而导致除以零。
        如果输入的矩阵批次覆盖了至少一个完整层的梯度,则不需要这个 epsilon。
        在神经网络中。
        如果 epsilon == 0:
            注意,如果使用 FP16,列**2 可能会下溢/上溢。
            可能需要考虑乘以一个缩放因子并在之后除以它,或者使用 bfloat16。
            try:
                 /= 火炬.归一化(, 暗淡=1, 保持维度=True)
            除了 ZeroDivisionError:
                记录器.错误(
                    "要正交化的矩阵至少有一列全为 0。请设置一个小的值,例如 1e-8。"
                    "在 PowerSGD 状态中设置为`orthogonalization_epsilon`。"
                )
                从 NaN 恢复到 0s 的值。
                .填充_(0.0)
        否则:
             /= 火炬.归一化(, 暗淡=1, 保持维度=True) + epsilon
        将其投影到其余部分并移除。
        如果 i + 1 < 列数:
            剩余 = 矩阵[:, :, i + 1 ]
            剩余 -= 火炬.总和( * 剩余, 暗淡=1, 保持维度=True) * 


def _应该压缩(
    行数, 列数, 矩阵逼近秩, 最小压缩率
):
    ""
推荐如果给定的张量值得压缩。

返回一个建议,即根据参数描述的 2D 张量是否值得压缩。
包括描述压缩预期节省的统计数据。我们考虑当``min_compression_rate``小于未压缩大小与压缩大小的比值时进行压缩。
当``min_compression_rate``小于未压缩大小除以压缩大小时,我们进行压缩。
未压缩大小等于``num_rows``乘以``num_cols``。
压缩大小等于(``num_rows``加``num_cols``)乘以``matrix_approximation_rank``。

这个函数的结果是一个元组,形式为(compression_recommendation, uncompressed_el_count, compressed_el_count),其中:

当张量值得压缩时,compression_recommendation 为 true,否则为 false(见上文);

uncompressed_el_count 是未压缩的元素数量,即 ``num_rows`` * ``num_cols``;并且,

compress_el_count 是压缩后的元素数量,即(``num_rows`` + ``num_cols``) * ``matrix_approximation_rank``。
```python
# 假设输入文本为:
input_text = '"""'

# 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API)
def translate_to_simplified_chinese(text):
    # 这里应该调用真实的翻译 API 进行翻译
    # 由于示例中不使用真实的 API,以下为模拟翻译结果
    return text

# 输出翻译结果
translated_text = translate_to_simplified_chinese(input_text)
print(translated_text)
```  # 无需注意:B950
    解压缩大小 = 行数 * 列数
    压缩大小 = (行数 + 列数) * 矩阵逼近秩
    返回 (
        压缩大小 * 最小压缩率 < 解压缩大小,
        解压缩大小,
        压缩大小,
    )


def _报告压缩统计(, 状态):
    "报告压缩统计信息,频率为 PowerSGD 状态中指定的`compression_stats_logging_frequency`。"
    如果 .是最后一个()  状态.迭代 >= 状态.下一个统计报告:
        统计 = 状态.压缩统计()
        记录器.信息(
            "压缩统计:迭代"%s压缩前总数%s压缩后总数%s,"
            比率%s",
            状态.迭代,
            统计[1]
            统计[2]
            统计[0]
        )
        状态.下一个统计报告 = 状态.迭代 + 状态.压缩统计日志频率


[文档] PowerSGD 状态: r"" 在训练过程中存储算法的超参数和所有梯度的内部状态。 尤其是矩阵近似秩(matrix_approximation_rank)和 start_powerSGD_iter 是用户应该调整的主要超参数。 为了性能,我们建议将二进制超参数 `use_error_feedback` 和 `warm_start` 保持开启状态。 1. `matrix_approximation_rank` 控制压缩低秩张量的尺寸,这决定了压缩率。秩越低,压缩效果越强。 1.1. 如果 `matrix_approximation_rank` 设置得太低,模型的全局质量需要更多的训练步骤才能达到,或者永远无法达到,从而造成精度损失。 1.2. 增加 `matrix_approximation_rank` 会导致压缩的计算成本显著增加,并且精度可能不会超过某个 `matrix_approximation_rank` 阈值而进一步改善。 调整 `matrix_approximation_rank` 时,我们建议从 1 开始,以 2 的倍数(如指数网格搜索,1,2,4,...)增加,直到达到令人满意的精度。通常只使用 1-4 这么小的值。对于某些 NLP 任务(如原文附录 D 所示),这个值已增加到 32。 2. `start_powerSGD_iter` 将 PowerSGD 压缩延迟到 `start_powerSGD_iter` 步骤,并在 `start_powerSGD_iter` 步骤之前运行 vanilla allreduce。这种 **vanilla allreduce + PowerSGD** 的混合方案可以有效地提高精度,即使使用相对较小的 `matrix_approximation_rank`。这是因为训练阶段的开始通常对不精确的梯度非常敏感,过早地压缩梯度可能会导致训练快速进入次优轨迹,这可能会对精度产生不可恢复的影响。 调整 `start_powerSGD_iter` 时,我们建议从总训练步骤的 10% 开始,并增加它,直到达到令人满意的精度。如果训练中有预热阶段,`start_powerSGD_iter` 通常不应少于预热步骤的数量。 ``min_compression_rate`` 是压缩层时所需的最低压缩率。由于压缩带来的计算开销,只有当带宽可以节省足够时,张量才值得压缩,即 ``(num_rows + num_cols) * matrix_approximation_rank * min_compression_rate < num_rows * num_cols``。如果无法满足指定的压缩率阈值,张量将直接进行 allreduced 操作而不进行压缩。 PowerSGD 压缩开始后,每 ``compression_stats_logging_frequency`` 次迭代会记录压缩统计信息。 4. ``orthogonalization_epsilon`` 可以是一个非常小的值(例如,1e-8),在正交化步骤中添加到每个归一化的矩阵列中,以防止任何列全为 0 时的除以零错误。如果已经可以防止这种情况(例如,通过批量归一化),则建议使用 0 作为 epsilon 以提高精度。 ``batch_tensors_with_same_shape`` 控制是否在批处理操作中压缩和解压缩形状相同的张量以实现更高的并行性。请注意,您还应该增加桶的大小(即 DDP 构造函数中的 ``bucket_cap_mb`` 参数),以便更多相同形状的张量出现在同一个桶中。然而,这可能会减少计算和通信之间的重叠,并由于堆叠相同形状的张量而增加内存占用。如果压缩/解压缩计算是瓶颈,则将其设置为 ``True``。 ..警告:: 如果启用错误反馈或预热,DDP 中允许的 ``start_powerSGD_iter`` 的最小值为 2。 这是因为 DDP 中还有一个内部优化,在迭代 1 重建桶, 并且这可能会与重建过程之前记忆的任何张量发生冲突。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` # 无需注意:B950 __slots__ = [ "进程组", # 以下字段通常是用户需要调整的超参数。 矩阵逼近秩, start_powerSGD_iter, # 以下字段通常是用户很少需要调整的超参数。 "最小压缩率", "正交化容差", # 以下字段是推荐的二进制超参数,开启以提高性能和准确性。 "使用错误反馈", warm_start, 批量处理相同形状的张量, # 以下字段为内部状态。 rng, "错误字典", "p 内存字典", "q 内存字典", "迭代", 以下字段用于记录压缩统计信息。 "压缩前元素总数", 总压缩后元素数, 压缩统计日志频率, 下一个统计报告, ] def 初始化( 自身, 进程组, 矩阵逼近秩=1, 开始 powerSGD 迭代=1_000, 最小压缩率=2, 使用错误反馈=True, warm_start=True, 正交化容差=0, random_seed=0, compression_stats_logging_frequency=10_000, 批量处理形状相同的张量: 布尔类型 = 错误, ): 记录器.信息( "PowerSGD 配置:矩阵近似秩="%s; start_powerSGD_iter = %s; " "min_compression_rate = %s; orthogonalization_epsilon = %s使用错误反馈%swarm_start =%s输入文本翻译为简体中文为:"; " random_seed =%s; 压缩统计日志频率 =%s; 批处理相同形状的张量 =%s", 矩阵逼近秩, start_powerSGD_iter, 最小压缩率, 正交化容差, 使用错误反馈, 暖启动, 随机种子, 压缩统计日志频率, 批量处理形状相同的张量, ) 自身.流程组 = 流程组 自身.矩阵逼近秩 = 矩阵逼近秩 # 将 PowerSGD 压缩延迟到'start_powerSGD_iter'步骤可以有两个优点: 1) 结果表明,PowerSGD 可能会导致非平凡的精度损失, 即使矩阵逼近的秩增加到很大的值。 减轻精度损失,一种简单而有效的方法是混合 vanilla allreduce 使用 PowerSGD 进行(或更保守的压缩,如 FP16 压缩) 2) 在 DDP 中重建桶的过程有内部优化, 以节省内存空间。 此步骤在第一次迭代之后进行。 然而,这意味着输入桶化张量的形状可能会发生变化, 这将使错误反馈和预热实现的复杂化。 在前几次迭代中运行 vanilla allreduce 可以避免这种复杂性。 如果 (使用错误反馈 warm_start) start_powerSGD_iter 1: 提升 ValueError( 如果启用 `use_error_feedback` 或 `warm_start`,则 `start_powerSGD_iter` 应大于 1, 因为 PowerSGD 只能在 DDP 的前两次迭代之后应用。 ) 自身.start_powerSGD_iter = start_powerSGD_iter 自身.最小压缩率 = 最小压缩率 # 错误反馈通常对收敛和泛化都至关重要, # 因为 PowerSGD 是一个有偏压缩器, 压缩和解压缩随机梯度在期望中并不产生原始值。 此机制需要一个输入梯度的临时副本, 因此,它增加了峰值内存消耗,其大小等于梯度张量的大小。 然而,如果目标矩阵已知确实是精确低秩的(而不仅仅是低稳定秩), 有时可以在没有错误反馈的情况下收敛到最优解。 参见:http://proceedings.mlr.press/v54/yurtsever17a/yurtsever17a.pdf 自身.使用错误反馈 = 使用错误反馈 # 从前一次迭代中重用 P(s) 和 Q(s)。 # 这可以提高近似质量,从而提高准确性。 # 此外,通过避免在每一步初始化这些低秩张量, # 这也可以加速训练。 然而,这会以额外的内存为代价。 自身.热启动 = 热启动 可以使用一个非常小的值来防止由于梯度消失的正交化引起的除以零错误。 自身.正交化 epsilon = 正交化 epsilon 此 RNG 的目的是在迭代中为初始化 Q 生成不同的随机种子, 但对于所有 DDP 副本来说顺序相同。 不同迭代中的不同随机种子表示不同 SGD 步骤的梯度不同投影。 如果使用相同的随机投影, 永远不会同步的梯度之间的差异。 导入 numpy 作为 np 自身.rng = numpy.随机.RandomState(随机种子) # 由于所有输入桶只有一个状态实例, # 需要维护一个映射,将每个桶索引映射到局部错误。 自身.错误字典: 字典[int, 火炬.张量] = {} 自身.p_memory_dict: 字典[int, 火炬.张量] = {} 自身.q_memory_dict: 字典[int, 火炬.张量] = {} 训练循环中的迭代/步骤。 自身.迭代 = 0 压缩统计累计器 自身.压缩前元素总数 = 0 自身.压缩后元素总数 = 0 我们将在每 'compression_stats_logging_frequency' 次迭代后报告压缩统计信息 我们始终至少报告一次压缩统计信息。 自身.压缩统计信息记录频率 = 最大值( 1, 压缩统计信息记录频率 ) 自身.下次统计报告 = 0 批量处理形状相同的张量可以提高压缩/解压缩计算的并行性。 这需要更大的桶大小,以便使更多相同形状的张量出现在一个桶中,然而 这可能会减少计算与通信之间的重叠,并增加由于堆叠张量而造成的内存占用 # 如果压缩/解压缩计算是瓶颈,请开启。 自身.批量处理形状相同的张量 = 批量处理形状相同的张量
[文档] def __getstate__(self): r""" 返回一个 ``Dict[str, Any]``,该字典将被序列化并保存。 ``process_group`` 不可序列化,已被排除。 返回的状态中。 "``" logger.warning() "注意:进程组不可序列化,并从保存的状态中排除。" ) return { slot: getattr(self, slot) for slot in self.__slots__ if slot != "process_group" }
[文档] def __setstate__(self, state): r""" 接收提供的 ``state`` 并将其设置到这个 ``PowerSGDState`` 实例中。 ``process_group`` 设置为默认。 """ self.process_group = distributed_c10d._get_default_group() logger.warning( "注意:进程组将设置为默认组(即世界大小)。" 如果需要不同的组,请在加载 PowerSGD 状态后设置`self.process_group`。 ) for slot, value in state.items(): setattr(self, slot, value)
def maybe_increase_iter(自身
, ): """跟踪迭代并在本地 SGD 开始时触发日志消息。""" # 由于 bucket 0 是迭代中最后一个进行 allreduce 的 bucket。 仅在处理桶 0 时增加`iter`。 如果 .是最后一个(): 自身.iter += 1 如果 自身.iter == 自身.start_powerSGD_iter: 记录器.信息("在迭代后开始应用 PowerSGD"%s迭代次数。, 自身.迭代) def 压缩统计信息(自身): r"" 返回最新的压缩统计信息作为元组。 返回形式为 (压缩率, 压缩前元素数量, 压缩后元素数量) 的元组,其中: 压缩率是有效压缩率,即 (压缩前元素数量) / (压缩后元素数量); numel_before_compression 是应用压缩前的总元素数量;以及, numel_after_compression 是应用压缩后的总元素数量。 ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` # 无需注意:B950 压缩率 = ( 自身.压缩前元素总数 / 自身.压缩后元素总数 如果 自身.压缩后元素总数 > 0 否则 0 ) 返回 ( 压缩率, 自身.压缩前元素总数, 自身.压缩后元素总数, )
[文档]def powerSGD 钩子( 状态: PowerSGDState, : 距离.毕业桶 ) -> 火炬.期货.未来[火炬.张量]: r"" 实现 PowerSGD 算法。 此 DDP 通信钩子实现了 PowerSGD 梯度压缩 算法描述在《论文 》中。 当所有工作进程中的梯度张量汇总后,此钩子按照以下方式进行压缩: 压缩方法如下: 1. 将输入的展平 1D 梯度张量视为每个参数张量的列表,并将所有张量分为两组: 1.1 在 allreduce 之前应该压缩的张量,因为压缩可以在带宽上节省足够的空间。 1.2 其余的张量将直接进行 allreduce 而不进行压缩,包括所有的向量张量(对于偏置)。 2. 处理未压缩的张量: 2.1. 为这些未压缩的张量分配连续的内存,并将所有未压缩的张量作为一个批次进行 allreduce,不进行压缩; 2.2. 将连续内存中的单个未压缩张量复制回输入张量。 3. 处理应由 PowerSGD 压缩压缩的张量: 3.1. 对于每个张量 M,创建两个低秩张量 P 和 Q 以分解 M, 使得 M = PQ^T,其中 Q 从标准正态分布初始化并正交化; 3.2. 计算 Ps 中的每个 P,等于 MQ; 3.3. 将 Ps 作为一个批次进行 allreduce; 3.4. 对 Ps 中的每个 P 进行正交化; 3.5. 计算 Qs 中的每个 Q,约等于 M^TP; 3.6. 将所有 reduces 作为一批处理; 3.7. 计算所有压缩张量中的每个 M,其值约等于 PQ^T。 注意,此通信钩子在`state.start_powerSGD_iter`次迭代期间强制执行 vanilla allreduce。 这不仅给用户提供了更多控制速度提升和精度之间的权衡的能力, 但也有助于抽象化 DDP 内部优化的一些复杂性,为未来的通信钩子开发者提供便利。 参数: 状态(PowerSGDState):配置压缩率和支持错误反馈、热启动等信息的状态信息。 调整压缩配置,主要需要调整 `matrix_approximation_rank`、`start_powerSGD_iter` 和 `min_compression_rate`。 bucket (dist.GradBucket):存储多个变量张量批次的 1D 展平梯度张量的桶。 注意,由于 DDP 通信钩子仅支持单进程单设备模式, 因此这个桶中只存储一个张量。 返回: 未来通信处理器,用于就地更新梯度。 示例:: >>> # xdoctest: +SKIP >>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10, min_compression_rate=0.5) >>> ddp_model.register_comm_hook(state, powerSGD_hook) ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` # 无需注意:B950 流程组 = 状态.流程组 要使用的组 = ( 流程组 如果 流程组 not None 否则 非空(距离.群组.世界) ) 世界大小 = 使用组.尺寸() # 输入张量是一个展平的 1 维张量。 输入张量 = .缓冲区() 在前`start_powerSGD_iter`次迭代中运行纯 allreduce。 如果 状态.迭代 < 状态.start_powerSGD_iter: 状态.可能增加迭代() 返回 默认._allreduce_fut(使用组, 输入张量) 在 `start_powerSGD_iter` 迭代后应用 PowerSGD。 设备 = 输入张量.设备 dtype = 输入张量.dtype 将前一个状态中的误差纳入梯度。 bucket_index = .索引() input_tensor_cp = None 总长度 = 输入张量.shape[0] 如果 状态.使用错误反馈: 如果 bucket_index 状态.错误字典: 输入张量.加_(状态.错误字典[桶索引]) 否则: 记录器.信息( "一个长度为零的张量"%s表示本地错误的创建。", 总长度, ) 状态.error_dict[桶索引] = 火炬.( 总长度, 设备=设备, 数据类型=dtype ) # 保留输入张量的副本, # 以便我们稍后计算由压缩引起的局部误差, 通过比较此副本和解压缩后的输入张量。 input_tensor_cp = 火炬.克隆(输入张量).detach() 将输入张量展开成每个参数的张量,以进行逐层压缩。 张量 = .渐变() 步骤 I:将所有张量分为两组, # 一个将在 allreduce 之前被压缩,另一个将直接进行 allreduce 而不压缩。 要压缩的张量, 未压缩的张量 = [] [] 总的 Ps 大小 = 0 总问题数量 = 0 张量 张量: 矩阵 = 张量.视图(张量.shape[0] -1) n, m = 矩阵.形状 矩阵逼近秩 = 最小(n, m, 状态.矩阵逼近秩) 压缩测试 = _应该压缩( n, m, 矩阵逼近秩, 状态.最小压缩率 ) 状态.压缩前元素总数 += 压缩测试[1] 如果 压缩测试[0]: 待压缩的张量.追加(矩阵) 总 Ps 大小 += n * 矩阵逼近秩 总 Qs 大小 += m * 矩阵逼近秩 状态.压缩后元素总数 += 压缩测试[2] 否则: 未压缩张量.追加(张量) 状态.压缩后元素总数 += 压缩测试[1] _报告压缩统计信息(, 状态) # 步骤 II:处理未压缩张量。 为这些张量分配连续内存以高效地进行 allreduce。 未压缩的张量内存 = ( 火炬.([张量.视图(-1) 张量 解压缩张量]) 如果 解压缩张量 否则 火炬.张量([], 设备=设备, 数据类型=数据类型) ) # 第 III 步:处理需要压缩的张量。 # 为 Ps 和 Qs 分配连续内存以高效地进行 allreduce。 # 如果启用预热启动,尽可能重用前一次迭代的 Ps 和 Qs。 # 当应用 PowerSGD 时,需要在第一次迭代中为 Ps 和 Qs 分配内存空间。 需要随机化查询字符串 = 如果 not 状态.热启动 bucket_index not 状态.物理内存字典: 需要随机化查询字符串 = 真实 如果禁用 warm-start,低秩张量将在每一步初始化。 仅在启用 warm-start 时记录,以避免垃圾信息。 如果 状态.warm_start: 记录器.信息( "分配长度为的连续内存"%s对于 Ps,长度为%s对于 Qs,分别。", total_Ps_size, total_Qs_size, ) 状态.p_memory_dict[桶索引] = 火炬.空的( total_Ps_size, 设备=设备, 数据类型=dtype ) 状态.q_memory_dict[桶索引] = 火炬.空的( 总问题大小, 设备=设备, 数据类型=dtype ) # 批量按形状压缩张量。 形状到张量 = defaultdict(列表) 张量 可压缩的张量: 形状到张量[张量.shape].追加(张量) # 该函数根据参数决定是否将形状相同的张量批量处理, # 因此以下过程可以共享相同的代码。 def 可能批量化的待压缩张量(): 张量 形状转换为张量.(): 如果 状态.批量处理形状相同的张量: 批处理大小 = 长度(张量) 如果 批处理大小 == 1: 使用原始张量以避免复制。 产生 张量[0].展平(0) 否则: 产生 火炬.(张量) 否则: 张量 张量: 产生 张量.展平(0) 创建指向已分配内存的 Ps 和 Qs。 可压缩张量 = [] ps = [] qs = [] p 索引 = 0 q_idx = 0 张量 可能批量的可压缩张量(): 批处理大小, n, m = 张量.形状 矩阵逼近秩 = 最小(n, m, 状态.矩阵逼近秩) 压缩张量.追加(张量) ps.追加( 状态.P 内存字典[桶索引] p_idx : p_idx + 批处理大小 * n * 矩阵逼近秩 ].视图(批处理大小, n, 矩阵逼近秩) ) 查询语句.追加( 状态.内存字典[桶索引] q_idx : q_idx + 批处理大小 * m * 矩阵逼近秩 ].视图(批处理大小, m, 矩阵逼近秩) ) p 索引 += 批处理大小 * n * 矩阵逼近秩 q_idx += 批处理大小 * m * 矩阵逼近秩 如果启用 warm-start,尽可能重用前一次迭代的 Qs,并跳过填充随机值。 例外情况是第一次迭代应用 PowerSGD 时。 如果 not 需要随机化 Qs: q qs: 正交化(q, 状态.正交化容差) 否则: 火炬.随机.fork_rng(设备=[]): # 避免全局更改种子,影响训练中其他地方的随机采样,请从该 RNG 分叉。 # 种子确保所有 DDP 副本的初始随机值相同。 每一步都应该有不同的种子。 由于在所有 CUDA 设备之间跨接 RNG 状态非常缓慢, 仅在 CPU 上分叉,然后将生成的张量移动到 CUDA 设备(通过覆盖 q)。 火炬.手动播种(状态.随机数生成器.随机整数(1_000_000_000)) q qs: q.复制_( 火炬.randn( *q.shape, 设备="cpu", 数据类型=数据类型, ) ) 正交化(q, 状态.正交化容差) 计算 Ps. 张量, q, p 压缩(张量压缩, 求解子空间, ps): 火炬.bmm(张量, q, 输出=p) 此 allreduce 仅应用于未压缩的张量, 因此它应该在上述压缩张量计算之前启动,以隐藏更多的通信成本。 然而,这在此刻需要单独的 future 链。 allreduce_contiguous_uncompressed_tensors_fut = 距离.all_reduce( 未压缩张量内存, 群组=使用组, async_op=真实 ).获取未来() def 解包未压缩张量并进行全量参数服务器同步(fut): 未压缩张量内存 = fut.()[0].div_(世界大小) 索引 = 0 张量 解压缩张量: 张量.复制_( 未压缩张量内存[索引 : 索引 + 张量.元素数量空括号].以查看方式(张量) ) 索引 += 张量.元素数量() 由于这些 Ps 将在之后进行正交化,无需除以世界大小。 返回 ( 距离.all_reduce( 状态.p_memory_dict[桶索引] 群组=使用组, async_op=真实 ) .获取未来() .等待()[0] ) def 计算查询(fut): 状态.p_memory_dict[桶索引] = fut.() p ps: 正交化(p, 状态.正交化容差) # 计算查询。 张量, p, q 压缩(可压缩的张量, ps, qs): 火炬.bmm(张量.转置(1, 2), p, 输出=q) # TODO:上述过程每迭代执行两次矩阵乘法+allreduce 步骤-- 一次左乘和一次右乘。 # 对于热启动,可以一次执行这样的一个步骤,并在它们之间交替。 # 全局同步 Qs. 返回 ( 距离.all_reduce( 状态.q_memory_dict[桶索引] 群组=使用组, async_op=真实 ) .获取未来() .等待()[0] ) def 解压缩(fut): 状态.q_memory_dict[桶索引] = fut.().div_(世界大小) p, q, 张量 压缩(ps, qs, 可压缩的张量): 火炬.bmm(p, q.转置(1, 2), 输出=张量) 将批处理张量复制回原始缓冲区。 如果 状态.具有相同形状的批处理张量: 张量 张量压缩: 如果 张量.shape[0] == 1: 跳过 batch_size 等于 1 的张量,因为它本身是原始张量。 继续 原始张量 = 形状到张量[张量.shape[1]] i, 原始张量 列举(原始张量): 原始张量.复制_(张量[i]) 如果 火炬.cuda.是否可用(): 火炬.cuda.同步(设备) 如果 状态.使用错误反馈: 记忆本地错误。 状态.错误字典[桶索引] = 输入张量_cp - 输入张量 如果 not 状态.预热启动: 状态.p_memory_dict.清晰() 状态.q_memory_dict.清晰() 状态.可能增加迭代次数() 返回 输入张量 返回 ( allreduce 连续未压缩张量的未来.然后( 解包未压缩张量并进行 allreduce 并行同步 ) .然后(计算查询) .然后(解压缩) )
[文档]def 批处理 PowerSGD 钩子( 状态: PowerSGD 状态, : 距离.毕业桶 ) -> 火炬.期货.未来[火炬.张量]: r"" 实现简化版 PowerSGD 算法。 此 DDP 通信钩子实现了一个简化的 PowerSGD 梯度压缩 算法描述在《论文 》中。 此变体不会逐层压缩梯度层 但是它压缩了所有梯度批量的展平输入张量。 因此,它比 :meth:`powerSGD_hook` **更快**。 但通常会导致 **精度更低**,除非 `matrix_approximation_rank` 为 1。 ..警告:: 增加这里的 ``matrix_approximation_rank`` 并不一定能提高准确性, 因为在不进行列/行对齐的情况下批处理每个参数的张量可能会破坏低秩结构。 因此,用户应首先考虑 :meth:`powerSGD_hook`, 并且只有当 ``matrix_approximation_rank`` 为 1 时能够达到令人满意的准确性时,才考虑这个变体。 当所有工作进程中的梯度张量汇总后,此钩子按照以下方式进行压缩: 压缩方法如下: 1. 将输入的展平 1D 梯度张量视为一个没有填充的方形张量 M; 2. 创建两个低秩张量 P 和 Q 来分解 M,使得 M = PQ^T,其中 Q 从标准正态分布初始化并正交化; 3. 计算 P,P 等于 MQ; 4. 全局减少 P; 5. 正交化 P; 6. 计算 Q,Q 约等于 M^TP; 7. 全局减少 Q; 8. 计算 M,M 约等于 PQ^T。 9. 截断输入张量至原始长度。 注意,此通信钩子在第一个 `state.start_powerSGD_iter` 迭代中强制执行纯 allreduce。 这不仅让用户对速度提升和精度之间的权衡有更多的控制, 还有助于抽象化 DDP 内部优化的一些复杂性,以供未来的通信钩子开发者使用。 参数: state (PowerSGDState):配置压缩率和支持错误反馈、预热启动等信息的状态。 调整压缩配置主要需要调整 `matrix_approximation_rank` 和 `start_powerSGD_iter`。 bucket (dist.GradBucket):存储多个按变量张量批量的 1D 展平梯度张量的桶。 注意,由于 DDP 通信钩子仅支持单进程单设备模式, 这个桶中只存储了一个张量。 返回: 未来通信处理器,用于就地更新梯度。 示例:: >>> # xdoctest: +SKIP >>> 状态 = PowerSGDState(process_group=process_group, matrix_approximation_rank=1) >>> ddp_model.register_comm_hook(状态, batched_powerSGD_hook) ```python # 假设输入文本为: input_text = '"""' # 翻译函数(此处仅为示例,实际翻译功能需要调用真实的翻译 API) def translate_to_simplified_chinese(text): # 这里应该调用真实的翻译 API 进行翻译 # 由于示例中不使用真实的 API,以下为模拟翻译结果 return text # 输出翻译结果 translated_text = translate_to_simplified_chinese(input_text) print(translated_text) ``` # 无需注意:B950 流程组 = 状态.流程组 要使用的组 = ( 流程组 如果 流程组 not None 否则 非空(距离.群组.世界) ) 世界大小 = 使用组.尺寸() 输入张量是一个展平的 1D 张量。 输入张量 = .缓冲区() 在前`start_powerSGD_iter`次迭代中运行 vanilla allreduce。 如果 状态.迭代 < 状态.start_powerSGD_iter: 状态.可能增加迭代次数() 返回 默认._allreduce_fut(使用组, 输入张量) 在 `start_powerSGD_iter` 次迭代后应用 PowerSGD。 设备 = 输入张量.设备 总长度 = 输入张量.shape[0] 状态.压缩前元素总数 += 总长度 将输入张量视为二维正方形张量,并在必要时填充 0。 正方形边长 = 数学.向上取整(数学.平方根(总长度)) 状态.压缩后元素总数 += ( 正方形边长 * 状态.矩阵逼近秩 * 2 ) 填充总长度 = 正方形边长**2 输入张量.调整大小(填充总长度) 输入张量[总长度:填充总长度].填充_(0) _报告压缩统计信息(, 状态) 将前一个状态中的错误融入梯度。 bucket_index = .索引() 输入张量_cp = None 如果 状态.使用错误反馈: 如果 bucket_index 状态.错误字典: 输入张量.加_(状态.错误字典[桶索引]) 否则: 记录器.信息( "长度为零的张量,表示局部错误。"%s创建表示局部错误的长度为零的张量。, 填充总长度, ) 状态.错误字典[桶索引] = 火炬.( 填充总长度, 设备=设备, 数据类型=输入张量.dtype ) 保持输入张量的副本, 以便我们稍后计算压缩引起的局部误差, 通过比较这个副本和解压缩后更新的输入张量。 input_tensor_cp = 火炬.克隆(输入张量).detach() 矩阵 = 输入张量.视图(边长平方, 边长) 如果可能,请重用前一次迭代的 P 和 Q。 当应用 PowerSGD 时,P 和 Q 的内存空间需要在第一次迭代中分配。 如果 not 状态.热启动 bucket_index not 状态.物理内存字典: 如果禁用预热启动,低秩张量将在每一步初始化。 仅在热启动时记录,以避免垃圾信息。 如果 状态.warm_start: 记录器.信息( 初始化低秩张量 P 和 Q,它们的大小均为%s x %s.", 正方形边长, 状态.矩阵逼近秩, ) def 创建低秩张量(填充随机值, 随机数生成器): 返回一个低秩二维张量,其边长为 square_side_length,矩阵逼近秩为 matrix_approximation_rank。 如果 填充随机值: 火炬.随机.fork_rng(设备=[]): # 避免全局更改种子并影响随机采样,请在此处分叉 RNG。 # 在训练的其他任何地方。 # 种子确保所有 DDP 副本的初始随机值相同。 每一步都应该有不同的种子。 由于在所有 CUDA 设备之间跨接 RNG 状态非常缓慢, # 只在 CPU 上分叉,然后将生成的张量移动到 CUDA 设备。 火炬.手动播种(随机数生成器.随机整数(1_000_000_000)) 返回 火炬.randn( 正方形边长, 状态.矩阵逼近秩, 设备="cpu", 数据类型=输入张量.数据类型, ).(设备) 否则: 返回 火炬.空的( 正方形边长, 状态.矩阵逼近秩, 设备=设备, 数据类型=输入张量.数据类型, ) 状态.物理内存字典[桶索引] = 创建低秩张量( 填充随机值=错误, 随机数生成器=状态.随机数生成器 ) 状态.q_memory_dict[桶索引] = 创建低秩张量( 填充随机值=True, 随机数生成器=状态.随机数生成器 ) 正交化(状态.q_memory_dict[桶索引]) 火炬.矩阵乘法( 矩阵, 状态.q_memory_dict[桶索引] 输出=状态.物理内存字典[桶索引] ) allreduce_p_fut = 距离.all_reduce( 状态.物理内存字典[桶索引] 群组=使用组, async_op=真实 ).获取未来() def compute_q(fut): 状态.物理内存字典[桶索引] = fut.()[0] 正交化(状态.物理内存字典[桶索引]) 火炬.矩阵乘法( 矩阵.t(), 状态.物理内存字典[桶索引] 输出=状态.q_memory_dict[桶索引] ) # TODO: 上述过程每次迭代执行两次矩阵乘法和 allreduce 操作 # 一次左乘和一次右乘。 # 对于预热启动,可以一次执行此类步骤中的一个,并在它们之间交替。 返回 ( 距离.all_reduce( 状态.q_memory_dict[桶索引] 群组=使用组, async_op=真实 ) .获取未来() .等待()[0] ) def 解压缩(fut): 状态.q_memory_dict[桶索引] = fut.().div_(世界大小) 火炬.矩阵乘法( 状态.物理内存字典[桶索引] 状态.q_memory_dict[桶索引].t(), 输出=矩阵, ) 如果 状态.使用错误反馈: 记忆本地错误。 状态.错误字典[桶索引] = 输入张量_cp - 输入张量 # 移除这个看似不必要的同步可能会引起失败。 # 参考:https://github.com/pytorch/pytorch/pull/54838 如果 火炬.cuda.是否可用(): 火炬.cuda.同步(设备) 如果 not 状态.预热启动: 状态.物理内存字典.清晰() 状态.q_memory_dict.清晰() 返回 = 输入张量.调整大小(总长度) 状态.可能增加迭代次数() 返回 返回 返回 allreduce_p_fut.然后(计算 q).然后(解压缩)

© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

查看 PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源,获取您的疑问解答

查看资源