• 教程 >
  • PyTorch 菜谱 >
  • 使用 TensorPipe CUDA RPC 进行直接设备到设备通信
快捷键

使用 TensorPipe CUDA RPC 进行设备到设备直接通信

创建于:2025 年 4 月 1 日 | 最后更新:2025 年 4 月 1 日 | 最后验证:2024 年 11 月 5 日

备注

PyTorch 1.8 中引入了直接设备到设备 RPC(CUDA RPC)作为原型功能。此 API 可能会更改。

在本菜谱中,您将学习:

  • CUDA RPC 的高级概念

  • 如何使用 CUDA RPC

需求

什么是 CUDA RPC?

CUDA RPC 支持直接将本地 CUDA 内存中的 Tensors 发送到远程 CUDA 内存。在 v1.8 版本发布之前,PyTorch RPC 只接受 CPU Tensors。因此,当应用程序需要通过 RPC 发送 CUDA Tensor 时,必须在调用方先将 Tensor 移动到 CPU,通过 RPC 发送,然后再将 Tensor 移动到被调用方的目标设备,这会带来不必要的同步以及 D2H 和 H2D 的复制。自 v1.8 版本以来,RPC 允许用户使用 set_device_map API 配置每个进程的全局设备映射,指定如何将本地设备映射到远程设备。更具体地说,如果 worker0 的设备映射有 "worker1" : {"cuda:0" : "cuda:1"} 条目,那么来自 "cuda:0"worker0 上的所有 RPC 参数将直接发送到 "cuda:1" 上的 worker1 。RPC 的响应将使用调用方设备映射的逆映射,即如果 worker1"cuda:1" 返回一个 Tensor,它将被直接发送到 "cuda:0" 上的 worker0 。所有预期的设备到设备的直接通信都必须在进程的设备映射中指定。否则,只允许使用 CPU 张量。

在内部,PyTorch RPC 依赖于 TensorPipe 作为通信后端。PyTorch RPC 将每个请求或响应中的所有张量提取到一个列表中,并将其他所有内容打包成一个二进制有效载荷。然后,TensorPipe 将根据张量设备类型和调用方与被调用方上的通道可用性自动为每个张量选择一个通信通道。现有的 TensorPipe 通道包括 NVLink、InfiniBand、SHM、CMA、TCP 等。

如何使用 CUDA RPC?

下面的代码展示了如何使用 CUDA RPC。该模型包含两个线性层,并分为两个分片。两个分片分别放置在 worker0worker1worker0 作为主节点驱动正向和反向传播。请注意,我们有意跳过了 DistributedOptimizer,以突出使用 CUDA RPC 时的性能提升。实验重复执行了 10 次正向和反向传播,并测量了总执行时间。它比较了使用 CUDA RPC 与手动将数据阶段到 CPU 内存和使用 CPU RPC。

import torch
import torch.distributed.autograd as autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn

import os
import time


class MyModule(nn.Module):
    def __init__(self, device, comm_mode):
        super().__init__()
        self.device = device
        self.linear = nn.Linear(1000, 1000).to(device)
        self.comm_mode = comm_mode

    def forward(self, x):
        # x.to() is a no-op if x is already on self.device
        y = self.linear(x.to(self.device))
        return y.cpu() if self.comm_mode == "cpu" else y

    def parameter_rrefs(self):
        return [rpc.RRef(p) for p in self.parameters()]


def measure(comm_mode):
    # local module on "worker0/cuda:0"
    lm = MyModule("cuda:0", comm_mode)
    # remote module on "worker1/cuda:1"
    rm = rpc.remote("worker1", MyModule, args=("cuda:1", comm_mode))
    # prepare random inputs
    x = torch.randn(1000, 1000).cuda(0)

    tik = time.time()
    for _ in range(10):
        with autograd.context() as ctx:
            y = rm.rpc_sync().forward(lm(x))
            autograd.backward(ctx, [y.sum()])
    # synchronize on "cuda:0" to make sure that all pending CUDA ops are
    # included in the measurements
    torch.cuda.current_stream("cuda:0").synchronize()
    tok = time.time()
    print(f"{comm_mode} RPC total execution time: {tok - tik}")


def run_worker(rank):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=128)

    if rank == 0:
        options.set_device_map("worker1", {0: 1})
        rpc.init_rpc(
            f"worker{rank}",
            rank=rank,
            world_size=2,
            rpc_backend_options=options
        )
        measure(comm_mode="cpu")
        measure(comm_mode="cuda")
    else:
        rpc.init_rpc(
            f"worker{rank}",
            rank=rank,
            world_size=2,
            rpc_backend_options=options
        )

    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, nprocs=world_size, join=True)

下面的输出显示了实验结果,表明与 CPU RPC 相比,CUDA RPC 在此实验中可以实现 34 倍的速度提升。

cpu RPC total execution time: 2.3145179748535156 Seconds
cuda RPC total execution time: 0.06867480278015137 Seconds

评分这个教程

© 版权所有 2024,PyTorch。

使用 Sphinx 构建,主题由 Read the Docs 提供。
//暂时添加调查链接

文档

访问 PyTorch 的全面开发者文档

查看文档

教程

获取初学者和高级开发者的深入教程

查看教程

资源

查找开发资源并获得您的疑问解答

查看资源