分布式 RPC 框架入门指南 ¶
创建于:2025 年 4 月 1 日 | 最后更新:2025 年 4 月 1 日 | 最后验证:2024 年 11 月 5 日
作者:沈丽
备注
在 github 上查看和编辑此教程。
前提条件:
本教程使用两个简单的示例来演示如何使用 torch.distributed.rpc 包构建分布式训练,该包最初在 PyTorch v1.4 中作为实验性功能引入。两个示例的源代码可以在 PyTorch 示例中找到。
之前的教程《PyTorch 分布式数据并行入门》和《使用 PyTorch 编写分布式应用程序》介绍了 DistributedDataParallel,它支持一种特定的训练范式,即模型在多个进程中复制,每个进程处理输入数据的一部分。有时,您可能会遇到需要不同训练范式的情况。例如:
在强化学习中,从环境中获取训练数据可能相对昂贵,而模型本身可能相当小。在这种情况下,并行启动多个观察者并共享单个代理可能很有用。在这种情况下,代理负责本地训练,但应用程序仍然需要库来在观察者和训练者之间发送和接收数据。
您的模型可能太大,无法在一个机器的 GPU 中运行,因此需要库来帮助将模型分割到多个机器上。或者您可能正在实现参数服务器训练框架,其中模型参数和训练器位于不同的机器上。
torch.distributed.rpc 包可以帮助解决上述问题。在情况 1 中,RPC 和 RRef 允许将数据从一个工作器发送到另一个工作器,同时可以轻松地引用远程数据对象。在情况 2 中,分布式自动微分和分布式优化器使得执行反向传播和优化器步骤就像本地训练一样。在接下来的两个部分中,我们将使用强化学习示例和语言模型示例来演示 torch.distributed.rpc 的 API。请注意,本教程的目标不是构建最准确或最有效的模型来解决给定问题,而是展示如何使用 torch.distributed.rpc 包构建分布式训练应用程序。
使用 RPC 和 RRef 进行分布式强化学习
本节描述了使用 RPC 构建玩具分布式强化学习模型,以解决 OpenAI Gym 中的 CartPole-v1 问题的步骤。策略代码主要借鉴了现有的单线程示例,如下所示。我们将跳过 Policy
设计的细节,并专注于 RPC 的使用。
import torch.nn as nn
import torch.nn.functional as F
class Policy(nn.Module):
def __init__(self):
super(Policy, self).__init__()
self.affine1 = nn.Linear(4, 128)
self.dropout = nn.Dropout(p=0.6)
self.affine2 = nn.Linear(128, 2)
def forward(self, x):
x = self.affine1(x)
x = self.dropout(x)
x = F.relu(x)
action_scores = self.affine2(x)
return F.softmax(action_scores, dim=1)
我们准备展示观察者。在这个例子中,每个观察者创建自己的环境,并等待代理的命令来运行一个回合。在每个回合中,观察者最多循环 n_steps
次,在每个迭代中,它使用 RPC 将环境状态传递给代理,并获取一个动作。然后它将这个动作应用到其环境中,并从环境中获取奖励和下一个状态。之后,观察者使用另一个 RPC 将奖励报告给代理。请注意,这显然不是最有效的观察者实现。例如,一个简单的优化可以是打包当前状态和最后一个奖励到一个 RPC 中,以减少通信开销。然而,目标是演示 RPC API,而不是构建 CartPole 的最佳求解器。所以,让我们保持逻辑简单,并在本例中明确这两个步骤。
import argparse
import gym
import torch.distributed.rpc as rpc
parser = argparse.ArgumentParser(
description="RPC Reinforcement Learning Example",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument('--world_size', default=2, type=int, metavar='W',
help='number of workers')
parser.add_argument('--log_interval', type=int, default=10, metavar='N',
help='interval between training status logs')
parser.add_argument('--gamma', type=float, default=0.99, metavar='G',
help='how much to value future rewards')
parser.add_argument('--seed', type=int, default=1, metavar='S',
help='random seed for reproducibility')
args = parser.parse_args()
class Observer:
def __init__(self):
self.id = rpc.get_worker_info().id
self.env = gym.make('CartPole-v1')
self.env.seed(args.seed)
def run_episode(self, agent_rref):
state, ep_reward = self.env.reset(), 0
for _ in range(10000):
# send the state to the agent to get an action
action = agent_rref.rpc_sync().select_action(self.id, state)
# apply the action to the environment, and get the reward
state, reward, done, _ = self.env.step(action)
# report the reward to the agent for training purpose
agent_rref.rpc_sync().report_reward(self.id, reward)
# finishes after the number of self.env._max_episode_steps
if done:
break
代理的代码稍微复杂一些,我们将将其分解成多个部分。在这个例子中,代理同时充当训练者和主控,向多个分布式观察者发送命令以运行剧集,并本地记录所有动作和奖励,这些将在每个剧集后的训练阶段使用。下面的代码显示了 Agent
构造函数,其中大多数行都是初始化各种组件。最后的循环在远程工作者上初始化观察者,并将 RRefs
本地保留给那些观察者。代理将使用这些观察者 RRefs
来发送命令。应用程序无需担心 RRefs
的生命周期。每个 RRef
的所有者维护一个引用计数映射来跟踪其生命周期,并保证只要有任何活跃用户使用该 RRef
,远程数据对象就不会被删除。请参阅 RRef
设计文档以获取详细信息。
import gym
import numpy as np
import torch
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical
class Agent:
def __init__(self, world_size):
self.ob_rrefs = []
self.agent_rref = RRef(self)
self.rewards = {}
self.saved_log_probs = {}
self.policy = Policy()
self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
self.eps = np.finfo(np.float32).eps.item()
self.running_reward = 0
self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
for ob_rank in range(1, world_size):
ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
self.ob_rrefs.append(remote(ob_info, Observer))
self.rewards[ob_info.id] = []
self.saved_log_probs[ob_info.id] = []
接下来,代理向观察者公开了两个 API,用于选择动作和报告奖励。这些函数仅在代理本地运行,但将通过 RPC 由观察者触发。
class Agent:
...
def select_action(self, ob_id, state):
state = torch.from_numpy(state).float().unsqueeze(0)
probs = self.policy(state)
m = Categorical(probs)
action = m.sample()
self.saved_log_probs[ob_id].append(m.log_prob(action))
return action.item()
def report_reward(self, ob_id, reward):
self.rewards[ob_id].append(reward)
让我们在代理上添加一个 run_episode
函数,该函数告诉所有观察者执行一个场景。在这个函数中,它首先创建一个列表来收集异步 RPC 的未来,然后遍历所有观察者 RRefs
来进行异步 RPC。在这些 RPC 中,代理还将自己的 RRef
传递给观察者,以便观察者也可以调用代理上的函数。如上图所示,每个观察者都会向代理发送 RPC,这些 RPC 是嵌套的。在每个场景之后, saved_log_probs
和 rewards
将包含记录的动作概率和奖励。
class Agent:
...
def run_episode(self):
futs = []
for ob_rref in self.ob_rrefs:
# make async RPC to kick off an episode on all observers
futs.append(
rpc_async(
ob_rref.owner(),
ob_rref.rpc_sync().run_episode,
args=(self.agent_rref,)
)
)
# wait until all obervers have finished this episode
for fut in futs:
fut.wait()
最后,在完成一个场景之后,代理需要训练模型,这在上面的 finish_episode
函数中实现。这个函数中没有 RPC,它主要借鉴了单线程示例。因此,我们省略了其内容的描述。
class Agent:
...
def finish_episode(self):
# joins probs and rewards from different observers into lists
R, probs, rewards = 0, [], []
for ob_id in self.rewards:
probs.extend(self.saved_log_probs[ob_id])
rewards.extend(self.rewards[ob_id])
# use the minimum observer reward to calculate the running reward
min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards])
self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward
# clear saved probs and rewards
for ob_id in self.rewards:
self.rewards[ob_id] = []
self.saved_log_probs[ob_id] = []
policy_loss, returns = [], []
for r in rewards[::-1]:
R = r + args.gamma * R
returns.insert(0, R)
returns = torch.tensor(returns)
returns = (returns - returns.mean()) / (returns.std() + self.eps)
for log_prob, R in zip(probs, returns):
policy_loss.append(-log_prob * R)
self.optimizer.zero_grad()
policy_loss = torch.cat(policy_loss).sum()
policy_loss.backward()
self.optimizer.step()
return min_reward
使用 Policy
、 Observer
和 Agent
类,我们已准备好启动多个进程以执行分布式训练。在本例中,所有进程运行相同的 run_worker
函数,并使用排名来区分其角色。排名 0 始终是代理,而所有其他排名都是观察者。代理作为主节点,通过反复调用 run_episode
和 finish_episode
直到运行奖励超过环境指定的奖励阈值。所有观察者被动等待来自代理的命令。代码被 rpc.init_rpc 和 rpc.shutdown 包裹,分别用于初始化和终止 RPC 实例。更多详细信息请参阅 API 页面。
import os
from itertools import count
import torch.multiprocessing as mp
AGENT_NAME = "agent"
OBSERVER_NAME="obs{}"
def run_worker(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 0:
# rank0 is the agent
rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)
agent = Agent(world_size)
print(f"This will run until reward threshold of {agent.reward_threshold}"
" is reached. Ctrl+C to exit.")
for i_episode in count(1):
agent.run_episode()
last_reward = agent.finish_episode()
if i_episode % args.log_interval == 0:
print(f"Episode {i_episode}\tLast reward: {last_reward:.2f}\tAverage reward: "
f"{agent.running_reward:.2f}")
if agent.running_reward > agent.reward_threshold:
print(f"Solved! Running reward is now {agent.running_reward}!")
break
else:
# other ranks are the observer
rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
# observers passively waiting for instructions from the agent
# block until all rpcs finish, and shutdown the RPC instance
rpc.shutdown()
mp.spawn(
run_worker,
args=(args.world_size, ),
nprocs=args.world_size,
join=True
)
以下是使用 world_size=2 进行训练时的部分示例输出。
This will run until reward threshold of 475.0 is reached. Ctrl+C to exit.
Episode 10 Last reward: 26.00 Average reward: 10.01
Episode 20 Last reward: 16.00 Average reward: 11.27
Episode 30 Last reward: 49.00 Average reward: 18.62
Episode 40 Last reward: 45.00 Average reward: 26.09
Episode 50 Last reward: 44.00 Average reward: 30.03
Episode 60 Last reward: 111.00 Average reward: 42.23
Episode 70 Last reward: 131.00 Average reward: 70.11
Episode 80 Last reward: 87.00 Average reward: 76.51
Episode 90 Last reward: 86.00 Average reward: 95.93
Episode 100 Last reward: 13.00 Average reward: 123.93
Episode 110 Last reward: 33.00 Average reward: 91.39
Episode 120 Last reward: 73.00 Average reward: 76.38
Episode 130 Last reward: 137.00 Average reward: 88.08
Episode 140 Last reward: 89.00 Average reward: 104.96
Episode 150 Last reward: 97.00 Average reward: 98.74
Episode 160 Last reward: 150.00 Average reward: 100.87
Episode 170 Last reward: 126.00 Average reward: 104.38
Episode 180 Last reward: 500.00 Average reward: 213.74
Episode 190 Last reward: 322.00 Average reward: 300.22
Episode 200 Last reward: 165.00 Average reward: 272.71
Episode 210 Last reward: 168.00 Average reward: 233.11
Episode 220 Last reward: 184.00 Average reward: 195.02
Episode 230 Last reward: 284.00 Average reward: 208.32
Episode 240 Last reward: 395.00 Average reward: 247.37
Episode 250 Last reward: 500.00 Average reward: 335.42
Episode 260 Last reward: 500.00 Average reward: 386.30
Episode 270 Last reward: 500.00 Average reward: 405.29
Episode 280 Last reward: 500.00 Average reward: 443.29
Episode 290 Last reward: 500.00 Average reward: 464.65
Solved! Running reward is now 475.3163778435275!
在本例中,我们展示了如何使用 RPC 作为通信工具在工作者之间传递数据,以及如何使用 RRef 引用远程对象。确实,您可以直接在 ProcessGroup
send
和 recv
API 上构建整个结构,或者使用其他通信/RPC 库。然而,通过使用 torch.distributed.rpc,您可以在底层获得原生支持和持续优化的性能。
接下来,我们将展示如何结合 RPC 和 RRef 与分布式自动微分和分布式优化器来执行分布式模型并行训练。
使用分布式自动微分和分布式优化器实现的分布式 RNN
在本节中,我们使用一个 RNN 模型来展示如何使用 RPC API 构建分布式模型并行训练。示例 RNN 模型非常小,可以轻松地适应单个 GPU,但我们仍然将其层分割到两个不同的工作者上以演示这一概念。开发者可以将类似的技术应用于将更大的模型分布在多个设备和机器上。
RNN 模型设计借鉴了 PyTorch 示例仓库中的词语言模型,包含三个主要组件:一个嵌入表、一个 LSTM
层和一个解码器。下面的代码将嵌入表和解码器包装成子模块,以便它们的构造函数可以传递给 RPC API。在 EmbeddingTable
子模块中,我们故意将 Embedding
层放在 GPU 上以覆盖使用场景。在 v1.4 中,RPC 始终在目标工作者上创建 CPU 张量参数或返回值。如果函数接受 GPU 张量,您需要将其显式地移动到正确的设备。
class EmbeddingTable(nn.Module):
r"""
Encoding layers of the RNNModel
"""
def __init__(self, ntoken, ninp, dropout):
super(EmbeddingTable, self).__init__()
self.drop = nn.Dropout(dropout)
self.encoder = nn.Embedding(ntoken, ninp).cuda()
self.encoder.weight.data.uniform_(-0.1, 0.1)
def forward(self, input):
return self.drop(self.encoder(input.cuda()).cpu()
class Decoder(nn.Module):
def __init__(self, ntoken, nhid, dropout):
super(Decoder, self).__init__()
self.drop = nn.Dropout(dropout)
self.decoder = nn.Linear(nhid, ntoken)
self.decoder.bias.data.zero_()
self.decoder.weight.data.uniform_(-0.1, 0.1)
def forward(self, output):
return self.decoder(self.drop(output))
使用上述子模块,我们现在可以使用 RPC 将它们拼接在一起以创建一个 RNN 模型。下面的代码中 ps
代表一个参数服务器,它托管嵌入表和解码器的参数。构造函数使用远程 API 在参数服务器上创建一个 EmbeddingTable
对象和一个 Decoder
对象,并在本地创建一个 LSTM
子模块。在正向传播过程中,训练器使用 EmbeddingTable
RRef
查找远程子模块,并通过 RPC 将输入数据传递给 EmbeddingTable
,并获取查找结果。然后,它将嵌入通过本地的 LSTM
层,最后使用另一个 RPC 将输出发送到 Decoder
子模块。一般来说,为了实现分布式模型并行训练,开发人员可以将模型划分为子模块,远程调用 RPC 创建子模块实例,并在需要时使用 RRef
查找它们。正如您在下面的代码中所看到的,它看起来与单机模型并行训练非常相似。主要区别是使用 RPC 函数替换 Tensor.to(device)
。
class RNNModel(nn.Module):
def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
super(RNNModel, self).__init__()
# setup embedding table remotely
self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
# setup LSTM locally
self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
# setup decoder remotely
self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))
def forward(self, input, hidden):
# pass input to the remote embedding table and fetch emb tensor back
emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
output, hidden = self.rnn(emb, hidden)
# pass output to the rremote decoder and get the decoded output back
decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
return decoded, hidden
在介绍分布式优化器之前,我们先添加一个辅助函数来生成模型参数的 RRefs 列表,这些列表将被分布式优化器消费。在本地训练中,应用程序可以通过 Module.parameters()
来获取所有参数张量的引用,并将其传递给本地优化器进行后续更新。然而,在分布式训练场景中,由于一些参数位于远程机器上,相同的 API 无法使用。因此,分布式优化器不是获取参数列表 Tensors
,而是获取一个列表 RRefs
,每个模型参数一个 RRef
,无论是本地还是远程模型参数。辅助函数很简单,只需调用 Module.parameters()
,并在每个参数上创建一个本地的 RRef
。
def _parameter_rrefs(module):
param_rrefs = []
for param in module.parameters():
param_rrefs.append(RRef(param))
return param_rrefs
然后,由于 RNNModel
包含三个子模块,我们需要调用 _parameter_rrefs
三次,并将其封装到另一个辅助函数中。
class RNNModel(nn.Module):
...
def parameter_rrefs(self):
remote_params = []
# get RRefs of embedding table
remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
# create RRefs for local parameters
remote_params.extend(_parameter_rrefs(self.rnn))
# get RRefs of decoder
remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
return remote_params
现在,我们准备实现训练循环。初始化模型参数后,我们创建 RNNModel
和 DistributedOptimizer
。分布式优化器将获取一个参数 RRefs
列表,找到所有不同的拥有者工作者,并使用给定的参数(即 lr=0.05
)在每个拥有者工作者上创建指定的本地优化器(即在这种情况下,您也可以使用其他本地优化器)。
在训练循环中,它首先创建一个分布式自动微分上下文,这将帮助分布式自动微分引擎找到梯度以及涉及的 RPC 发送/接收函数。分布式自动微分引擎的设计细节可以在其设计说明中找到。然后,它就像一个本地模型一样启动前向传递,并运行分布式反向传递。对于分布式反向传递,您只需要指定一个根列表,在这种情况下,是损失 Tensor
。分布式自动微分引擎将自动遍历分布式图并正确写入梯度。接下来,它在分布式优化器上运行 step
函数,这将联系所有涉及的本地优化器以更新模型参数。与本地训练相比,一个小的不同之处在于您不需要运行 zero_grad()
,因为每个自动微分上下文都有专门的空间来存储梯度,并且由于我们为每个迭代创建一个上下文,因此来自不同迭代的梯度不会累积到相同的 Tensors
集合中。
def run_trainer():
batch = 5
ntoken = 10
ninp = 2
nhid = 3
nindices = 3
nlayers = 4
hidden = (
torch.randn(nlayers, nindices, nhid),
torch.randn(nlayers, nindices, nhid)
)
model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)
# setup distributed optimizer
opt = DistributedOptimizer(
optim.SGD,
model.parameter_rrefs(),
lr=0.05,
)
criterion = torch.nn.CrossEntropyLoss()
def get_next_batch():
for _ in range(5):
data = torch.LongTensor(batch, nindices) % ntoken
target = torch.LongTensor(batch, ntoken) % nindices
yield data, target
# train for 10 iterations
for epoch in range(10):
for data, target in get_next_batch():
# create distributed autograd context
with dist_autograd.context() as context_id:
hidden[0].detach_()
hidden[1].detach_()
output, hidden = model(data, hidden)
loss = criterion(output, target)
# run distributed backward pass
dist_autograd.backward(context_id, [loss])
# run distributed optimizer
opt.step(context_id)
# not necessary to zero grads since they are
# accumulated into the distributed autograd context
# which is reset every iteration.
print("Training epoch {}".format(epoch))
最后,让我们添加一些粘合代码来启动参数服务器和训练进程。
def run_worker(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 1:
rpc.init_rpc("trainer", rank=rank, world_size=world_size)
_run_trainer()
else:
rpc.init_rpc("ps", rank=rank, world_size=world_size)
# parameter server do nothing
pass
# block until all rpcs finish
rpc.shutdown()
if __name__=="__main__":
world_size = 2
mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)