• 文档 >
  • 分布式数据并行
快捷键

分布式数据并行

警告

torch.nn.parallel.DistributedDataParallel 的实现随时间演变。本设计笔记基于 v1.4 版本的状态编写。

torch.nn.parallel.DistributedDataParallel (DDP)透明地执行分布式数据并行训练。本页面描述了其工作原理并揭示了实现细节。

示例

让我们从简单的 torch.nn.parallel.DistributedDataParallel 示例开始。此示例使用 torch.nn.Linear 作为本地模型,将其包装在 DDP 中,然后对 DDP 模型运行一次前向传递、一次反向传递和一个优化器步骤。之后,本地模型上的参数将被更新,并且不同进程上的所有模型应该完全相同。

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
import torch.optim as optim
import os
from torch.nn.parallel import DistributedDataParallel as DDP


def example(rank, world_size):
    # create default process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    # create local model
    model = nn.Linear(10, 10).to(rank)
    # construct DDP model
    ddp_model = DDP(model, device_ids=[rank])
    # define loss function and optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # forward pass
    outputs = ddp_model(torch.randn(20, 10).to(rank))
    labels = torch.randn(20, 10).to(rank)
    # backward pass
    loss_fn(outputs, labels).backward()
    # update parameters
    optimizer.step()

def main():
    world_size = 2
    mp.spawn(example,
        args=(world_size,),
        nprocs=world_size,
        join=True)

if __name__=="__main__":
    # Environment variables which need to be
    # set when using c10d's default "env"
    # initialization mode.
    os.environ["MASTER_ADDR"] = "localhost"
    os.environ["MASTER_PORT"] = "29500"
    main()

DDP 与 TorchDynamo 协同工作。当与 TorchDynamo 一起使用时,请在编译模型之前应用 DDP 模型包装器,以便 torchdynamo 可以根据 DDP 桶大小应用 DDPOptimizer (图断优化)。(有关更多信息,请参阅 TorchDynamo DDPOptimizer。)

ddp_model = DDP(model, device_ids=[rank])
ddp_model = torch.compile(ddp_model)

内部设计 ¶

本节通过深入了解一次迭代中的每个步骤的细节,揭示了 torch.nn.parallel.DistributedDataParallel 的工作原理。

  • 前提条件:DDP 依赖于 c10d ProcessGroup 进行通信。因此,应用程序必须在构建 DDP 之前创建 ProcessGroup 实例。

  • 构建:DDP 构造函数接收本地模块的引用,并将 state_dict() 从具有 0 级别的进程广播到组中的所有其他进程,以确保所有模型副本从完全相同的状态开始。然后,每个 DDP 进程创建一个本地 Reducer ,该 Reducer 将在反向传播期间负责梯度同步。为了提高通信效率, Reducer 将参数梯度组织到桶中,并逐个减少桶的大小。桶大小可以通过在 DDP 构造函数中设置 bucket_cap_mb 参数进行配置。参数梯度到桶的映射是在构建时确定的,基于桶大小限制和参数大小。模型参数以(大致)与给定模型的 Model.parameters() 相反的顺序分配到桶中。使用相反顺序的原因是 DDP 预期梯度将在反向传播期间按大约那个顺序准备好。下面的图示显示了示例。请注意, grad0grad1bucket1 中,而其他两个梯度在 bucket0 中。 当然,这个假设并不总是成立,当这种情况发生时,它可能会损害 DDP 的向后速度,因为 Reducer 无法在最早可能的时间启动通信。除了分桶之外, Reducer 在构建过程中还会注册 autograd 钩子,每个参数一个钩子。这些钩子将在梯度准备好时在反向传播期间被触发。

  • 前向传播:DDP 接收输入并将其传递给本地模型,然后分析本地模型的输出,如果 find_unused_parameters 设置为 True 。这种模式允许在模型的子图上运行反向传播,DDP 通过遍历从模型输出开始的 autograd 图来找出参与反向传播的参数,并将所有未使用的参数标记为准备就绪以进行缩减。在反向传播期间, Reducer 只会等待未就绪的参数,但它仍然会缩减所有桶。将参数梯度标记为就绪并不能帮助 DDP 跳过桶,因为目前还不行,但它将防止 DDP 在反向传播期间永远等待缺失的梯度。请注意,遍历 autograd 图会引入额外的开销,因此只有在必要时才应将 find_unused_parameters 设置为 True

  • 后向传播: backward() 函数直接在损失 Tensor 上调用,这超出了 DDP 的控制范围,DDP 使用在构建时注册的 autograd 钩子来触发梯度同步。当一个梯度准备好时,对应于该 grad 累加器的 DDP 钩子会触发,然后 DDP 将该参数梯度标记为准备进行缩减。当一个 bucket 中的所有梯度都准备好时, Reducer 将在该 bucket 上启动一个异步的 allreduce 来计算所有进程的梯度平均值。当所有 bucket 都准备好时, Reducer 将阻塞等待所有 allreduce 操作完成。完成之后,平均梯度将被写入所有参数的 param.grad 字段。因此,在反向传播之后,不同 DDP 进程中相同对应参数的 grad 字段应该是相同的。

  • 优化器步骤:从优化器的角度来看,它正在优化一个本地模型。所有 DDP 进程上的模型副本可以保持同步,因为它们都从相同的状态开始,并且在每个迭代中都有相同的平均梯度。

ddp_grad_sync.png

注意

DDP 需要在所有进程中以相同的顺序调用 allreduce ,这通过始终按照桶索引顺序运行 allreduce 而不是实际的桶就绪顺序来实现。跨进程的 allreduce 顺序不匹配可能导致结果错误或 DDP 向后挂起。

实现 ¶

以下是 DDP 实现组件的指针。堆叠图显示了代码的结构。

ProcessGroup

  • ProcessGroup.hpp:包含所有进程组实现的抽象 API。 c10d 库提供了 3 种开箱即用的实现,分别是 ProcessGroupGloo、ProcessGroupNCCL 和 ProcessGroupMPI。 DistributedDataParallel 在初始化期间使用 ProcessGroup::broadcast() 将具有 rank 0 的进程中的模型状态发送到其他进程,并使用 ProcessGroup::allreduce() 来求和梯度。

  • Store.hpp:协助进程组实例的会合服务找到彼此。

分布式数据并行 ¶

  • distributed.py:是 DDP 的 Python 入口点。它实现了初始化步骤和 forward 函数,该函数调用 nn.parallel.DistributedDataParallel 模块中的 C++库。它的 _sync_param 函数在 DDP 进程在多个设备上工作时执行进程内参数同步,并且它还从排名为 0 的进程中广播模型缓冲区到所有其他进程。进程间参数同步发生在 Reducer.cpp

  • comm.h:实现了合并广播辅助函数,该函数在初始化期间调用以广播模型状态,并在前向传递之前同步模型缓冲区。

  • reducer.h:提供了反向传递中梯度同步的核心实现。它有三个入口点函数:

    • 构造函数在 distributed.py 中被调用,将 Reducer::autograd_hook() 注册到梯度累加器中。

    • 当梯度准备好时,该函数将由自动微分引擎调用。

    • distributed.py 中的 DDP 前向传播结束时调用 prepare_for_backward() 。当在 DDP 构造函数中将 find_unused_parameters 设置为 True 时,它遍历自动微分图以查找未使用的参数。

ddp_code.png

TorchDynamo DDPOptimizer

DDP 的性能优势来自于在反向传播过程中将 allreduce 收集操作与计算重叠。当使用 TorchDynamo 编译整个前向和整个反向图时,AotAutograd 会阻止这种重叠,因为 allreduce 操作是由 autograd 钩子在整个优化的反向计算完成后才启动的。

TorchDynamo 的 DDPOptimizer 通过在反向传播过程中打破前向图在 DDP 的 allreduce 桶的逻辑边界来帮助解决这个问题。注意:目标是反向传播过程中打破图,最简单的实现是先打破前向图,然后对每个部分调用 AotAutograd 和编译。这允许 DDP 的 allreduce 钩子在反向传播的不同部分之间触发,并安排通信与计算重叠。

欲了解更多解释和实验结果,请参阅这篇博客文章,或阅读 torch/_dynamo/optimizations/distributed.py 中的文档和代码。

要调试 DDPOptimizer,请设置 TORCH_LOGS=’ddp_graphs’以获取完整的图转储。对于不带图的日志,可以将‘dynamo’、‘distributed’或‘dist_ddp’之一添加到 TORCH_LOGS(以获取关于桶边界的详细信息)。要禁用 DDPOptimizer,请设置 torch._dynamo.config.optimize_ddp=False。即使没有 DDPOptimizer,DDP 和 TorchDynamo 也应正常工作,但性能会有所下降。


© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,并使用 Read the Docs 提供的主题。

文档

PyTorch 的全面开发者文档

查看文档

教程

深入了解初学者和高级开发者的教程

查看教程

资源

查找开发资源并获得您的疑问解答

查看资源