分布式数据并行
警告
torch.nn.parallel.DistributedDataParallel
的实现随时间演变。本设计笔记基于 v1.4 版本的状态编写。
torch.nn.parallel.DistributedDataParallel
(DDP)透明地执行分布式数据并行训练。本页面描述了其工作原理并揭示了实现细节。
示例
让我们从简单的 torch.nn.parallel.DistributedDataParallel
示例开始。此示例使用 torch.nn.Linear
作为本地模型,将其包装在 DDP 中,然后对 DDP 模型运行一次前向传递、一次反向传递和一个优化器步骤。之后,本地模型上的参数将被更新,并且不同进程上的所有模型应该完全相同。
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import os
from torch.nn.parallel import DistributedDataParallel as DDP
def example(rank, world_size):
# create default process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
# create local model
model = nn.Linear(10, 10).to(rank)
# construct DDP model
ddp_model = DDP(model, device_ids=[rank])
# define loss function and optimizer
loss_fn = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
# forward pass
outputs = ddp_model(torch.randn(20, 10).to(rank))
labels = torch.randn(20, 10).to(rank)
# backward pass
loss_fn(outputs, labels).backward()
# update parameters
optimizer.step()
def main():
world_size = 2
mp.spawn(example,
args=(world_size,),
nprocs=world_size,
join=True)
if __name__=="__main__":
# Environment variables which need to be
# set when using c10d's default "env"
# initialization mode.
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "29500"
main()
DDP 与 TorchDynamo 协同工作。当与 TorchDynamo 一起使用时,请在编译模型之前应用 DDP 模型包装器,以便 torchdynamo 可以根据 DDP 桶大小应用 DDPOptimizer
(图断优化)。(有关更多信息,请参阅 TorchDynamo DDPOptimizer。)
ddp_model = DDP(model, device_ids=[rank])
ddp_model = torch.compile(ddp_model)
内部设计 ¶
本节通过深入了解一次迭代中的每个步骤的细节,揭示了 torch.nn.parallel.DistributedDataParallel
的工作原理。
前提条件:DDP 依赖于 c10d
ProcessGroup
进行通信。因此,应用程序必须在构建 DDP 之前创建ProcessGroup
实例。构建:DDP 构造函数接收本地模块的引用,并将
state_dict()
从具有 0 级别的进程广播到组中的所有其他进程,以确保所有模型副本从完全相同的状态开始。然后,每个 DDP 进程创建一个本地Reducer
,该Reducer
将在反向传播期间负责梯度同步。为了提高通信效率,Reducer
将参数梯度组织到桶中,并逐个减少桶的大小。桶大小可以通过在 DDP 构造函数中设置 bucket_cap_mb 参数进行配置。参数梯度到桶的映射是在构建时确定的,基于桶大小限制和参数大小。模型参数以(大致)与给定模型的Model.parameters()
相反的顺序分配到桶中。使用相反顺序的原因是 DDP 预期梯度将在反向传播期间按大约那个顺序准备好。下面的图示显示了示例。请注意,grad0
和grad1
在bucket1
中,而其他两个梯度在bucket0
中。 当然,这个假设并不总是成立,当这种情况发生时,它可能会损害 DDP 的向后速度,因为Reducer
无法在最早可能的时间启动通信。除了分桶之外,Reducer
在构建过程中还会注册 autograd 钩子,每个参数一个钩子。这些钩子将在梯度准备好时在反向传播期间被触发。前向传播:DDP 接收输入并将其传递给本地模型,然后分析本地模型的输出,如果
find_unused_parameters
设置为True
。这种模式允许在模型的子图上运行反向传播,DDP 通过遍历从模型输出开始的 autograd 图来找出参与反向传播的参数,并将所有未使用的参数标记为准备就绪以进行缩减。在反向传播期间,Reducer
只会等待未就绪的参数,但它仍然会缩减所有桶。将参数梯度标记为就绪并不能帮助 DDP 跳过桶,因为目前还不行,但它将防止 DDP 在反向传播期间永远等待缺失的梯度。请注意,遍历 autograd 图会引入额外的开销,因此只有在必要时才应将find_unused_parameters
设置为True
。后向传播:
backward()
函数直接在损失Tensor
上调用,这超出了 DDP 的控制范围,DDP 使用在构建时注册的 autograd 钩子来触发梯度同步。当一个梯度准备好时,对应于该 grad 累加器的 DDP 钩子会触发,然后 DDP 将该参数梯度标记为准备进行缩减。当一个 bucket 中的所有梯度都准备好时,Reducer
将在该 bucket 上启动一个异步的allreduce
来计算所有进程的梯度平均值。当所有 bucket 都准备好时,Reducer
将阻塞等待所有allreduce
操作完成。完成之后,平均梯度将被写入所有参数的param.grad
字段。因此,在反向传播之后,不同 DDP 进程中相同对应参数的 grad 字段应该是相同的。优化器步骤:从优化器的角度来看,它正在优化一个本地模型。所有 DDP 进程上的模型副本可以保持同步,因为它们都从相同的状态开始,并且在每个迭代中都有相同的平均梯度。

注意
DDP 需要在所有进程中以相同的顺序调用 allreduce
,这通过始终按照桶索引顺序运行 allreduce
而不是实际的桶就绪顺序来实现。跨进程的 allreduce
顺序不匹配可能导致结果错误或 DDP 向后挂起。
实现 ¶
以下是 DDP 实现组件的指针。堆叠图显示了代码的结构。
ProcessGroup¶
ProcessGroup.hpp:包含所有进程组实现的抽象 API。
c10d
库提供了 3 种开箱即用的实现,分别是 ProcessGroupGloo、ProcessGroupNCCL 和 ProcessGroupMPI。DistributedDataParallel
在初始化期间使用ProcessGroup::broadcast()
将具有 rank 0 的进程中的模型状态发送到其他进程,并使用ProcessGroup::allreduce()
来求和梯度。Store.hpp:协助进程组实例的会合服务找到彼此。
分布式数据并行 ¶
distributed.py:是 DDP 的 Python 入口点。它实现了初始化步骤和
forward
函数,该函数调用nn.parallel.DistributedDataParallel
模块中的 C++库。它的_sync_param
函数在 DDP 进程在多个设备上工作时执行进程内参数同步,并且它还从排名为 0 的进程中广播模型缓冲区到所有其他进程。进程间参数同步发生在Reducer.cpp
。comm.h:实现了合并广播辅助函数,该函数在初始化期间调用以广播模型状态,并在前向传递之前同步模型缓冲区。
reducer.h:提供了反向传递中梯度同步的核心实现。它有三个入口点函数:
构造函数在
distributed.py
中被调用,将Reducer::autograd_hook()
注册到梯度累加器中。当梯度准备好时,该函数将由自动微分引擎调用。
在
distributed.py
中的 DDP 前向传播结束时调用prepare_for_backward()
。当在 DDP 构造函数中将find_unused_parameters
设置为True
时,它遍历自动微分图以查找未使用的参数。

TorchDynamo DDPOptimizer¶
DDP 的性能优势来自于在反向传播过程中将 allreduce 收集操作与计算重叠。当使用 TorchDynamo 编译整个前向和整个反向图时,AotAutograd 会阻止这种重叠,因为 allreduce 操作是由 autograd 钩子在整个优化的反向计算完成后才启动的。
TorchDynamo 的 DDPOptimizer 通过在反向传播过程中打破前向图在 DDP 的 allreduce 桶的逻辑边界来帮助解决这个问题。注意:目标是反向传播过程中打破图,最简单的实现是先打破前向图,然后对每个部分调用 AotAutograd 和编译。这允许 DDP 的 allreduce 钩子在反向传播的不同部分之间触发,并安排通信与计算重叠。
欲了解更多解释和实验结果,请参阅这篇博客文章,或阅读 torch/_dynamo/optimizations/distributed.py 中的文档和代码。
要调试 DDPOptimizer,请设置 TORCH_LOGS=’ddp_graphs’以获取完整的图转储。对于不带图的日志,可以将‘dynamo’、‘distributed’或‘dist_ddp’之一添加到 TORCH_LOGS(以获取关于桶边界的详细信息)。要禁用 DDPOptimizer,请设置 torch._dynamo.config.optimize_ddp=False。即使没有 DDPOptimizer,DDP 和 TorchDynamo 也应正常工作,但性能会有所下降。