• 文档 >
  • 管道并行
快捷键

管道并行 ¶

注意

torch.distributed.pipelining 目前处于 alpha 状态,处于开发中。API 变更可能发生。它已从 PiPPy 项目迁移过来。

为什么需要管道并行? ¶

深度学习中的一种基本并行方式是管道并行。它允许将模型的执行分割成多个微批,以便多个微批可以并发执行模型代码的不同部分。管道并行可以是一种有效的技术,用于:

  • 大规模训练

  • 带宽受限的集群

  • 大模型推理

以上场景具有一个共同点,即每个设备上的计算无法隐藏传统并行通信,例如 FSDP 的全局聚合。

什么是 torch.distributed.pipelining ? ¶

虽然在扩展方面很有前景,但流水线通常难以实现,因为它需要除了模型权重外,还要对模型的执行进行分区。执行分区通常需要修改模型的侵入性代码。复杂性的另一个方面来自于在分布式环境中调度微批处理,需要考虑数据流依赖。

pipelining 包提供了一套自动执行这些任务的工具包,这使得在通用模型上轻松实现流水线并行成为可能。

它由两部分组成:一个分割前端和一个分布式运行时。分割前端接收你的模型代码,将其分割成“模型分区”,并捕获数据流关系。分布式运行时在不同的设备上并行执行流水线阶段,处理微批分割、调度、通信和梯度传播等。

总体而言, pipelining 包含以下功能:

  • 基于简单规范的模型代码拆分。

  • 对管道调度提供丰富支持,包括 GPipe、1F1B、交错 1F1B 和循环 BFS,并提供编写自定义调度的基础设施。

  • 首选支持跨主机管道并行,因为这是 PP 通常使用的地方(在较慢的互连上)。

  • 与其他 PyTorch 并行技术(如数据并行(DDP、FSDP)或张量并行)的兼容性。TorchTitan 项目展示了在 Llama 模型上的“3D 并行”应用。

步骤 1:构建 PipelineStage

在我们能够使用 PipelineSchedule 之前,我们需要创建 PipelineStage 对象来封装在该阶段运行的部分模型。 PipelineStage 负责分配通信缓冲区并创建与对等节点通信的发送/接收操作。它管理中间缓冲区,例如尚未被消费的前向输出,并为阶段模型的后向运行提供实用工具。

PipelineStage 需要知道阶段模型的输入和输出形状,以便正确分配通信缓冲区。形状必须是静态的,例如,在运行时形状不能从步骤到步骤发生变化。如果运行时形状与预期形状不匹配,将引发 PipeliningShapeError 类。在与其他并行性组合或应用混合精度时,必须考虑这些技术,以便 PipelineStage 知道阶段模块在运行时的正确形状(和数据类型)。

用户可以直接通过传递表示应在舞台上运行的模型部分的 nn.Module 来构建一个 PipelineStage 实例。这可能需要修改原始模型代码。请参阅选项 1:手动拆分模型的示例。

或者,拆分前端可以使用图划分自动将您的模型拆分为一系列 nn.Module 。这种技术要求模型可以通过 torch.Export 进行追踪。结果 nn.Module 与其他并行技术兼容性实验性,可能需要一些解决方案。如果用户无法轻松更改模型代码,使用此前端可能更具吸引力。有关更多信息,请参阅选项 2:自动拆分模型。

第 2 步:使用 PipelineSchedule 进行执行

现在我们可以将 PipelineStage 附接到管道调度中,并使用输入数据运行调度。以下是一个 GPipe 示例:

from torch.distributed.pipelining import ScheduleGPipe

# Create a schedule
schedule = ScheduleGPipe(stage, n_microbatches)

# Input data (whole batch)
x = torch.randn(batch_size, in_dim, device=device)

# Run the pipeline with input `x`
# `x` will be divided into microbatches automatically
if rank == 0:
    schedule.step(x)
else:
    output = schedule.step()

注意,上述代码需要为每个工作进程单独启动,因此我们使用启动器服务来启动多个进程:

torchrun --nproc_per_node=2 example.py

模型拆分的选项 ¶

选项 1:手动拆分模型 ¶

要直接构建一个 PipelineStage ,用户负责提供一个拥有相关 nn.Modulenn.Parameters 的单个 nn.Buffers 实例,并定义一个 forward() 方法来执行该阶段相关的操作。例如,在 Torchtitan 中定义的 Transformer 类的简化版本显示了构建易于分割的模型的模式。

class Transformer(nn.Module):
    def __init__(self, model_args: ModelArgs):
        super().__init__()

        self.tok_embeddings = nn.Embedding(...)

        # Using a ModuleDict lets us delete layers without affecting names,
        # ensuring checkpoints will correctly save and load.
        self.layers = torch.nn.ModuleDict()
        for layer_id in range(model_args.n_layers):
            self.layers[str(layer_id)] = TransformerBlock(...)

        self.output = nn.Linear(...)

    def forward(self, tokens: torch.Tensor):
        # Handling layers being 'None' at runtime enables easy pipeline splitting
        h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens

        for layer in self.layers.values():
            h = layer(h, self.freqs_cis)

        h = self.norm(h) if self.norm else h
        output = self.output(h).float() if self.output else h
        return output

以这种方式定义的模型可以通过首先初始化整个模型(使用元设备以避免内存不足错误),删除该阶段不需要的层,然后创建一个包装模型的 PipelineStage 来轻松按阶段进行配置。例如:

with torch.device("meta"):
    assert num_stages == 2, "This is a simple 2-stage example"

    # we construct the entire model, then delete the parts we do not need for this stage
    # in practice, this can be done using a helper function that automatically divides up layers across stages.
    model = Transformer()

    if stage_index == 0:
        # prepare the first stage model
        del model.layers["1"]
        model.norm = None
        model.output = None

    elif stage_index == 1:
        # prepare the second stage model
        model.tok_embeddings = None
        del model.layers["0"]

    from torch.distributed.pipelining import PipelineStage
    stage = PipelineStage(
        model,
        stage_index,
        num_stages,
        device,
    )

当与其他数据或模型并行技术结合使用时,如果模型块的输出形状/数据类型将受到影响,可能还需要 output_args

选项 2:自动拆分模型 ¶

如果您有一个完整的模型,并且不想花费时间将其修改为“模型分区”的序列,那么 pipeline API 就在这里帮助您。以下是一个简短的示例:

class Model(torch.nn.Module):
    def __init__(self) -> None:
        super().__init__()
        self.emb = torch.nn.Embedding(10, 3)
        self.layers = torch.nn.ModuleList(
            Layer() for _ in range(2)
        )
        self.lm = LMHead()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.emb(x)
        for layer in self.layers:
            x = layer(x)
        x = self.lm(x)
        return x

如果打印模型,我们可以看到多个层次结构,这使得手动分割变得困难:

Model(
  (emb): Embedding(10, 3)
  (layers): ModuleList(
    (0-1): 2 x Layer(
      (lin): Linear(in_features=3, out_features=3, bias=True)
    )
  )
  (lm): LMHead(
    (proj): Linear(in_features=3, out_features=3, bias=True)
  )
)

让我们看看 pipeline API 是如何工作的:

from torch.distributed.pipelining import pipeline, SplitPoint

# An example micro-batch input
x = torch.LongTensor([1, 2, 4, 5])

pipe = pipeline(
    module=mod,
    mb_args=(x,),
    split_spec={
        "layers.1": SplitPoint.BEGINNING,
    }
)

pipeline API 根据 split_spec 分割你的模型,其中 SplitPoint.BEGINNING 表示在 forward 函数执行某些子模块之前添加分割点,同样, SplitPoint.END 表示在执行之后添加分割点。

如果我们 print(pipe) ,我们可以看到:

GraphModule(
  (submod_0): GraphModule(
    (emb): InterpreterModule()
    (layers): Module(
      (0): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
  )
  (submod_1): GraphModule(
    (layers): Module(
      (1): InterpreterModule(
        (lin): InterpreterModule()
      )
    )
    (lm): InterpreterModule(
      (proj): InterpreterModule()
    )
  )
)

def forward(self, x):
    submod_0 = self.submod_0(x);  x = None
    submod_1 = self.submod_1(submod_0);  submod_0 = None
    return (submod_1,)

“模型分区”由子模块( submod_0submod_1 )表示,每个子模块都使用原始模型操作、权重和层次结构重建。此外,还重建了一个“根级” forward 函数,以捕获这些分区之间的数据流。这种数据流将由管道运行时以分布式方式回放。

Pipe 对象提供了一个检索“模型分区”的方法:

stage_mod : nn.Module = pipe.get_stage_module(stage_idx)

返回的 stage_mod 是一个 nn.Module ,您可以使用它创建优化器、保存或加载检查点,或应用其他并行性。

Pipe 还允许您在给定的设备上创建一个分布式阶段运行时 ProcessGroup

stage = pipe.build_stage(stage_idx, device, group)

或者,如果您想在修改 stage_mod 之后稍后再构建阶段运行时,可以使用 build_stage API 的功能版本。例如:

from torch.distributed.pipelining import build_stage
from torch.nn.parallel import DistributedDataParallel

dp_mod = DistributedDataParallel(stage_mod)
info = pipe.info()
stage = build_stage(dp_mod, stage_idx, info, device, group)

注意

pipeline 前端使用跟踪器( torch.export )将您的模型捕获到一个单独的图中。如果您的模型无法进行全图化,您可以使用我们下面的手动前端。

Hugging Face 示例

在此包最初创建的 PiPPy 仓库中,我们保留了基于未修改的 Hugging Face 模型的示例。请参阅 examples/huggingface 目录。

包括以下示例:

技术深度解析 ¶

这个 pipeline API 是如何分割模型的?¶

首先, pipeline API 通过追踪模型将我们的模型转换为一个有向无环图(DAG)。它使用 torch.export –一个 PyTorch 2 全图捕获工具来追踪模型。

然后,它将一个阶段所需的操作和参数组合成一个重构的子模块: submod_0submod_1 ,…

与传统的子模块访问方法 Module.children() 不同, pipeline API 不仅切割了您模型的模块结构,还切割了您模型的正向函数。

这是因为模型结构如 Module.children() 仅仅在 Module.__init__() 期间捕获信息,并没有捕获关于 Module.forward() 的任何信息。换句话说, Module.children() 缺乏以下关键信息:

  • forward 中子模块的执行顺序

  • 子模块之间的激活流程

  • 子模块之间是否存在功能操作符(例如, reluadd 操作不会被 Module.children() 捕获)。

相反, pipeline API 确保了 forward 行为真正得到保留。它还捕获了分区之间的激活流程,帮助分布式运行时在没有人工干预的情况下正确地进行发送/接收调用。

pipeline API 的另一个灵活性在于,分割点可以位于您的模型层次结构中的任意级别。在分割分区中,与该分区相关的原始模型层次结构将免费重建。因此,指向子模块或参数的完全限定名称(FQN)仍然有效,依赖于 FQN 的服务(如 FSDP、TP 或检查点)仍然可以在您的分区模块上运行,几乎不需要修改代码。

实现自己的调度

您可以通过扩展以下两个类之一来实现自己的管道调度:

  • PipelineScheduleSingle

  • PipelineScheduleMulti

PipelineScheduleSingle 用于只分配一个阶段给每个等级的计划。 PipelineScheduleMulti 用于为每个等级分配多个阶段的计划。

例如, ScheduleGPipeSchedule1F1BPipelineScheduleSingle 的子类。而 ScheduleInterleaved1F1BScheduleLoopedBFSScheduleInterleavedZeroBubbleScheduleZBVZeroBubblePipelineScheduleMulti 的子类。

记录日志

您可以使用 TORCH_LOGS 环境变量从 torch._logging 中开启额外的日志记录。

  • 设置 TORCH_LOGS=+pp 将显示 logging.DEBUG 消息及其以上级别的所有日志。

  • TORCH_LOGS=pp 将显示 logging.INFO 级别及以上的日志信息。

  • TORCH_LOGS=-pp 将显示 logging.WARNING 级别及以上的日志信息。

API 参考¶

模型分割 API ¶

以下一组 API 将您的模型转换为管道表示形式。

class torch.distributed.pipelining.SplitPoint(value, names=<未提供>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source][source] ¶

枚举表示在子模块执行过程中可以发生拆分的点。 :ivar BEGINNING: 表示在正向函数执行某个子模块之前添加拆分点。 :ivar END: 表示在正向函数执行某个子模块之后添加拆分点。

torch.distributed.pipelining.pipeline(module, mb_args, mb_kwargs=None, split_spec=None, split_policy=None)[source][source]

根据规范拆分模块。

查看管道以获取更多详细信息。

参数:
  • 模块(模块)- 要拆分的模块。

  • mb_args(元组[任何, ...])- 示例位置输入,以微批形式。

  • mb_kwargs(可选[dict[str, 任何])- 示例关键字输入,以微批形式。(默认:None)

  • split_spec (Optional[dict[str, torch.distributed.pipelining._IR.SplitPoint]]) – 使用子模块名称作为分割标记的字典。 (默认:None)

  • split_policy (Optional[Callable[[GraphModule], GraphModule]]) – 用于分割模块的策略。 (默认:None)

返回类型:

A pipeline representation of class Pipe.

class torch.distributed.pipelining.Pipe(split_gm, num_stages, has_loss_and_backward, loss_spec)[source][source]
torch.distributed.pipelining.pipe_split()[source][source]

pipe_split 是一个特殊操作符,用于标记模块中阶段的边界。它用于将模块分割成阶段。如果您的注释模块是急切运行的,则它是一个无操作(no-op)。

示例

>>> def forward(self, x):
>>>     x = torch.mm(x, self.mm_param)
>>>     x = torch.relu(x)
>>>     pipe_split()
>>>     x = self.lin(x)
>>>     return x

上述示例将被分割成两个阶段。

微批处理工具

class torch.distributed.pipelining.microbatch.TensorChunkSpec(split_dim)[source][source]

用于指定输入分块的字类

torch.distributed.pipelining.microbatch.split_args_kwargs_into_chunks(args, kwargs, chunks, args_chunk_spec=None, kwargs_chunk_spec=None)[source][source]

根据各自的分块规范,将一系列参数和关键字参数分割成多个块。

参数:
  • args (tuple[Any, ...]) – 参数元组

  • kwargs (Optional[dict[str, Any]]) – kwargs 字典(可选)

  • chunks (int) – 将 args 和 kwargs 分割成块的数量

  • args_chunk_spec (Optional[tuple[torch.distributed.pipelining.microbatch.TensorChunkSpec, ...]]) – args 的块分割规范,形状与 args 相同

  • kwargs_chunk_spec (Optional[dict[str, torch.distributed.pipelining.microbatch.TensorChunkSpec]]) – kwargs 的 chunking 规范,形状与 kwargs 相同

返回:

分片参数 kwargs_split:分片 kwargs 的列表

返回类型:

args_split

torch.distributed.pipelining.microbatch.merge_chunks(chunks, chunk_spec)[source][source]

给定一系列块,根据块规范将它们合并成一个值。

参数:
  • chunks (list[Any]) – 块列表

  • chunk_spec – 块分割规范

返回:

合并后的值

返回类型:

管道阶段 ¶

class torch.distributed.pipelining.stage.PipelineStage(submodule, stage_index, num_stages, device, input_args=None, output_args=None, group=None, dw_builder=None)[source][source]

表示管道并行设置中管道阶段的类。

PipelineStage 假设模型进行顺序分区,即模型被分割成块,其中一块的输出作为下一块输入,没有跳过连接。

PipelineStage 自动执行运行时形状/数据类型推断,通过将 stage0 的输出传递到 stage1,依此类推,按线性顺序进行。要绕过形状推断,请将 input_args 和 output_args 传递给每个 PipelineStage 实例。

参数:
  • 子模块(nn.Module)- 此阶段包装的 PyTorch 模块。

  • stage_index(int)- 此阶段的 ID。

  • num_stages (int) – 阶段总数。

  • device (torch.device) – 此阶段所在设备。

  • input_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模块的输入参数。

  • output_args (Union[torch.Tensor, Tuple[torch.tensor]], optional) – 子模块的输出参数。

  • group (dist.ProcessGroup, 可选) – 分布式训练的进程组。如果为 None,则使用默认组。

  • dw_builder (Optional[Callable[[], Callable[..., None]]) – 如果提供,dw_builder 将构建一个新的 dw_runner 函数,该函数将为 F、I、W(前向、输入、权重)零气泡调度提供 W 操作(输入权重)。

torch.distributed.pipelining.stage.build_stage(stage_module, stage_index, pipe_info, device, group=None)[source][source]

根据要由该阶段包装的阶段模块和管道信息创建一个管道阶段。

参数:
  • stage_module (torch.nn.Module) – 该阶段要包装的模块

  • stage_index (int) – 该阶段在流水线中的索引

  • pipe_info (PipeInfo) – 流水线信息,可以通过 pipe.info() 获取

  • device (torch.device) – 该阶段使用的设备

  • group(可选的 dist.ProcessGroup)- 此阶段所使用的进程组

返回:

可以与 PipelineSchedules 一起运行的管道阶段

返回类型:

_PipelineStage

管道调度表 ¶

class torch.distributed.pipelining.schedules.ScheduleGPipe(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source][source]

GPipe 调度。将以填充-排空的方式遍历所有微批次。

class torch.distributed.pipelining.schedules.Schedule1F1B(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source][source]

1F1B 调度。将在稳态微批次上执行一次前向和一次反向操作。

class torch.distributed.pipelining.schedules.ScheduleInterleaved1F1B(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source][source]

交错 1F1B 调度。详情请见 https://arxiv.org/pdf/2104.04473。在稳态下将对微批执行一次前向和一次反向操作,并支持每个排名的多个阶段。当微批准备进行多个本地阶段时,交错 1F1B 会优先处理较早的微批(也称为“深度优先”)。

此调度与原始论文基本相似。不同之处在于放宽了 num_microbatch % pp_size == 0 的要求。使用 flex_pp 调度,我们将有 num_rounds = max(1, n_microbatches // pp_group_size),并且只要 n_microbatches % num_rounds == 0 就可以工作。以下是一些示例,支持

  1. pp_group_size = 4, n_microbatches = 10。我们将有 num_rounds = 2,并且 n_microbatches % 2 == 0。

  2. pp_group_size = 4, n_microbatches = 3. 我们将会有 num_rounds = 1,且 n_microbatches % 1 等于 0。

class torch.distributed.pipelining.schedules.ScheduleLoopedBFS(stages, n_microbatches, loss_fn=None, output_merge_spec=None, scale_grads=True)[source][source]

广度优先流水线并行。有关详细信息,请参阅 https://arxiv.org/abs/2211.05953。类似于交错 1F1B,循环 BFS 支持每个 rank 多个阶段。不同之处在于,当微批次准备就绪以进行多个本地阶段时,循环 BFS 将优先考虑较早的阶段,一次性运行所有可用的微批次。

class torch.distributed.pipelining.schedules.ScheduleInterleavedZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source][source]

交错零气泡调度。详见 https://arxiv.org/pdf/2401.10241 以获取详细信息。将在稳态下对微批次的输入执行一次正向和一次反向操作,并支持每个 rank 的多个阶段。使用反向操作填充管道气泡的权重。

尤其是在实现论文中的 ZB1P 调度。

class torch.distributed.pipelining.schedules.ScheduleZBVZeroBubble(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source][source]

零气泡调度(ZBV 变体)。详见 https://arxiv.org/pdf/2401.10241 第 6 节以获取详细信息。

此调度要求每个等级恰好有两个阶段。

此调度将在稳态下对微批次的输入执行一次正向和一次反向操作,并支持每个等级的多个阶段。使用相对于权重的反向操作来填充管道气泡。

只有当正向时间等于反向时间、输入时间等于反向权重时间时,此 ZB-V 调度才具有“零气泡”属性。在实践中,对于真实模型来说,这不太可能成立,因此可以实施贪婪调度器来处理不等/不平衡的时间。

class torch.distributed.pipelining.schedules.PipelineScheduleSingle(stage, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, scale_grads=True)[source][source]

单阶段调度的基础类。实现了 step 方法。派生类应实现_step_microbatches。

根据 scale_grads 参数对梯度进行缩放,num_microbatches 的数量,默认为 True。此设置应与您的 loss_fn 配置相匹配,loss_fn 可能平均损失(scale_grads=True)或累加损失(scale_grads=False)。

step(*args, target=None, losses=None, **kwargs)[source][source]

使用全批次输入运行一次管道调度。将自动将输入分块为微批次,并按照调度实现遍历微批次。

args: 模型的位置参数(在非管道情况下)。kwargs: 模型的关键字参数(在非管道情况下)。target: 损失函数的目标。losses: 存储每个微批次的损失的列表。

class torch.distributed.pipelining.schedules.PipelineScheduleMulti(stages, n_microbatches, loss_fn=None, args_chunk_spec=None, kwargs_chunk_spec=None, output_merge_spec=None, use_full_backward=None, scale_grads=True)[source][source]

多阶段调度的基础类。实现了步骤方法。

根据 scale_grads 参数,梯度按微批次数缩放,默认为 True。此设置应与您的 loss_fn 配置相匹配,loss_fn 可能平均损失(scale_grads=True)或累加损失(scale_grads=False)。

step(*args, target=None, losses=None, **kwargs)[source][source]

执行一次整个批次输入的管道调度迭代。将自动将输入分块为微批次,并按照调度实现遍历微批次。

args: 模型的位置参数(如在非管道情况下)。kwargs: 模型的关键字参数(如在非管道情况下)。target: 损失函数的目标。losses: 存储每个微批次损失的列表。


© 版权所有 PyTorch 贡献者。

使用 Sphinx 构建,主题由 Read the Docs 提供。

文档

PyTorch 开发者文档全面访问

查看文档

教程

获取初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并获得您的疑问解答

查看资源