爱笑的小姐姐 · 2024年12月31日

CUDA-MODE 课程笔记 第17课 GPU 集合通信(NCCL)

我的课程笔记,欢迎关注:https://github.com/BBuf/how-to-optim-algorithm-in-cuda/tree/master/cuda-mode

这节课介绍了 NVIDIA 的 NCCL(NVIDIA Collective Communications Library)通信库,重点讲解了其在分布式深度学习中的应用。首先通过 PyTorch DDP 的实例,展示了 NCCL 如何实现高效的梯度同步。接着介绍了下 NCCL 的基本概念、API 使用、通信器初始化方式,并深入分析了 Ring AllReduce 算法的工作原理。

第 17 课,GPU 集合通信(NCCL)

课程笔记

image.png

Image

这张 Slides 介绍了 NVIDIA 的 NCCL (NVIDIA Collective Communications Library) 通信库,它是一个专门用于 GPU 之间快速数据通信的库,支持点对点和集体通信两种模式,提供了包括 Scatter、Gather、All-to-all、AllReduce、Broadcast、Reduce、AllGather 和 ReduceScatter 等多种通信原语,Slides 下方的图展示了 AllGather 操作的工作流程,然后在上方展示了一下 Broadcast 和 Scatter 的示意图。

Image

这张 Slides 简单展示了一下 nccl AllReduce(Reduce Sum)的操作。图片分为"Before"和"After"两个部分,显示了在 3 个 GPU(GPU 0、GPU 1 和 GPU 2)上的数据处理过程。在初始状态下,每个 GPU 都包含 3 个不同的数据块(GPU 0 有 A、B、C;GPU 1 有 D、E、F;GPU 2 有 G、H、I)。经过 AllReduce 操作后,每个 GPU 都得到了相同位置数据的总和(即 A+D+G、B+E+H、C+F+I),这样三个 GPU 最终都具有相同的计算结果。

Image

这张 Slides 讲了一下 DDP 里面需要 nccl 的地方,也就是同步全局梯度的时候。具体来说,在这个例子中,数据被分成两部分(x₀ 和 x₁)分别在两个 GPU 上处理。每个 GPU 运行相同的模型,计算各自的局部梯度(Local Gradients),然后通过 NCCL 的 AllReduce 操作来同步和平均所有 GPU 上的梯度。最后,每个 GPU 使用这个平均梯度来更新自己的模型参数,确保所有 GPU 上的模型保持同步。

Image

这张 Slides 更具体了一些,用一个 y = w 7 x 的例子,展示了 DDP 里面同步梯度的时候,如何使用 NCCL 的 AllReduce 操作来同步和平均所有 GPU 上的梯度。这个例子作者也提供了一个代码,代码如下:

# modified from https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

import torch  
import torch.distributed as dist  
import torch.nn as nn  
from torch.profiler import profile

from torch.nn.parallel import DistributedDataParallel as DDP

#  定义一个简单的玩具模型类  
class ToyModel(nn.Module):  
    def **init**(self):  
        super(ToyModel, self).**init**()  
        #  定义一个可训练参数 w,初始值为 5.0  
        self.w = nn.Parameter(torch.tensor(5.0))

def forward(self, x):  
        #  前向传播: y = w * 7 * x  
        return self.w * 7.0 * x

def demo_basic():  
    #  初始化进程组,使用 NCCL 后端  
    dist.init_process_group("nccl")  
    #  获取当前进程的 rank  
    rank = dist.get_rank()  
    print(f"Start running basic DDP example on rank {rank}.")

#  创建模型实例并移到对应 GPU  
    model = ToyModel().to(rank)  
    #  用 DDP 包装模型  
    ddp_model = DDP(model, device_ids=[rank])

#  使用 PyTorch profiler 收集性能数据  
    with profile() as prof:  
        #  创建输入张量,值为当前进程的 rank  
        x = torch.tensor(dist.get_rank(), dtype=torch.float)  
        #  前向传播  
        y = ddp_model(x)  
        #  打印计算结果  
        print(f"rank {rank}: y=w*7*x: {y.item()}={ddp_model.module.w.item()}_7_{x.item()}")  
        #  打印关于 w 的导数  
        print(f"rank {rank}: dy/dw=7*x: {7.0*x.item()}")  
        #  反向传播  
        y.backward()  
        #  打印经过 AllReduce 后的梯度  
        print(f"rank {rank}: reduced dy/dw: {ddp_model.module.w.grad.item()}")  
    # rank 0 负责导出性能跟踪文件  
    if rank == 0:  
        print("exporting trace")  
        prof.export_chrome_trace("trace_ddp_simple.json")  
    #  清理进程组  
    dist.destroy_process_group()

if **name** == "**main**":  
    print("Running")  
    demo_basic()

# torchrun --nnodes=1 --nproc_per_node=2 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=localhost:29400 ddp_simple.py  

接着作者给出了一个稍微完善一些的例子,由 Linear 和 ReLU 组成的网络,有 optimizer 更新参数的过程,代码如下:

# modified from https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

import torch  
import torch.distributed as dist  
import torch.nn as nn

from torch.nn.parallel import DistributedDataParallel as DDP  
from torch.profiler import profile  
import torch.optim as optim

SIZE = 4000

class ToyModel(nn.Module):  
    def **init**(self):  
        super(ToyModel, self).**init**()  
        self.net1 = nn.Linear(SIZE, SIZE)  
        self.relu = nn.ReLU()  
        self.net2 = nn.Linear(SIZE, SIZE)  
        self.net3 = nn.Linear(SIZE, SIZE)

def forward(self, x):  
        return self.net3(self.relu(self.net2(self.relu(self.net1(x)))))

def demo_basic():  
    dist.init_process_group("nccl")  
    rank = dist.get_rank()  
    print(f"Start running basic DDP example on rank {rank}.")

model = ToyModel().to(rank)  
    ddp_model = DDP(model, bucket_cap_mb=25, device_ids=[rank])

loss_fn = nn.MSELoss()  
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

with profile(  
        record_shapes=True,  
        activities=[
            torch.profiler.ProfilerActivity.CPU,
            torch.profiler.ProfilerActivity.CUDA,
        ],  
    ) as prof:  
        for i in range(10):  
            optimizer.zero_grad()  
            outputs = ddp_model(torch.randn(1000, SIZE, device=rank))  
            labels = torch.randn(1000, SIZE, device=rank)  
            loss_fn(outputs, labels).backward()  
            optimizer.step()  
    if rank == 0:  
        prof.export_chrome_trace("trace_ddp_example.json")

if **name** == "**main**":  
    demo_basic()

# torchrun --nnodes=1 --nproc_per_node=2 --rdzv_id=100 --rdzv_backend=c10d --rdzv_endpoint=localhost:29400 ddp_example.py  

作者分析了几分钟这个代码中一个 iter 的 pytorch profiler 结果,我们可以看到前向 Pass,反向 Pass,优化器更新参数,以及 AllReduce 的通信时间以及部分 AllReduce 被重叠到了反向计算中。这就引入到了下一张 slides。

Image

这里作者讲了一下 DDP 里面的 AllReduce 是怎么和 Backward Pass 重叠的,这个建议阅读这篇博客:https://zhuanlan.zhihu.com/p/... ,从这张 Slides 的 PyTorch Profiler 图我们也可以发现一些其它信息,例如在同一个 Stream 上的 kernel 是顺序执行,所以为了重叠计算和通信这里使用了两个 Stream。由于网络最开始的几个层必须等待梯度计算完毕才能开始 AllReduce,所以存在无法重叠的层。

Image

这张 Slides 提了一下 yTorch DDP 的内部机制,包括:

  • DDP 的梯度同步机制:
  • 使用 autograd hooks 在构建时注册,用于触发梯度同步
  • Reducer 组件会异步执行 allreduce 操作来计算所有进程间的梯度平均值
  • 计算完成后,平均后的梯度会被写入所有参数的 param.grad 字段
  • 在反向传播完成后,不同 DDP 进程中相同参数的梯度值应该是一致的
  • 通信后端支持:
  • NCCL
  • MPI
  • Gloo
  • DDP 支持多种通信后端,包括:
  • 具体实现:
  • NCCL API 的调用是在 PyTorch 的 ProcessGroupNCCL.cpp 文件中通过 Reducer 完成的

Image

这张 Slides 开始介绍 NCCL 库中的 nccl AllReduce API 函数。该函数用于对长度为 count 的数据数组进行规约(reduce)操作,使用指定的 op 操作符进行计算,并将相同的结果复制到每个 recvbuff 中。当 sendbuff 和 recvbuff 指向相同位置时,会执行原地操作。这是一个在分布式深度学习中常用的集合通信操作,用于在多个 GPU 之间同步和聚合数据。

Image

这张 Slides 介绍了 NCCL 通信器对象的两种使用场景:一种是每个 CPU 进程对应一个 GPU 的情况,此时 root 进程会生成唯一 ID 并广播给所有进程,所有进程用相同的 ID 和唯一的 rank 初始化通信器例如 MPI;另一种是单个 CPU 进程管理多个 GPU 的情况,这时不需要广播 ID,而是通过循环来初始化每个 rank,并可以使用封装好的 ncclCommInitAll 函数来简化这个过程。Slides 右侧的代码示例展示了这些初始化操作的具体实现方式。

Image

这张 Slides 展示了错误处理宏定义

#define CUDACHECK(cmd) {                      
    cudaError_t err = cmd;                    
    if (err != cudaSuccess) {                
        printf("Failed: Cuda error %s:%d\n",  
            **FILE**,**LINE**,cudaGetErrorString(err));  
        exit(EXIT_FAILURE);                 
    }  
}

#define NCCLCHECK(cmd) {                      
    ncclResult_t res = cmd;                 
    if (res != ncclSuccess) {               
        printf("Failed: NCCL error %s:%d\n",  
            **FILE**,**LINE**,ncclGetErrorString(res));  
        exit(EXIT_FAILURE);                 
    }  
}  

这部分定义了两个错误处理宏:

  • CUDACHECK: 用于检查 CUDA API 调用的错误
  • NCCLCHECK: 用于检查 NCCL 操作的错误

Image

int main(int argc, char * argv[]) {  
    ncclComm_t comms[4];

  
    //管理 4 个设备  
    int nDev = 4;  
    int size = 32*1024*1024;  
    int devs[4] = { 0, 1, 2, 3 };

  
    //分配和初始化设备缓冲区  
    float** sendbuff = (float**)malloc(nDev * sizeof(float*));  
    float** recvbuff = (float**)malloc(nDev * sizeof(float*));  
    cudaStream_t* s = (cudaStream_t*)malloc(sizeof(cudaStream_t)*nDev);  

这里的代码创建了 NCCL 通信器数组,设置 4 个 GPU 设备,定义数据大小(32MB),分配发送和接收缓冲区的内存并为每个设备创建 CUDA 流。然后还有下面的循环

for (int i = 0; i < nDev; ++i) {  
    CUDACHECK(cudaSetDevice(i));  
    CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(float)));  
    CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(float)));  
    CUDACHECK(cudaMemset(sendbuff[i], 1, size * sizeof(float)));  
    CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(float)));  
    CUDACHECK(cudaStreamCreate(s+i));  
}  

这个循环给每个 GPU 设置当前设备,然后分配发送和接收缓冲区的 GPU 内存,初始化发送缓冲区为 1,接收缓冲区为 0,最后为每个设备创建 CUDA 流。

Image

//初始化 NCCL  
NCCLCHECK(ncclCommInitAll(comms, nDev, devs));

//调用 NCCL 通信 API  
NCCLCHECK(ncclGroupStart());  
for (int i = 0; i < nDev; ++i)  
    NCCLCHECK(ncclAllReduce((const void*)sendbuff[i], (void*)recvbuff[i], size, ncclFloat, ncclSum,  
        comms[i], s[i]));  
NCCLCHECK(ncclGroupEnd());

//同步 CUDA 流等待 NCCL 操作完成  
for (int i = 0; i < nDev; ++i) {  
    CUDACHECK(cudaSetDevice(i));  
    CUDACHECK(cudaStreamSynchronize(s[i]));  
}  

这部分代码展示了初始化 NCCL 通信器,执行 AllReduce 操作(将所有设备的数据求和并分发给所有设备),最后同步所有 CUDA 流确保操作完成。

Image

//释放设备缓冲区  
for (int i = 0; i < nDev; ++i) {  
    CUDACHECK(cudaSetDevice(i));  
    CUDACHECK(cudaFree(sendbuff[i]));  
    CUDACHECK(cudaFree(recvbuff[i]));  
}

//终止 NCCL  
for(int i = 0; i < nDev; ++i)  
    ncclCommDestroy(comms[i]);  

最后进行资源清理包括释放 GPU 上分配的内存,销毁 NCCL 通信器。

上面 4 张 slides 放在一起展示了一个如何在单个进程中使用 NCCL 进行 AllReduce 操作。

Image

这张 Slides 展示了"每个 CPU 进程一个 GPU"的场景下的实现。代码有以下步骤:

  • 获取 NCCL 唯一 ID 并在所有进程间广播
  • 基于本地 rank 选择 GPU 并分配设备缓冲区
  • 初始化 NCCL 通信器
  • 使用 NCCL 执行 AllReduce 集合通信操作(从代码可以看到是每个 rank 都发起了这个操作)
  • 同步 CUDA 流来完成 NCCL 操作

实际上这个例子对应的就是 PyTorch Distributed Data Parallel 里面的 AllReduce 操作,而上面的 Single Process 的例子对应的就是 PyTorch Data Parallel 里面的 AllReduce 操作。

Image

这里展示了一下环状的 AllReduce 算法的原理,它由两个操作组成:

  • ReduceScatter 操作: 输入数据分布在不同的 rank (进程/节点) 上 (rank 0 到 rank 3);每个 rank 负责对一部分数据进行规约(reduction)操作;规约结果被分散到不同的 rank 上;图中显示 out[i] = sum(in[j]^count+i))
  • AllGather 操作: 在 ReduceScatter 之后执行;每个 rank 将自己的部分结果广播给其他所有 rank;最终每个 rank 都获得完整的规约结果;图中显示 out[Ycount+i] = in[Y][i]

Image

这张 Slides 截了一下 Ring Allreduce 的 cuda 代码实现,可以粗略的浏览一下代码:

// Ring AllReduce 算法实现  (结合了 ReduceScatter 和 AllGather 操作)  
template<typename T, typename RedOp, typename Proto>  
**device** **forceinline** void run(ncclWorkElem *args) {  
    const int tid = threadIdx.x;      //  获取当前线程 ID  
    const int nthreads = args->nWarps*WARP_SIZE;  //  计算总线程数  
    const int bid = args->bid;        //  获取块 ID  
    const int nChannels = args->nChannels;  //  获取通道数  
    ncclRing *ring = &ncclShmem.channel.ring;  //  获取环形通信结构的指针  
    int ringIx = ring->index;         //  获取环形索引

  
    //  计算每步处理的数据块大小  
    const size_t chunkSize = int(Proto::calcBytePerStep()/sizeof(T)) * (Proto::Id == NCCL_PROTO_SIMPLE ? ALLREDUCE_CHUNKSTEPS : 1));  
    const int nranks = ncclShmem.comm.nRanks;  //  获取总进程数  
    const size_t loopSize = nChannels*nranks*chunkSize;  //  计算循环大小  
    const size_t size = args->count;  //  获取需要处理的总数据量

int minChunkSize;  //  最小数据块大小  
    if (Proto::Id == NCCL_PROTO_LL) {  
        // LL 协议下计算最小数据块大小  
        minChunkSize = nthreads*(Proto::calcBytePerGrain()/sizeof(T));  
    }  
    if (Proto::Id == NCCL_PROTO_LL128) {  
        // LL128 协议下的特殊处理  
        //  注释说明这里的除 2 可能是个 bug,但能提高性能  
        minChunkSize = nthreads*(Proto::calcBytePerGrain()/sizeof(T))/2;  
    }

//  使用 Primitives 模板类处理规约操作  
    Primitives<T, RedOp, FanSymmetric<1>, Proto, 0> prims  
        (tid, nthreads, &ring->prev, &ring->next, args->sendbuff, args->recvbuff, args->redOpArg);  
}  

Image

// Ring AllReduce 实现  (ReduceScatter + AllGather)  
for (size_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {  
    size_t realChunkSize;

  
    //  处理 NCCL 协议简单模式  
    if (Proto::id == NCCL_PROTO_SIMPLE) {  
        //  计算实际的 chunk 大小,考虑网格偏移和通道数  
        realChunkSize = min(chunkSize, divide(size-gridOffset, nChannels*nranks));  
        //  根据线程数和数据类型大小调整 chunk 大小  
        realChunkSize = roundUp(realChunkSize, (nthreads*WARP_SIZE)*sizeof(uint64_t)/sizeof(T));  
    } else {  
        //  非简单模式下的 chunk 大小计算  
        realChunkSize = min(chunkSize, divide(size-gridOffset, nChannels*nranks*minChunkSize));  
        realChunkSize = int(realChunkSize);  
    }

//  计算每个 chunk 的偏移量  
    auto calcOffset = [&]**device**(int chunk)->size_t {  
        if (Proto::id == NCCL_PROTO_SIMPLE)  
            return gridOffset + bid*nranks*realChunkSize + chunk*realChunkSize;  
        else  
            return gridOffset + (chunk*nChannels + bid)*realChunkSize;  
    };

//  计算每个 rank 的修改位置  
    auto modRanks = [&]**device**(int r)->int {  
        return r >= nranks ? r-nranks : r;  
    };

//  声明变量  
    size_t offset;  
    int nelem;  
    int chunk;

// step 0:  将数据推送到下一个 GPU  
    chunk = modRanks(ringIx + nranks-1);  //  计算 chunk 索引  
    offset = calcOffset(chunk);           //  计算偏移量  
    nelem = min(realChunkSize, size-offset); //  计算元素数量  
    prims.send(offset, nelem);           //  发送数据  
}

Image

Image

Image

这几张 Slides 展示了 Ring AllReduce(环形全规约)算法的工作原理,它是通过组合 ReduceScatter 和 AllGather 两个操作来实现的。第一张 Slides 的图展示了初始状态:

  • 有 3 个 GPU (GPU 0, 1, 2)
  • 每个 GPU 上有 3 个数据块(A/B/C, D/E/F, G/H/I)

第二张 Slides 的图展示了数据传输的模式:

  • 数据以环形方式在 GPU 之间传递
  • GPU 0 向 GPU 1 传输
  • GPU 1 向 GPU 2 传输
  • GPU 2 回传到 GPU 0,形成一个环
// k-2 步:  执行规约操作并将结果复制到下一个 GPU  
for (int j=2; j<nranks; ++j) {  
    //  计算当前需要处理的数据块索引  
    // ringIx 是当前 GPU 的索引,通过模运算确保索引在有效范围内  
    chunk = modRanks(ringIx + nranks-j);

  
    //  根据 chunk 计算在缓冲区中的偏移量  
    offset = calcOffset(chunk);

  
    //  计算本次需要传输的实际元素数量  
    //  取实际块大小和剩余大小中的较小值,避免越界  
    nelem = min(realChunkSize, size-offset);

  
    //  执行接收-规约-发送操作  
    //  从上一个 GPU 接收数据,与本地数据进行规约,然后发送给下一个 GPU  
    prims.recvReduceSend(offset, nelem);  
}  

Image

Image

Image

这里展示了 Ring AllReduce 第 k-1 步做的事:

// step k-1:  在当前 GPU 上规约缓冲区和数据  
//  规约结果将存储在当前数据中并传送到下一个 GPU

//  计算当前要处理的数据块索引  
// ringIx  是环形通信中的索引位置  
chunk = ringIx + 0;

//  根据 chunk 计算在内存中的偏移量  
//  用于确定数据在缓冲区中的具体位置  
offset = calcOffset(chunk);

//  计算本次需要处理的实际元素数量  
// realChunkSize:  标准块大小  
// size-offset:  剩余可处理的元素数量  
//  取两者的最小值以防止越界  
nelem = min(realChunkSize, size-offset);

//  执行接收-规约-复制-发送操作  
// offset:  源数据偏移量  
// offset:  目标数据偏移量  
// nelem:  要处理的元素数量  
// true: postOp 参数,表示是否执行后续操作  
prims.directRecvReduceCopySend(offset, offset, nelem, /_postOp=_/true);  

上面的过程实际上就对应了 ReduceScatter 操作。

Image

Image

Image

Image

Image

Image

这几张图涉及到的就是 AllGather 操作,只有数据复制,没有数据的 Reduce 操作。操作完成之后我们可以看到所有的 rank 上的数据都拥有一样的求和值。

Image

这里提一些有趣的知识

  • 除了 Ring Allreduce 之外还有其它的 AllReduce 算法,如 Tree AllReduce(树形归约)算法。可以参考https://developer.nvidia.com/...
  • 其他集体通信操作(Other Collectives)
  • 网络拓扑相关技术,包括 NVLink、Infiniband/RoCE(提供了 NVIDIA 官方白皮书链接)以及 IP 网络
  • 集体操作原语(Collective Operation Primitives)

Image

最后这张 Slides 介绍了 CUDA 中其它的集体操作原语(Collective Operations Prims),主要说明了 prims.send、prims.recvReduceSend 等函数是如何在 GPU 之间进行集体操作数据传输的。这些原语实现了三种不同的协议:Simple(简单协议)、LL(低延迟协议,8 字节原子存储,4 字节数据和 4 字节标志)以及 LL128(低延迟 128 位协议,128 字节原子存储,120 字节数据和 8 字节标志)。另外,AllReduce 操作通过组合 3 种算法和 3 种协议,总共可以有 9 种不同的运行方式,这些原语为 GPU 集群中的并行计算和数据通信提供了灵活的性能选择。

总结

这节课介绍了 NVIDIA 的 NCCL(NVIDIA Collective Communications Library)通信库,重点讲解了其在分布式深度学习中的应用。首先通过 PyTorch DDP 的实例,展示了 NCCL 如何实现高效的梯度同步。接着介绍了下 NCCL 的基本概念、API 使用、通信器初始化方式,并深入分析了 Ring AllReduce 算法的工作原理。

END

作者:BBuf
来源:GiantPandaCV

推荐阅读

欢迎大家点赞留言,更多 Arm 技术文章动态请关注极术社区嵌入式 AI 专栏欢迎添加极术小姐姐微信(id:aijishu20)加入技术交流群,请备注研究方向。

推荐阅读
关注数
18932
内容数
1437
嵌入式端AI,包括AI算法在推理框架Tengine,MNN,NCNN,PaddlePaddle及相关芯片上的实现。欢迎加入微信交流群,微信号:aijishu20(备注:嵌入式)
目录
极术微信服务号
关注极术微信号
实时接收点赞提醒和评论通知
安谋科技学堂公众号
关注安谋科技学堂
实时获取安谋科技及 Arm 教学资源
安谋科技招聘公众号
关注安谋科技招聘
实时获取安谋科技中国职位信息