结合分布式数据并行与分布式 RPC 框架 ¶
创建时间:2025 年 4 月 1 日 | 最后更新时间:2025 年 4 月 1 日 | 最后验证:未验证
作者:Pritam Damania 和 Yi Wang
备注
在 github 上查看和编辑此教程。
本教程通过一个简单的示例演示了如何将分布式数据并行(DDP)与分布式 RPC 框架结合,以实现分布式数据并行与分布式模型并行的结合,从而训练一个简单的模型。示例的源代码可以在以下位置找到。
之前的教程《使用分布式数据并行入门》和《使用分布式 RPC 框架入门》分别描述了如何执行分布式数据并行和分布式模型并行训练。尽管如此,在许多训练范例中,您可能希望结合这两种技术。例如:
如果我们有一个包含稀疏部分(大型嵌入表)和密集部分(全连接层)的模型,我们可能希望将嵌入表放在参数服务器上,并使用 DistributedDataParallel 在多个训练器上复制全连接层。可以使用 Distributed RPC 框架在参数服务器上执行嵌入查找。
启用 PipeDream 论文中描述的混合并行性。我们可以使用 Distributed RPC 框架在多个工作器之间管道化模型的各个阶段,并使用 DistributedDataParallel 复制每个阶段(如果需要)。
在本教程中,我们将介绍上述提到的案例 1。我们的设置中共有 4 个工作器,如下所示:
1 个主节点,负责在参数服务器上创建嵌入表(nn.EmbeddingBag)。主节点还驱动两个训练器上的训练循环。
1 参数服务器,它基本上在内存中保存嵌入表并响应主节点和训练器的 RPC 调用。
2 训练器,它们存储一个全连接层(nn.Linear),该层通过 DistributedDataParallel 在它们之间复制。训练器还负责执行前向传递、反向传递和优化器步骤。
整个训练过程执行如下:
主节点创建一个包含嵌入表的远程模块,该模块位于参数服务器上。
主模块启动训练循环,并将远程模块传递给训练器。
训练器创建一个
HybridModel
,首先使用主模块提供的远程模块执行嵌入查找,然后执行封装在 DDP 中的 FC 层。训练器执行模型的正向传播,并使用损失来执行使用分布式自动微分(Distributed Autograd)的反向传播。
在反向传播过程中,首先计算 FC 层的梯度,并通过 DDP 中的 allreduce 同步到所有训练器。
接下来,分布式自动微分将梯度传播到参数服务器,更新嵌入表的梯度。
最后,使用分布式优化器更新所有参数。
注意
如果您正在结合 DDP 和 RPC,则应始终使用分布式自动微分进行反向传播。
现在,让我们详细地过一遍每个部分。首先,在我们进行任何训练之前,我们需要设置所有的工作者。我们创建 4 个进程,其中 rank 0 和 1 是我们的训练器,rank 2 是主节点,rank 3 是参数服务器。
我们使用 TCP init_method 在所有 4 个工作节点上初始化 RPC 框架。一旦 RPC 初始化完成,主节点就创建了一个远程模块,该模块在 Parameter Server 上使用 RemoteModule 持有一个 EmbeddingBag 层。然后,主节点通过调用每个训练器的 _run_trainer
来启动训练循环,使用 rpc_async。最后,主节点等待所有训练完成后再退出。
训练器首先使用 init_process_group 初始化一个 DDP 的 ProcessGroup
,其中 world_size=2(用于两个训练器)。接下来,他们使用 TCP init_method 初始化 RPC 框架。请注意,RPC 初始化和 ProcessGroup 初始化中的端口是不同的。这是为了避免两个框架初始化之间的端口冲突。一旦初始化完成,训练器只需等待主节点的 _run_trainer
RPC。
参数服务器仅初始化 RPC 框架,并等待来自训练器和主节点的 RPC。
def run_worker(rank, world_size):
r"""
A wrapper function that initializes RPC, calls the function, and shuts down
RPC.
"""
# We need to use different port numbers in TCP init_method for init_rpc and
# init_process_group to avoid port conflicts.
rpc_backend_options = TensorPipeRpcBackendOptions()
rpc_backend_options.init_method = "tcp://localhost:29501"
# Rank 2 is master, 3 is ps and 0 and 1 are trainers.
if rank == 2:
rpc.init_rpc(
"master",
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options,
)
remote_emb_module = RemoteModule(
"ps",
torch.nn.EmbeddingBag,
args=(NUM_EMBEDDINGS, EMBEDDING_DIM),
kwargs={"mode": "sum"},
)
# Run the training loop on trainers.
futs = []
for trainer_rank in [0, 1]:
trainer_name = "trainer{}".format(trainer_rank)
fut = rpc.rpc_async(
trainer_name, _run_trainer, args=(remote_emb_module, trainer_rank)
)
futs.append(fut)
# Wait for all training to finish.
for fut in futs:
fut.wait()
elif rank <= 1:
# Initialize process group for Distributed DataParallel on trainers.
dist.init_process_group(
backend="gloo", rank=rank, world_size=2, init_method="tcp://localhost:29500"
)
# Initialize RPC.
trainer_name = "trainer{}".format(rank)
rpc.init_rpc(
trainer_name,
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options,
)
# Trainer just waits for RPCs from master.
else:
rpc.init_rpc(
"ps",
rank=rank,
world_size=world_size,
rpc_backend_options=rpc_backend_options,
)
# parameter server do nothing
pass
# block until all rpcs finish
rpc.shutdown()
if __name__ == "__main__":
# 2 trainers, 1 parameter server, 1 master.
world_size = 4
mp.spawn(run_worker, args=(world_size,), nprocs=world_size, join=True)
在讨论 Trainer 的详细内容之前,让我们先介绍 Trainer 使用的 HybridModel
。如以下描述, HybridModel
使用远程模块初始化,该模块在参数服务器上持有嵌入表( remote_emb_module
)以及用于 DDP 的 device
。模型的初始化将 nn.Linear 层包裹在 DDP 中,以在所有 Trainer 之间复制和同步此层。
模型的前向方法相当简单。它使用 RemoteModule 的 forward
在参数服务器上执行嵌入查找,并将输出传递到 FC 层。
class HybridModel(torch.nn.Module):
r"""
The model consists of a sparse part and a dense part.
1) The dense part is an nn.Linear module that is replicated across all trainers using DistributedDataParallel.
2) The sparse part is a Remote Module that holds an nn.EmbeddingBag on the parameter server.
This remote model can get a Remote Reference to the embedding table on the parameter server.
"""
def __init__(self, remote_emb_module, device):
super(HybridModel, self).__init__()
self.remote_emb_module = remote_emb_module
self.fc = DDP(torch.nn.Linear(16, 8).cuda(device), device_ids=[device])
self.device = device
def forward(self, indices, offsets):
emb_lookup = self.remote_emb_module.forward(indices, offsets)
return self.fc(emb_lookup.cuda(self.device))
接下来,让我们看看 Trainer 的设置。Trainer 首先使用远程模块创建上述的 HybridModel
,该模块在参数服务器上持有嵌入表以及自己的 rank。
现在,我们需要检索所有想要使用 DistributedOptimizer 进行优化的参数的 RRefs 列表。要从参数服务器检索嵌入表的参数,我们可以调用 RemoteModule 的 remote_parameters,它基本上会遍历嵌入表的所有参数,并返回一个 RRefs 列表。训练器通过 RPC 在参数服务器上调用此方法,以接收所需参数的 RRefs 列表。由于 DistributedOptimizer 始终需要一个需要优化的参数的 RRefs 列表,因此我们甚至需要为我们的 FC 层的本地参数创建 RRefs。这是通过遍历 model.fc.parameters()
,为每个参数创建一个 RRef 并将其追加到 remote_parameters()
返回的列表中完成的。请注意,我们不能使用 model.parameters()
,因为它会递归调用 model.remote_emb_module.parameters()
,而 RemoteModule
不支持。
最后,我们使用所有 RRefs 创建我们的 DistributedOptimizer,并定义一个 CrossEntropyLoss 函数。
def _run_trainer(remote_emb_module, rank):
r"""
Each trainer runs a forward pass which involves an embedding lookup on the
parameter server and running nn.Linear locally. During the backward pass,
DDP is responsible for aggregating the gradients for the dense part
(nn.Linear) and distributed autograd ensures gradients updates are
propagated to the parameter server.
"""
# Setup the model.
model = HybridModel(remote_emb_module, rank)
# Retrieve all model parameters as rrefs for DistributedOptimizer.
# Retrieve parameters for embedding table.
model_parameter_rrefs = model.remote_emb_module.remote_parameters()
# model.fc.parameters() only includes local parameters.
# NOTE: Cannot call model.parameters() here,
# because this will call remote_emb_module.parameters(),
# which supports remote_parameters() but not parameters().
for param in model.fc.parameters():
model_parameter_rrefs.append(RRef(param))
# Setup distributed optimizer
opt = DistributedOptimizer(
optim.SGD,
model_parameter_rrefs,
lr=0.05,
)
criterion = torch.nn.CrossEntropyLoss()
现在我们已经准备好介绍在每个训练器上运行的主体训练循环。 get_next_batch
只是一个辅助函数,用于生成训练的随机输入和目标。我们运行多个 epoch 的训练循环,并为每个批次:
设置分布式自动微分上下文以进行分布式自动微分。
运行模型的正向传播并获取其输出。
使用损失函数根据我们的输出和目标计算损失。
使用分布式自动微分执行分布式反向传播,使用损失函数。
最后,运行分布式优化器步骤以优化所有参数。
def get_next_batch(rank):
for _ in range(10):
num_indices = random.randint(20, 50)
indices = torch.LongTensor(num_indices).random_(0, NUM_EMBEDDINGS)
# Generate offsets.
offsets = []
start = 0
batch_size = 0
while start < num_indices:
offsets.append(start)
start += random.randint(1, 10)
batch_size += 1
offsets_tensor = torch.LongTensor(offsets)
target = torch.LongTensor(batch_size).random_(8).cuda(rank)
yield indices, offsets_tensor, target
# Train for 100 epochs
for epoch in range(100):
# create distributed autograd context
for indices, offsets, target in get_next_batch(rank):
with dist_autograd.context() as context_id:
output = model(indices, offsets)
loss = criterion(output, target)
# Run distributed backward pass
dist_autograd.backward(context_id, [loss])
# Tun distributed optimizer
opt.step(context_id)
# Not necessary to zero grads as each iteration creates a different
# distributed autograd context which hosts different grads
print("Training done for epoch {}".format(epoch))
整个示例的源代码可以在以下位置找到。