使用异步执行实现批处理 RPC 处理
创建时间:2025 年 4 月 1 日 | 最后更新时间:2025 年 4 月 1 日 | 最后验证:未验证
作者:沈丽
备注
在 github 上查看和编辑此教程。
前提条件:
本教程演示了如何使用@rpc.functions.async_execution 装饰器构建批处理 RPC 应用程序,该装饰器通过减少阻塞的 RPC 线程数量并合并被调用方的 CUDA 操作来加速训练。这与 TorchServe 中的批推理具有相同的思想。
备注
本教程需要 PyTorch v1.6.0 或更高版本。
基础 §
前面的教程已经展示了使用 torch.distributed.rpc 构建分布式训练应用程序的步骤,但它们没有详细说明在处理 RPC 请求时调用方会发生什么。从 PyTorch v1.5 版本开始,每个 RPC 请求都会阻塞调用方的一个线程来执行该请求中的函数,直到该函数返回。这对于许多用例来说都是可行的,但有一个需要注意的地方。如果用户函数在 IO 上阻塞,例如,使用嵌套 RPC 调用或信号,例如等待另一个 RPC 请求解除阻塞,调用方的 RPC 线程将不得不空闲等待,直到 IO 完成或信号事件发生。因此,RPC 调用方可能会使用比必要的更多线程。这个问题产生的原因是 RPC 将用户函数视为黑盒,对函数内部发生的事情了解很少。为了允许用户函数让出并释放 RPC 线程,需要向 RPC 系统提供更多的提示。
自 v1.6.0 版本以来,PyTorch 通过引入两个新概念来解决此问题:
一种 torch.futures.Future 类型,它封装了异步执行,并支持安装回调函数。
一个允许应用程序通知调用方目标函数将返回一个未来,并且在执行过程中可以暂停和多次 yield 的@rpc.functions.async_execution 装饰器。
使用这两个工具,应用程序代码可以将用户函数拆分为多个更小的函数,将它们作为回调链在 Future
对象上串联起来,并返回包含最终结果的 Future
。在调用方一侧,当获取 Future
对象时,它将后续的 RPC 响应准备和通信作为回调安装,这些回调将在最终结果准备好时触发。这样,调用方就不再需要阻塞一个线程,等待最终返回值就绪。请参阅@rpc.functions.async_execution 的 API 文档中的简单示例。
除了减少调用方的空闲线程数量外,这些工具还有助于使批量 RPC 处理更加容易和快速。本教程的以下两节演示了如何使用@rpc.functions.async_execution 装饰器构建分布式批量更新参数服务器和批量处理强化学习应用程序。
批量更新参数服务器 ¶
考虑一个具有一个参数服务器(PS)和多个训练器的同步参数服务器训练应用。在这个应用中,PS 保存参数并等待所有训练器报告梯度。在每次迭代中,它等待接收所有训练器的梯度,然后一次性更新所有参数。下面的代码展示了 PS 类的实现。 update_and_fetch_model
方法使用 @rpc.functions.async_execution
装饰,将由训练器调用。每次调用都返回一个 Future
对象,该对象将被填充以更新模型。大多数训练器启动的调用只是将梯度累积到 .grad
字段,立即返回,并释放 PS 上的 RPC 线程。最后到达的训练器将触发优化器步骤并消耗所有之前报告的梯度。然后它使用更新后的模型设置 future_model
,这反过来又通过 Future
对象通知所有其他训练器的先前请求,并将更新后的模型发送给所有训练器。
import threading
import torchvision
import torch
import torch.distributed.rpc as rpc
from torch import optim
num_classes, batch_update_size = 30, 5
class BatchUpdateParameterServer(object):
def __init__(self, batch_update_size=batch_update_size):
self.model = torchvision.models.resnet50(num_classes=num_classes)
self.lock = threading.Lock()
self.future_model = torch.futures.Future()
self.batch_update_size = batch_update_size
self.curr_update_size = 0
self.optimizer = optim.SGD(self.model.parameters(), lr=0.001, momentum=0.9)
for p in self.model.parameters():
p.grad = torch.zeros_like(p)
def get_model(self):
return self.model
@staticmethod
@rpc.functions.async_execution
def update_and_fetch_model(ps_rref, grads):
# Using the RRef to retrieve the local PS instance
self = ps_rref.local_value()
with self.lock:
self.curr_update_size += 1
# accumulate gradients into .grad field
for p, g in zip(self.model.parameters(), grads):
p.grad += g
# Save the current future_model and return it to make sure the
# returned Future object holds the correct model even if another
# thread modifies future_model before this thread returns.
fut = self.future_model
if self.curr_update_size >= self.batch_update_size:
# update the model
for p in self.model.parameters():
p.grad /= self.batch_update_size
self.curr_update_size = 0
self.optimizer.step()
self.optimizer.zero_grad()
# by settiing the result on the Future object, all previous
# requests expecting this updated model will be notified and
# the their responses will be sent accordingly.
fut.set_result(self.model)
self.future_model = torch.futures.Future()
return fut
对于训练器来说,它们都使用 PS 中相同的参数集进行初始化。在每次迭代中,每个训练器首先运行前向和反向传播来生成局部梯度。然后,每个训练器通过 RPC 将梯度报告给 PS,并通过同一 RPC 请求的返回值获取更新的参数。在训练器的实现中,目标函数是否标记为 @rpc.functions.async_execution
没有区别。训练器只需调用 update_and_fetch_model
使用 rpc_sync
,这将阻塞训练器,直到返回更新的模型。
batch_size, image_w, image_h = 20, 64, 64
class Trainer(object):
def __init__(self, ps_rref):
self.ps_rref, self.loss_fn = ps_rref, torch.nn.MSELoss()
self.one_hot_indices = torch.LongTensor(batch_size) \
.random_(0, num_classes) \
.view(batch_size, 1)
def get_next_batch(self):
for _ in range(6):
inputs = torch.randn(batch_size, 3, image_w, image_h)
labels = torch.zeros(batch_size, num_classes) \
.scatter_(1, self.one_hot_indices, 1)
yield inputs.cuda(), labels.cuda()
def train(self):
name = rpc.get_worker_info().name
# get initial model parameters
m = self.ps_rref.rpc_sync().get_model().cuda()
# start training
for inputs, labels in self.get_next_batch():
self.loss_fn(m(inputs), labels).backward()
m = rpc.rpc_sync(
self.ps_rref.owner(),
BatchUpdateParameterServer.update_and_fetch_model,
args=(self.ps_rref, [p.grad for p in m.cpu().parameters()]),
).cuda()
在本教程中,我们跳过了启动多个进程的代码,请参阅示例仓库以获取完整实现。请注意,即使没有@rpc.functions.async_execution 装饰器,也可以实现批处理。但是,这需要要么在 PS 上阻塞更多的 RPC 线程,要么使用另一轮 RPC 来获取更新的模型,后者会增加更多的代码复杂性和通信开销。
本节使用一个简单的参数服务器训练示例来展示如何使用@rpc.functions.async_execution 装饰器实现批处理 RPC 应用。在下一节中,我们使用批处理重新实现上一节中分布式 RPC 框架入门教程中的强化学习示例,并展示其对训练速度的影响。
批处理 CartPole 求解器
本节使用 OpenAI Gym 的 CartPole-v1 作为示例,以展示批处理 RPC 的性能影响。请注意,由于目标是演示@rpc.functions.async_execution 的使用,而不是构建最佳的 CartPole 求解器或解决大多数不同的强化学习问题,我们使用了非常简单的策略和奖励计算策略,并专注于多观察者单代理批处理 RPC 的实现。我们使用与之前教程中类似的 Policy
模型,如下所示。与之前的教程相比,其区别在于其构造函数接受一个额外的 batch
参数,该参数控制 dim
参数的 F.softmax
,因为随着批处理, forward
函数中的 x
参数包含来自多个观察者的状态,因此维度需要相应地改变。其他一切保持不变。
import argparse
import torch.nn as nn
import torch.nn.functional as F
parser = argparse.ArgumentParser(description='PyTorch RPC Batch RL example')
parser.add_argument('--gamma', type=float, default=1.0, metavar='G',
help='discount factor (default: 1.0)')
parser.add_argument('--seed', type=int, default=543, metavar='N',
help='random seed (default: 543)')
parser.add_argument('--num-episode', type=int, default=10, metavar='E',
help='number of episodes (default: 10)')
args = parser.parse_args()
torch.manual_seed(args.seed)
class Policy(nn.Module):
def __init__(self, batch=True):
super(Policy, self).__init__()
self.affine1 = nn.Linear(4, 128)
self.dropout = nn.Dropout(p=0.6)
self.affine2 = nn.Linear(128, 2)
self.dim = 2 if batch else 1
def forward(self, x):
x = self.affine1(x)
x = self.dropout(x)
x = F.relu(x)
action_scores = self.affine2(x)
return F.softmax(action_scores, dim=self.dim)
Observer
的构造函数也相应调整。它也接受一个 batch
参数,该参数控制它使用哪个 Agent
函数来选择动作。在批处理模式下,它将在稍后介绍的 Agent
上调用 select_action_batch
函数,并将该函数装饰为@rpc.functions.async_execution。
import gym
import torch.distributed.rpc as rpc
class Observer:
def __init__(self, batch=True):
self.id = rpc.get_worker_info().id - 1
self.env = gym.make('CartPole-v1')
self.env.seed(args.seed)
self.select_action = Agent.select_action_batch if batch else Agent.select_action
与之前的教程《分布式 RPC 框架入门》相比,观察者的行为略有不同。当环境停止时,它不会退出,而是在每个回合中始终运行 n_steps
次迭代。当环境返回时,观察者简单地重置环境并重新开始。这种设计使得智能体将从每个观察者接收固定数量的状态,因此可以将它们打包成一个固定大小的张量。在每一步, Observer
使用 RPC 将其状态发送到 Agent
,并通过返回值获取动作。在每个回合结束时,它将所有步骤的奖励返回给 Agent
。请注意,这个 run_episode
函数将由 Agent
使用 RPC 调用。因此,在这个函数中的 rpc_sync
调用将是一个嵌套的 RPC 调用。我们可以将这个函数标记为 @rpc.functions.async_execution
,以避免在 Observer
上阻塞一个线程。然而,瓶颈是 Agent
而不是 Observer
,因此阻塞 Observer
进程上的一个线程应该是可以的。
import torch
class Observer:
...
def run_episode(self, agent_rref, n_steps):
state, ep_reward = self.env.reset(), NUM_STEPS
rewards = torch.zeros(n_steps)
start_step = 0
for step in range(n_steps):
state = torch.from_numpy(state).float().unsqueeze(0)
# send the state to the agent to get an action
action = rpc.rpc_sync(
agent_rref.owner(),
self.select_action,
args=(agent_rref, self.id, state)
)
# apply the action to the environment, and get the reward
state, reward, done, _ = self.env.step(action)
rewards[step] = reward
if done or step + 1 >= n_steps:
curr_rewards = rewards[start_step:(step + 1)]
R = 0
for i in range(curr_rewards.numel() -1, -1, -1):
R = curr_rewards[i] + args.gamma * R
curr_rewards[i] = R
state = self.env.reset()
if start_step == 0:
ep_reward = min(ep_reward, step - start_step + 1)
start_step = step + 1
return [rewards, ep_reward]
构造函数 Agent
也接受一个 batch
参数,用于控制动作概率的批处理方式。在批处理模式下, saved_log_probs
包含一个张量列表,其中每个张量包含一个步骤中所有观察者的动作概率。在不批处理的情况下, saved_log_probs
是一个字典,键是观察者 ID,值是该观察者的动作概率列表。
import threading
from torch.distributed.rpc import RRef
class Agent:
def __init__(self, world_size, batch=True):
self.ob_rrefs = []
self.agent_rref = RRef(self)
self.rewards = {}
self.policy = Policy(batch).cuda()
self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
self.running_reward = 0
for ob_rank in range(1, world_size):
ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
self.ob_rrefs.append(rpc.remote(ob_info, Observer, args=(batch,)))
self.rewards[ob_info.id] = []
self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
self.batch = batch
self.saved_log_probs = [] if batch else {k:[] for k in range(len(self.ob_rrefs))}
self.future_actions = torch.futures.Future()
self.lock = threading.Lock()
self.pending_states = len(self.ob_rrefs)
非批处理 select_acion
简单地将状态传递给策略,保存动作概率,并立即将动作返回给观察者。
from torch.distributions import Categorical
class Agent:
...
@staticmethod
def select_action(agent_rref, ob_id, state):
self = agent_rref.local_value()
probs = self.policy(state.cuda())
m = Categorical(probs)
action = m.sample()
self.saved_log_probs[ob_id].append(m.log_prob(action))
return action.item()
在批处理中,状态存储在一个 2D 张量 self.states
中,使用观察者 ID 作为行 ID。然后,通过安装回调函数到批生成的 self.future_actions
Future
对象上,该对象将使用观察者的 ID 索引特定的行。最后到达的观察者一次性将所有批处理状态通过策略,并相应地设置 self.future_actions
。当这种情况发生时,所有安装到 self.future_actions
上的回调函数将被触发,它们的返回值将用于填充链式 Future
对象,进而通知 Agent
准备和传达对其他观察者之前所有 RPC 请求的响应。
class Agent:
...
@staticmethod
@rpc.functions.async_execution
def select_action_batch(agent_rref, ob_id, state):
self = agent_rref.local_value()
self.states[ob_id].copy_(state)
future_action = self.future_actions.then(
lambda future_actions: future_actions.wait()[ob_id].item()
)
with self.lock:
self.pending_states -= 1
if self.pending_states == 0:
self.pending_states = len(self.ob_rrefs)
probs = self.policy(self.states.cuda())
m = Categorical(probs)
actions = m.sample()
self.saved_log_probs.append(m.log_prob(actions).t()[0])
future_actions = self.future_actions
self.future_actions = torch.futures.Future()
future_actions.set_result(actions.cpu())
return future_action
现在我们来定义如何将不同的 RPC 函数拼接在一起。 Agent
控制每个剧集的执行。它首先使用 rpc_async
在所有观察者上启动剧集,并阻塞在返回的未来中,这些未来将被填充上观察者奖励。注意,下面的代码使用 RRef 辅助函数 ob_rref.rpc_async()
在 ob_rref
RRef 的所有者上启动 run_episode
函数,并传入提供的参数。然后它将保存的动作概率和返回的观察者奖励转换为预期的数据格式,并启动训练步骤。最后,它重置所有状态并返回当前剧集的奖励。这个函数是运行一个剧集的入口点。
class Agent:
...
def run_episode(self, n_steps=0):
futs = []
for ob_rref in self.ob_rrefs:
# make async RPC to kick off an episode on all observers
futs.append(ob_rref.rpc_async().run_episode(self.agent_rref, n_steps))
# wait until all obervers have finished this episode
rets = torch.futures.wait_all(futs)
rewards = torch.stack([ret[0] for ret in rets]).cuda().t()
ep_rewards = sum([ret[1] for ret in rets]) / len(rets)
# stack saved probs into one tensor
if self.batch:
probs = torch.stack(self.saved_log_probs)
else:
probs = [torch.stack(self.saved_log_probs[i]) for i in range(len(rets))]
probs = torch.stack(probs)
policy_loss = -probs * rewards / len(rets)
policy_loss.sum().backward()
self.optimizer.step()
self.optimizer.zero_grad()
# reset variables
self.saved_log_probs = [] if self.batch else {k:[] for k in range(len(self.ob_rrefs))}
self.states = torch.zeros(len(self.ob_rrefs), 1, 4)
# calculate running rewards
self.running_reward = 0.5 * ep_rewards + 0.5 * self.running_reward
return ep_rewards, self.running_reward
代码的其余部分是正常的过程,包括启动和记录,这与其他 RPC 教程类似。在本教程中,所有观察者都被动地等待来自代理的命令。请参阅示例仓库以获取完整实现。
def run_worker(rank, world_size, n_episode, batch, print_log=True):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 0:
# rank0 is the agent
rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)
agent = Agent(world_size, batch)
for i_episode in range(n_episode):
last_reward, running_reward = agent.run_episode(n_steps=NUM_STEPS)
if print_log:
print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
i_episode, last_reward, running_reward))
else:
# other ranks are the observer
rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
# observers passively waiting for instructions from agents
rpc.shutdown()
def main():
for world_size in range(2, 12):
delays = []
for batch in [True, False]:
tik = time.time()
mp.spawn(
run_worker,
args=(world_size, args.num_episode, batch),
nprocs=world_size,
join=True
)
tok = time.time()
delays.append(tok - tik)
print(f"{world_size}, {delays[0]}, {delays[1]}")
if __name__ == '__main__':
main()
批量 RPC 有助于将动作推理合并为更少的 CUDA 操作,从而减少了分摊开销。上面的 main
函数在批量和无批量模式下使用不同的观察者数量运行相同的代码,范围从 1 到 10。下面的图显示了使用默认参数值的不同世界大小的执行时间。结果证实了我们的预期,批量处理有助于加速训练。
