Shortcuts

使用TensorPipe CUDA RPC进行设备到设备通信

Note

直接设备到设备RPC(CUDA RPC)在PyTorch 1.8中作为原型功能引入。此API可能会发生变化。

在本教程中,您将学习:

  • CUDA RPC的高级概念。

  • 如何使用CUDA RPC。

要求

什么是CUDA RPC?

CUDA RPC支持直接从本地CUDA内存向远程CUDA内存发送张量。在1.8版本发布之前,PyTorch RPC只接受CPU张量。 因此,当应用程序需要通过RPC发送CUDA张量时,它必须首先将张量移动到调用方的CPU,通过RPC发送, 然后在被调用方将其移动到目标设备,这会导致不必要的同步和D2H和H2D复制。从1.8版本开始,RPC允许用户使用 set_device_map API配置每个进程的全局设备映射,指定如何将本地设备映射到远程设备。具体来说,如果``worker0``的设备映射有一个条目 "worker1" : {"cuda:0" : "cuda:1"},所有来自 worker0"cuda:0" 上的RPC参数 将直接发送到 worker1``的”cuda:1”`` 。RPC的响应将使用调用方设备映射的逆映射,即如果 worker1 返回 "cuda:1" 上的张量,它将直接发送到 worker0"cuda:0" 。 所有预期的设备到设备直接通信必须在每个进程的设备映射中指定。否则,只允许CPU张量。

在底层,PyTorch RPC依赖于 TensorPipe 作为通信后端。 PyTorch RPC从每个请求或响应中提取所有张量到一个列表中,并将其他所有内容打包成二进制负载。 然后,TensorPipe将根据张量设备类型和调用方和被调用方的通道可用性,自动为每个张量选择通信通道。 现有的 TensorPipe 通道涵盖 NVLink、InfiniBand、SHM、CMA、TCP 等。

如何使用CUDA RPC?

以下代码展示了如何使用CUDA RPC。该模型包含两个线性层,被分成两个分片。这两个分片分别放置在 worker0worker1 上, worker0 作为主节点驱动前向和后向传播。请注意,我们有意跳过了 DistributedOptimizer 以突出使用 CUDA RPC 时的性能改进。实验重复前向和后向传播 10 次,并测量总执行时间。它比较了使用 CUDA RPC 与手动暂存到 CPU 内存 并使用 CPU RPC 的情况。

import torch
import torch.distributed.autograd as autograd
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp
import torch.nn as nn

import os
import time


class MyModule(nn.Module):
    def __init__(self, device, comm_mode):
        super().__init__()
        self.device = device
        self.linear = nn.Linear(1000, 1000).to(device)
        self.comm_mode = comm_mode

    def forward(self, x):
        # 如果x已经在self.device上,x.to()是一个空操作
        y = self.linear(x.to(self.device))
        return y.cpu() if self.comm_mode == "cpu" else y

    def parameter_rrefs(self):
        return [rpc.RRef(p) for p in self.parameters()]


def measure(comm_mode):
    # "worker0/cuda:0"上的本地模块
    lm = MyModule("cuda:0", comm_mode)
    # "worker1/cuda:1"上的远程模块
    rm = rpc.remote("worker1", MyModule, args=("cuda:1", comm_mode))
    # 准备随机输入
    x = torch.randn(1000, 1000).cuda(0)

    tik = time.time()
    for _ in range(10):
        with autograd.context() as ctx:
            y = rm.rpc_sync().forward(lm(x))
            autograd.backward(ctx, [y.sum()])
    # 在"cuda:0"上同步,以确保所有待处理的CUDA操作都包含在测量中
    torch.cuda.current_stream("cuda:0").synchronize()
    tok = time.time()
    print(f"{comm_mode} RPC总执行时间:{tok - tik}")


def run_worker(rank):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=128)

    if rank == 0:
        options.set_device_map("worker1", {0: 1})
        rpc.init_rpc(
            f"worker{rank}",
            rank=rank,
            world_size=2,
            rpc_backend_options=options
        )
        measure(comm_mode="cpu")
        measure(comm_mode="cuda")
    else:
        rpc.init_rpc(
            f"worker{rank}",
            rank=rank,
            world_size=2,
            rpc_backend_options=options
        )

    # 阻塞直到所有rpc完成
    rpc.shutdown()


if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, nprocs=world_size, join=True)

输出显示如下,表明在这个实验中,CUDA RPC 可以帮助实现 34 倍的速度提升,相比于 CPU RPC。

cpu RPC总执行时间:2.3145179748535156秒
cuda RPC总执行时间:0.06867480278015137秒

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources