DDP 通信钩子 ¶
DDP 通信钩子是一个通用的接口,用于通过覆盖 DistributedDataParallel 中的 vanilla allreduce 来控制如何在工作者之间通信梯度。提供了一些内置的通信钩子,用户可以轻松地将这些钩子之一应用于优化通信。此外,钩子接口还可以支持用户定义的通信策略,以适应更高级的使用场景。
如何使用通信钩子? ¶
使用通信钩子,用户只需在训练循环之前让 DDP 模型注册钩子即可。
torch.nn.parallel.DistributedDataParallel.register_comm_hook()
通信钩子操作的是什么? ¶
通信钩子提供了一种灵活的 allreduce 梯度的方式。因此,它主要在 allreduce 之前操作每个副本上的梯度,这些梯度被分桶以提高通信和计算的重叠。特别是, torch.distributed.GradBucket
代表要 allreduce 的梯度张量桶。
- class torch.distributed.GradBucket¶
本类主要传递一个扁平化的梯度张量(由
buffer()
返回)到 DDP 通信钩子。此张量可以进一步分解为该桶内每个参数的张量列表(由get_per_parameter_tensors()
返回),以应用逐层操作。
- torch.distributed.GradBucket.index(self:torch._C._distributed_c10d.GradBucket) → int
警告
由于桶在第一次迭代后重建,不应依赖于训练开始时的索引。
- 返回:
存储几个连续层梯度的桶的索引。所有梯度都已分桶。
- torch.distributed.GradBucket.buffer(self: torch._C._distributed_c10d.GradBucket) torch.Tensor ¶
- 返回:
一个平铺的 1D
torch.Tensor
缓冲区,可以进一步分解为该桶内每个参数的张量列表。
- torch.distributed.GradBucket.gradients(self: torch._C._distributed_c10d.GradBucket) list[torch.Tensor] ¶
- 返回:
列表中的每个张量都对应一个梯度。
- torch.distributed.GradBucket.is_last(self:torch._C._distributed_c10d.GradBucket) → bool
- 返回:
判断此桶是否是迭代中 allreduce 的最后一个桶。这也意味着此桶对应于前向传播中的前几层。
- torch.distributed.GradBucket.set_buffer(self:torch._C._distributed_c10d.GradBucket, buffer:torch.Tensor) → None
替换桶中的张量为输入张量缓冲区。
-
torch.distributed.GradBucket.parameters(self:torch._C._distributed_c10d.GradBucket) → list[torch.Tensor] ¶ torch.distributed.GradBucket 的参数(self:torch._C._distributed_c10d.GradBucket) → torch.Tensor 列表 - 返回:
列表中的每个张量对应一个模型参数。
默认通信钩子
默认通信钩子是简单的无状态钩子,因此输入状态在 register_comm_hook
中要么是进程组,要么是 None
。输入 bucket
是一个 torch.distributed.GradBucket
对象。
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks.allreduce_hook(process_group, bucket)[source][source]¶
使用
GradBucket
张量调用allreduce
。一旦梯度张量在所有工作者之间聚合,其
then
回调将取平均值并返回结果。如果用户注册此 DDP 通信钩子,DDP 结果应与未注册钩子的情况相同。因此,这不会改变 DDP 的行为,用户可以将此作为参考或修改此钩子以记录有用的信息或任何其他目的,而不会影响 DDP 的行为。
- 示例::
>>> ddp_model.register_comm_hook(process_group, allreduce_hook)
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks.fp16_compress_hook(process_group, bucket)[source][source]
通过将
GradBucket
转换为torch.float16
除以进程组大小进行压缩。此 DDP 通信钩子实现了一种简单的梯度压缩方法,将
GradBucket
张量转换为半精度浮点格式(torch.float16
),然后除以进程组大小。然后对所有float16
梯度张量进行 allreduce 操作。压缩后的梯度张量 allreduce 完成后,链式回调decompress
将其转换回输入数据类型(如float32
)。- 示例::
>>> ddp_model.register_comm_hook(process_group, fp16_compress_hook)
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks.bf16_compress_hook(process_group, bucket)[source][source]¶
警告:此 API 为实验性,需要 NCCL 版本晚于 2.9.6。
此 DDP 通信钩子实现了一种简单的梯度压缩方法,将
GradBucket
张量转换为半精度 Brain 浮点格式(torch.bfloat16
),然后除以进程组大小。它对所有bfloat16
梯度张量进行 allreduce 操作。一旦压缩后的梯度张量完成 allreduce 操作,链式回调decompress
将其转换回输入数据类型(如float32
)。- 示例::
>>> ddp_model.register_comm_hook(process_group, bf16_compress_hook)
- 返回类型:
未来[Tensor]
此外,还提供了一个通信钩子包装器,支持使用 fp16_compress_hook()
或 bf16_compress_hook()
作为包装器,可以与其他通信钩子结合使用。
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks.fp16_compress_wrapper(hook)[source][source]¶
将输入张量转换为
torch.float16
,将钩子返回的结果转换回输入数据类型。此包装将给定 DDP 通信钩子的输入梯度张量转换为半精度浮点格式(
torch.float16
),并将给定钩子的结果张量转换回输入数据类型,例如float32
。因此,fp16_compress_hook
等价于fp16_compress_wrapper(allreduce_hook)
。- 示例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10) >>> ddp_model.register_comm_hook(state, fp16_compress_wrapper(powerSGD_hook))
- 返回类型:
Callable[[Any, GradBucket], Future[Tensor]]
- torch.distributed.algorithms.ddp_comm_hooks.default_hooks.bf16_compress_wrapper(hook)[source][source]
警告:此 API 为实验性,需要 NCCL 版本晚于 2.9.6。
此包装器将给定 DDP 通信钩子的输入梯度张量转换为半精度 Brain 浮点格式 `_ (``torch.bfloat16`),并将给定钩子的结果张量转换回输入数据类型,例如
float32
。因此,
bf16_compress_hook
等同于bf16_compress_wrapper(allreduce_hook)
。- 示例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10) >>> ddp_model.register_comm_hook(state, bf16_compress_wrapper(powerSGD_hook))
- 返回类型:
Callable[[Any, GradBucket], Future[Tensor]]
PowerSGD 通信钩子 ¶
PowerSGD(Vogels 等人,NeurIPS 2019)是一种梯度压缩算法,可以提供非常高的压缩率并加速带宽受限的分布式训练。该算法需要维护一些超参数和内部状态。因此,PowerSGD 通信钩子是一个有状态的钩子,用户需要提供以下定义的状态对象。
PowerSGD 状态 ¶
- class torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.PowerSGDState(process_group, matrix_approximation_rank=1, start_powerSGD_iter=1000, min_compression_rate=2, use_error_feedback=True, warm_start=True, orthogonalization_epsilon=0, random_seed=0, compression_stats_logging_frequency=10000, batch_tensors_with_same_shape=False)[source][source]¶
在训练过程中存储算法的超参数和所有梯度的内部状态。
特别是,
matrix_approximation_rank
和start_powerSGD_iter
是用户应该调整的主要超参数。为了性能,我们建议保留二进制超参数use_error_feedback
和warm_start
。matrix_approximation_rank
控制压缩低秩张量的大小,这决定了压缩率。秩越低,压缩越强。1.1. 如果
matrix_approximation_rank
过低,模型的全局质量需要更多的训练步骤才能达到或永远无法达到,从而降低准确率。1.2.
matrix_approximation_rank
的增加会显著增加压缩的计算成本,并且准确率可能不会超过某个matrix_approximation_rank
阈值而进一步改善。
调整
matrix_approximation_rank
,我们建议从 1 开始,以 2 的倍数增加(如指数网格搜索,1,2,4,……),直到达到令人满意的准确率。通常只使用较小的值 1-4。对于某些 NLP 任务(如原文附录 D 所示),此值已增加到 32。start_powerSGD_iter
将 PowerSGD 压缩推迟到步骤start_powerSGD_iter
,而 vanilla allreduce 在步骤start_powerSGD_iter
之前运行。这种 vanilla allreduce + PowerSGD 的混合方案可以有效地提高准确率,即使使用相对较小的matrix_approximation_rank
也能做到。这是因为训练阶段的初期通常对不精确的梯度非常敏感,过早地压缩梯度可能会导致训练快速进入次优轨迹,从而对准确率产生不可恢复的影响。
调整
start_powerSGD_iter
时,建议从总训练步骤的 10%开始,直到达到满意的准确率为止。如果训练中存在预热阶段,start_powerSGD_iter
通常不应少于预热步骤的数量。min_compression_rate
是压缩层时所需的最低压缩率。由于压缩带来的计算开销,只有当可以足够节省带宽时,张量才值得压缩,其中(num_rows + num_cols) * matrix_approximation_rank * min_compression_rate < num_rows * num_cols
。如果无法满足指定的压缩率阈值,张量将直接进行 allreduced 操作而不进行压缩。
PowerSGD 压缩开始后,每
compression_stats_logging_frequency
次迭代记录一次压缩统计信息。orthogonalization_epsilon
可以是一个非常小的值(例如,1e-8),在正交化步骤中添加到每个归一化矩阵列中,以防止任何列全为 0 时的除以零错误。如果已经可以防止这种情况(例如,通过批量归一化),则建议使用 0 作为 epsilon 以提高精度。控制是否在批量操作中对形状相同的张量进行压缩和解压缩以实现更高的并行性。请注意,您还应该增加桶的大小(即,DDP 构造函数中的
bucket_cap_mb
参数),以便更多相同形状的张量出现在同一个桶中,然而这可能会减少计算和通信之间的重叠,并由于堆叠相同形状的张量而增加内存占用。如果压缩/解压缩计算是瓶颈,则设置为True
。
警告
如果启用了错误反馈或预热,DDP 中允许的
start_powerSGD_iter
的最小值是 2。这是因为 DDP 中还有一个内部优化,在迭代 1 重建桶,这可能会与重建过程之前记住的任何张量冲突。
PowerSGD 插件
警告
PowerSGD 通常需要与模型梯度相同大小的额外内存来启用错误反馈,这可以补偿有偏的压缩通信并提高精度。
警告
PowerSGD 钩子可能与 Apex 自动混合精度包冲突。请改用 PyTorch 原生自动混合精度包。
- torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.powerSGD_hook(state, bucket)[source][source]¶
实现 PowerSGD 算法。
此 DDP 通信钩子实现了论文中描述的 PowerSGD 梯度压缩算法。一旦所有工作进程聚合了梯度张量,此钩子将按以下方式应用压缩:
查看输入的展平 1D 梯度张量作为每个参数张量的列表,并将所有张量分为两组:
1.1 在 allreduce 之前应该压缩的张量,因为压缩可以在带宽上节省足够的空间。
1.2 其余的张量将直接进行 allreduce 而不进行压缩,包括所有向量张量(对于偏置)。
处理未压缩的张量:
为这些未压缩的张量分配连续内存,并将所有未压缩的张量作为一个批次进行 allreduce,而不进行压缩;
将单个未压缩的张量从连续内存复制回输入张量。
处理应由 PowerSGD 压缩压缩的张量:
3.1. 对于每个张量 M,创建两个低秩张量 P 和 Q 来分解 M,使得 M = PQ^T,其中 Q 从标准正态分布初始化并正交化;
3.2. 计算 Ps 中的每个 P,等于 MQ;
3.3. 将 Ps 作为一个批次进行 allreduce;
3.4. 对 Ps 中的每个 P 进行正交化;
3.5. 计算 Qs 中的每个 Q,约等于 M^TP;
3.6. 将所有 reduces 作为一批处理;
3.7. 计算所有压缩张量中的每个 M,其值约等于 PQ^T。
注意,此通信钩子在最初的
state.start_powerSGD_iter
次迭代中强制执行 vanilla allreduce。这不仅使用户能够更好地控制速度提升与精度之间的权衡,还有助于抽象化 DDP 内部优化的一些复杂性,为未来的通信钩子开发者提供帮助。- 参数:
状态(PowerSGDState)- 配置压缩率和支持错误反馈、预热启动等的状态信息。要调整压缩配置,主要需要调整
matrix_approximation_rank
、start_powerSGD_iter
和min_compression_rate
。bucket (dist.GradBucket) – 存储多个变量张量批次的 1D 展平梯度张量的桶。注意,由于 DDP 通信钩子仅支持单进程单设备模式,因此此桶中仅存储一个张量。
- 返回:
通信的将来处理器,用于就地更新梯度。
- 返回类型:
- 示例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1, start_powerSGD_iter=10, min_compression_rate=0.5) >>> ddp_model.register_comm_hook(state, powerSGD_hook)
- torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.batched_powerSGD_hook(state, bucket)[source][source]¶
实现简化的 PowerSGD 算法。
此 DDP 通信钩子实现了一种简化的 PowerSGD 梯度压缩算法,该算法在论文中有所描述。这种变体不是逐层压缩梯度,而是压缩所有梯度批量的扁平输入张量。因此,它比
powerSGD_hook()
更快,但通常精度要低得多,除非matrix_approximation_rank
为 1。警告
增加此处
matrix_approximation_rank
不一定能提高精度,因为在不进行列/行对齐的情况下批处理每个参数张量可能会破坏低秩结构。因此,用户应首先考虑powerSGD_hook()
,只有当matrix_approximation_rank
为 1 时才能达到令人满意的精度时才考虑此变体。当所有工作进程中的梯度张量聚合后,此钩子按照以下方式应用压缩:
将输入的展平一维梯度张量视为一个无填充的方形张量 M;
创建两个低秩张量 P 和 Q 来分解 M,使得 M = PQ^T,其中 Q 从标准正态分布初始化并正交化;
计算 P,P 等于 MQ;
全局减少 P
正交化 P
计算 Q,Q 约等于 M^TP
全局减少 Q
计算 M,M 约等于 PQ^T。
截断输入张量至原始长度。
注意,此通信钩子在最初的
state.start_powerSGD_iter
次迭代中强制执行 vanilla allreduce。这不仅给用户提供了更多关于速度提升和精度之间权衡的控制,还有助于抽象化未来通信钩子开发者对 DDP 内部优化的复杂性。- 参数:
状态(PowerSGDState)- 配置压缩率和支持错误反馈、预热启动等的状态信息。要调整压缩配置,主要需要调整
matrix_approximation_rank
和start_powerSGD_iter
。bucket (dist.GradBucket) – 存储多个变量张量批次的 1D 展平梯度张量的桶。注意,由于 DDP 通信钩子仅支持单进程单设备模式,因此此桶中仅存储一个张量。
- 返回:
通信的将来处理器,用于就地更新梯度。
- 返回类型:
- 示例::
>>> state = PowerSGDState(process_group=process_group, matrix_approximation_rank=1) >>> ddp_model.register_comm_hook(state, batched_powerSGD_hook)
调试通信钩子
如其名所示,调试通信钩子仅用于调试和性能优化目的。
警告
调试通信钩子不一定输出正确的结果。
- torch.distributed.algorithms.ddp_comm_hooks.debugging_hooks.noop_hook(_, bucket)[source][source]¶
返回一个包装输入的未来,使其成为一个不执行任何操作且不产生通信开销的空操作。
此钩子应仅用于 allreduce 优化中所有 reduce 分析的头程分析,而不是正常的梯度同步。例如,如果在此钩子注册后只能观察到小于 10%的训练时间加速,通常意味着 allreduce 不是此案例的性能瓶颈。此类仪器在 GPU 跟踪难以获取或跟踪分析复杂(例如,allreduce 与计算的叠加或跨 rank 的解同步等因素)时特别有用。
- 示例::
>>> ddp_model.register_comm_hook(None, noop_hook)
通信钩子的检查点
状态化通信钩子可以作为模型检查点的一部分保存,以实现训练器的重启。为了使钩子可序列化,应定义 __setstate__
和 __getstate__
。
警告
__getstate__
应排除返回字典中的非序列化属性。
警告
__setstate__
应正确初始化从提供的 state
中排除的非序列化属性。
PowerSGDState
已实现 __setstate__
和 __getstate__
并可以作为参考使用。
- class torch.distributed.algorithms.ddp_comm_hooks.powerSGD_hook.PowerSGDState[source][source]
- __setstate__(state)[来源][来源] ¶
将提供的
state
设置为该PowerSGDState
实例。process_group
被设置为默认值。
这是一个简单的、端到端示例,用于保存和重新加载 PowerSGD 状态和钩子。
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel
from torch.distributed.algorithms.ddp_comm_hooks import powerSGD_hook as powerSGD
class SimpleModel(nn.Module):
def __init__(self):
super().__init__()
self.fc1 = nn.Linear(24,24)
self.relu = nn.ReLU()
self.fc2 = nn.Linear(24,12)
def forward(self, x):
return self.fc2(self.relu(self.fc1(x)))
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# initialize the process group
dist.init_process_group("nccl", rank=rank, world_size=world_size)
def cleanup():
dist.destroy_process_group()
def run_demo(demo_fn, world_size):
mp.spawn(
demo_fn,
args=(world_size,),
nprocs=world_size,
join=True)
def demo_serialization(rank, world_size):
setup(rank, world_size)
CHECKPOINT = tempfile.gettempdir() + "/checkpoint.pt"
model = SimpleModel().to(rank)
ddp_model = DistributedDataParallel(model, device_ids=[rank])
powersgd_hook = powerSGD.powerSGD_hook
powersgd_state = powerSGD.PowerSGDState(process_group=None)
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
ddp_model.register_comm_hook(powersgd_state, powersgd_hook)
state = {
'state_dict': ddp_model.state_dict(),
'comm_hook': powersgd_hook,
'comm_hook_state': powersgd_state}
if rank == 0:
torch.save(state, CHECKPOINT)
dist.barrier()
map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
checkpoint = torch.load(CHECKPOINT, map_location=map_location)
new_ddp_model = DistributedDataParallel(SimpleModel().to(rank), device_ids=[rank])
new_ddp_model.load_state_dict(checkpoint['state_dict'])
powersgd_hook = checkpoint['comm_hook']
powersgd_state = checkpoint['comm_hook_state']
new_ddp_model.register_comm_hook(powersgd_state, powersgd_hook)
if rank == 0:
os.remove(CHECKPOINT)
cleanup()
if __name__ == "__main__":
n_gpus = torch.cuda.device_count()
assert n_gpus >= 2, f"Requires at least 2 GPUs to run, but got {n_gpus}"
world_size = n_gpus
run_demo(demo_serialization, world_size)
致谢
感谢 PowerSGD 论文作者 Thijs Vogels 对 PowerSGD 通信钩子的代码审查,以及比较实验,这些实验表明 PowerSGD 通信钩子的性能与原始论文中的实现相当。