使用 PyTorch 编写分布式应用程序 ¶
创建于:2025 年 4 月 1 日 | 最后更新:2025 年 4 月 1 日 | 最后验证:2024 年 11 月 5 日
作者:Séb Arnold
备注
在 github 上查看和编辑此教程。
前提条件:
在这个简短的教程中,我们将介绍 PyTorch 的分布式包。我们将了解如何设置分布式环境,使用不同的通信策略,并了解该包的一些内部机制。
设置
PyTorch 中包含的分布式包(即 torch.distributed
)使研究人员和实践者能够轻松地将他们的计算并行化到进程和机器集群中。为此,它利用消息传递语义,允许每个进程将数据通信到其他任何进程。与多进程( torch.multiprocessing
)包相比,进程可以使用不同的通信后端,并且不受限于在相同的机器上执行。
为了开始,我们需要能够同时运行多个进程。如果您有权访问计算集群,您应该与您的本地系统管理员联系或使用您喜欢的协调工具(例如 pdsh、clustershell 或 slurm)。为了本教程的目的,我们将使用单个机器并使用以下模板启动多个进程。
"""run.py:"""
#!/usr/bin/env python
import os
import sys
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
def run(rank, size):
""" Distributed function to be implemented later. """
pass
def init_process(rank, size, fn, backend='gloo'):
""" Initialize the distributed environment. """
os.environ['MASTER_ADDR'] = '127.0.0.1'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group(backend, rank=rank, world_size=size)
fn(rank, size)
if __name__ == "__main__":
world_size = 2
processes = []
if "google.colab" in sys.modules:
print("Running in Google Colab")
mp.get_context("spawn")
else:
mp.set_start_method("spawn")
for rank in range(world_size):
p = mp.Process(target=init_process, args=(rank, world_size, run))
p.start()
processes.append(p)
for p in processes:
p.join()
上述脚本启动了两个进程,每个进程将设置分布式环境,初始化进程组( dist.init_process_group
),并最终执行给定的 run
函数。
让我们来了解一下 init_process
函数。它确保每个进程都能通过一个主进程进行协调,使用相同的 IP 地址和端口。请注意,我们使用了 gloo
后端,但其他后端也是可用的。(参见第 5.1 节)在本教程的结尾,我们将介绍 dist.init_process_group
中的魔法,它本质上允许进程通过共享它们的位置相互通信。
点对点通信
从一个进程到另一个进程的数据传输称为点对点通信。这些是通过 send
和 recv
函数或它们的直接对应函数 isend
和 irecv
实现的。
"""Blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
if rank == 0:
tensor += 1
# Send the tensor to process 1
dist.send(tensor=tensor, dst=1)
else:
# Receive tensor from process 0
dist.recv(tensor=tensor, src=0)
print('Rank ', rank, ' has data ', tensor[0])
在上述示例中,这两个进程都以零张量开始,然后进程 0 增加张量并将其发送到进程 1,以便它们最终都得到 1.0。请注意,进程 1 需要分配内存以存储它将接收的数据。
还要注意, send/recv
是阻塞的:两个进程都会在通信完成之前阻塞。另一方面,立即执行是非阻塞的;脚本继续执行,方法返回一个 Work
对象,我们可以选择 wait()
。
"""Non-blocking point-to-point communication."""
def run(rank, size):
tensor = torch.zeros(1)
req = None
if rank == 0:
tensor += 1
# Send the tensor to process 1
req = dist.isend(tensor=tensor, dst=1)
print('Rank 0 started sending')
else:
# Receive tensor from process 0
req = dist.irecv(tensor=tensor, src=0)
print('Rank 1 started receiving')
req.wait()
print('Rank ', rank, ' has data ', tensor[0])
当使用立即执行时,我们必须小心地使用发送和接收张量。由于我们不知道数据何时会被传递到另一个进程,因此我们不应该修改发送张量,也不应该在 req.wait()
完成之前访问接收张量。换句话说,
在
dist.isend()
之后写入tensor
将导致未定义的行为。从
tensor
读取后,在req.wait()
执行之前将导致未定义行为。
然而,在执行 req.wait()
之后,我们保证通信已经发生,并且存储在 tensor[0]
中的值是 1.0。
点对点通信在需要更精细地控制我们的进程通信时非常有用。它们可以用来实现复杂的算法,例如百度 DeepSpeech 或 Facebook 大规模实验中使用的算法。(参见图 4.1)
集体通信 §
与点对点通信相反,集体通信允许在组内所有进程之间进行通信模式。组是我们所有进程的子集。要创建一个组,我们可以将一个进程编号列表传递给 dist.new_group(group)
。默认情况下,集体操作在所有进程上执行,也称为世界。例如,为了获得所有进程上所有张量的总和,我们可以使用 dist.all_reduce(tensor, op, group)
集体操作。
""" All-Reduce example."""
def run(rank, size):
""" Simple collective communication. """
group = dist.new_group([0, 1])
tensor = torch.ones(1)
dist.all_reduce(tensor, op=dist.ReduceOp.SUM, group=group)
print('Rank ', rank, ' has data ', tensor[0])
由于我们想要组内所有张量的总和,我们使用 dist.ReduceOp.SUM
作为归约操作符。一般来说,任何可交换的数学运算都可以用作操作符。PyTorch 自带许多这样的操作符,所有操作都在元素级别上执行:
dist.ReduceOp.SUM
,dist.ReduceOp.PRODUCT
,dist.ReduceOp.MAX
,dist.ReduceOp.MIN
,dist.ReduceOp.BAND
,dist.ReduceOp.BOR
,dist.ReduceOp.BXOR
,dist.ReduceOp.PREMUL_SUM
.
支持的运算符完整列表在此。
除了 dist.all_reduce(tensor, op, group)
之外,PyTorch 中目前还实现了许多其他集体。以下是一些支持的集体。
dist.broadcast(tensor, src, group)
:将tensor
从src
复制到所有其他进程。dist.reduce(tensor, dst, op, group)
:将op
应用于每个tensor
,并将结果存储在dst
中。dist.all_reduce(tensor, op, group)
: 与 reduce 相同,但结果存储在所有进程中。dist.scatter(tensor, scatter_list, src, group)
: 将第\(i\)个张量scatter_list[i]
复制到第\(i\)个进程。dist.gather(tensor, gather_list, dst, group)
: 将tensor
从所有进程复制到dst
。dist.all_gather(tensor_list, tensor, group)
: 将tensor
从所有进程复制到tensor_list
,在所有进程中。dist.barrier(group)
: 阻塞组内所有进程,直到每个进程都进入此函数。dist.all_to_all(output_tensor_list, input_tensor_list, group)
: 将输入张量列表分散到组内的所有进程中,并返回输出列表中收集的张量列表。
可以通过查看 PyTorch 分布式(链接)的最新文档来找到支持的集体操作的全列表。
分布式训练 ¶
注意:您可以在本 GitHub 仓库中找到本节示例脚本。
现在我们已经了解了分布式模块的工作原理,让我们用它来编写一些有用的东西。我们的目标将是复制 DistributedDataParallel 的功能。当然,这只是一个教学示例,在实际情况下你应该使用上面链接的官方、经过充分测试和优化的版本。
简单来说,我们想要实现随机梯度下降的分布式版本。我们的脚本将允许所有进程在其数据批次上计算其模型的梯度,然后平均它们的梯度。为了确保在更改进程数量时具有相似的收敛结果,我们首先必须对我们的数据集进行分区。(你也可以使用 torch.utils.data.random_split,而不是下面的代码片段。)
""" Dataset partitioning helper """
class Partition(object):
def __init__(self, data, index):
self.data = data
self.index = index
def __len__(self):
return len(self.index)
def __getitem__(self, index):
data_idx = self.index[index]
return self.data[data_idx]
class DataPartitioner(object):
def __init__(self, data, sizes=[0.7, 0.2, 0.1], seed=1234):
self.data = data
self.partitions = []
rng = Random() # from random import Random
rng.seed(seed)
data_len = len(data)
indexes = [x for x in range(0, data_len)]
rng.shuffle(indexes)
for frac in sizes:
part_len = int(frac * data_len)
self.partitions.append(indexes[0:part_len])
indexes = indexes[part_len:]
def use(self, partition):
return Partition(self.data, self.partitions[partition])
使用上面的代码片段,我们现在可以简单地使用以下几行代码来分区任何数据集:
""" Partitioning MNIST """
def partition_dataset():
dataset = datasets.MNIST('./data', train=True, download=True,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
size = dist.get_world_size()
bsz = 128 // size
partition_sizes = [1.0 / size for _ in range(size)]
partition = DataPartitioner(dataset, partition_sizes)
partition = partition.use(dist.get_rank())
train_set = torch.utils.data.DataLoader(partition,
batch_size=bsz,
shuffle=True)
return train_set, bsz
假设我们有 2 个副本,那么每个进程将有 train_set
60000 / 2 = 30000 个样本。我们还通过将批大小除以副本数量来保持整体批大小为 128。
我们现在可以编写我们常用的前后向优化训练代码,并添加一个调用函数来平均我们模型的梯度。(以下内容主要受到官方 PyTorch MNIST 示例的启发。)
""" Distributed Synchronous SGD Example """
def run(rank, size):
torch.manual_seed(1234)
train_set, bsz = partition_dataset()
model = Net()
optimizer = optim.SGD(model.parameters(),
lr=0.01, momentum=0.5)
num_batches = ceil(len(train_set.dataset) / float(bsz))
for epoch in range(10):
epoch_loss = 0.0
for data, target in train_set:
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
epoch_loss += loss.item()
loss.backward()
average_gradients(model)
optimizer.step()
print('Rank ', dist.get_rank(), ', epoch ',
epoch, ': ', epoch_loss / num_batches)
剩下的就是实现 average_gradients(model)
函数,该函数简单地接收一个模型并平均其在整个世界中的梯度。
""" Gradient averaging. """
def average_gradients(model):
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
param.grad.data /= size
哈哈!我们成功实现了分布式同步 SGD,并且可以在大型计算机集群上训练任何模型。
注意:虽然最后一句话在技术上是真的,但要实现同步 SGD 的生产级实现还需要更多的技巧。再次强调,使用经过测试和优化的方法。
我们自己的环-allreduce
作为一项额外的挑战,想象一下,如果我们想实现 DeepSpeech 的高效环 allreduce。这使用点对点集体操作来实现相当简单。
""" Implementation of a ring-reduce with addition. """
def allreduce(send, recv):
rank = dist.get_rank()
size = dist.get_world_size()
send_buff = send.clone()
recv_buff = send.clone()
accum = send.clone()
left = ((rank - 1) + size) % size
right = (rank + 1) % size
for i in range(size - 1):
if i % 2 == 0:
# Send send_buff
send_req = dist.isend(send_buff, right)
dist.recv(recv_buff, left)
accum[:] += recv_buff[:]
else:
# Send recv_buff
send_req = dist.isend(recv_buff, right)
dist.recv(send_buff, left)
accum[:] += send_buff[:]
send_req.wait()
recv[:] = accum[:]
在上面的脚本中, allreduce(send, recv)
函数的签名与 PyTorch 中的略有不同。它接受一个 recv
张量,并将所有 send
张量的和存储在其中。作为留给读者的练习,我们的版本与 DeepSpeech 中的版本之间仍有一个区别:它们的实现将梯度张量分成块,以便最优地利用通信带宽。(提示:torch.chunk)
高级主题
我们现在准备探索 torch.distributed
的一些更高级的功能。由于内容很多,本节分为两个小节:
通信后端:学习如何使用 MPI 和 Gloo 进行 GPU-GPU 通信。
初始化方法:了解如何在
dist.init_process_group()
中最佳设置初始协调阶段。
通信后端 ¶
torch.distributed
最优雅的方面之一就是其能够抽象并构建在多种后端之上。正如之前提到的,PyTorch 中实现了多个后端。其中一些最受欢迎的后端包括 Gloo、NCCL 和 MPI。它们各自有不同的规范和权衡,这取决于所需的用例。支持的函数的比较表可以在这里找到。
Gloo 后端
到目前为止,我们已经广泛使用了 Gloo 后端。作为一个开发平台,它非常方便,因为它包含在预编译的 PyTorch 二进制文件中,并且可以在 Linux(自 0.2 版起)和 macOS(自 1.3 版起)上运行。它支持 CPU 上的所有点对点和集体操作,以及 GPU 上的所有集体操作。对于 CUDA 张量的集体操作实现没有 NCCL 后端提供的那么优化。
你肯定已经注意到了,我们的分布式 SGD 示例如果将 model
放在 GPU 上则无法工作。为了使用多个 GPU,我们还需要进行以下修改:
使用
device = torch.device("cuda:{}".format(rank))
model = Net()
\(\rightarrow\)model = Net().to(device)
使用
data, target = data.to(device), target.to(device)
经过上述修改,我们的模型现在在两个 GPU 上训练,你可以用 watch nvidia-smi
监控它们的利用率。
MPI 后端
消息传递接口(MPI)是高性能计算领域的一个标准化工具。它允许进行点对点和集体通信,并且是 torch.distributed
API 的主要灵感来源。存在多种 MPI 实现(例如 Open-MPI、MVAPICH2、Intel MPI),每种实现都针对不同的目的进行了优化。使用 MPI 后端的优点在于 MPI 在大型计算机集群上的广泛可用性和高度优化。一些最新的实现也能够利用 CUDA IPC 和 GPU Direct 技术,以避免通过 CPU 进行内存复制。
不幸的是,PyTorch 的二进制文件无法包含 MPI 实现,我们不得不手动重新编译它。幸运的是,由于在编译过程中,PyTorch 会自行查找可用的 MPI 实现,这个过程相对简单。以下步骤通过从源代码安装 PyTorch 来安装 MPI 后端。
创建并激活您的 Anaconda 环境,按照指南安装所有预置软件,但不要运行
python setup.py install
。选择并安装您喜欢的 MPI 实现。请注意,启用 CUDA 感知 MPI 可能需要一些额外的步骤。在我们的例子中,我们将坚持使用不带 GPU 支持的 Open-MPI:
conda install -c conda-forge openmpi
现在,前往您克隆的 PyTorch 仓库并执行
python setup.py install
。
为了测试我们新安装的后端,需要进行一些修改。
将
if __name__ == '__main__':
下的内容替换为init_process(0, 0, run, backend='mpi')
。运行
mpirun -n 4 python myscript.py
.
这些更改的原因是 MPI 需要在启动进程之前创建自己的环境。MPI 还将启动自己的进程并执行初始化方法中描述的手 shake,这使得 init_process_group
的 rank
和 size
参数变得多余。这实际上非常强大,因为您可以为 mpirun
传递额外的参数来为每个进程定制计算资源。(例如,每个进程的核心数、将机器手动分配给特定排名,以及更多)这样做,您应该获得与其他通信后端相同的熟悉输出。
NCCL 后端
NCCL 后端提供了针对 CUDA 张量的集体操作的优化实现。如果您仅使用 CUDA 张量进行集体操作,请考虑使用此后端以获得最佳性能。NCCL 后端包含在具有 CUDA 支持的预构建二进制文件中。
初始化方法
为了结束本教程,让我们检查我们调用的初始函数: dist.init_process_group(backend, init_method)
。具体来说,我们将讨论各种初始化方法,这些方法负责每个进程之间的初步协调步骤。这些方法使您能够定义这种协调是如何完成的。
初始化方法的选择取决于您的硬件配置,一种方法可能比其他方法更合适。除了以下章节外,请参阅官方文档以获取更多信息。
环境变量
我们在本教程中一直使用环境变量初始化方法。通过在所有机器上设置以下四个环境变量,所有进程都可以正确连接到主节点,获取其他进程的信息,并最终与它们进行握手。
MASTER_PORT
: 将运行进程 0 的机器上的一个空闲端口。MASTER_ADDR
: 将运行进程 0 的机器的 IP 地址。WORLD_SIZE
: 进程总数,以便主节点知道需要等待多少个工作进程。每个进程的排名,以便它们知道自己是主进程还是工作进程。
共享文件系统
共享文件系统要求所有进程都能访问共享文件系统,并通过共享文件进行协调。这意味着每个进程都会打开文件,写入其信息,并等待所有人完成。之后,所有所需信息将立即对所有进程可用。为了避免竞态条件,文件系统必须通过 fcntl 支持锁定。
dist.init_process_group(
init_method='file:///mnt/nfs/sharedfile',
rank=args.rank,
world_size=4)
TCP
通过提供进程 0 的 IP 地址和可到达的端口号,可以通过 TCP 进行初始化。在这里,所有工作者都可以连接到进程 0,并交换如何相互连接的信息。
dist.init_process_group(
init_method='tcp://10.1.1.20:23456',
rank=args.rank,
world_size=4)
致谢
我要感谢 PyTorch 的开发者们在实现、文档和测试方面做得如此出色。当代码不清楚时,我总能依靠文档或测试找到答案。特别是,我要感谢 Soumith Chintala、Adam Paszke 和 Natalia Gimelshein 提供的宝贵意见和回答早期草稿中的问题。