Ringbuffer(循环缓存)是软件中非常常用的数据结构之一, 在互联网应用、数据库应用等中使用广泛。处理器执行Ringbuffer的效率与其存储系统处理共享数据的性能息息相关。
本文旨在介绍Ringbuffer的性能优化方法。本文介绍基于mutex的Ringbuffer实现、阻塞的无锁实现以及非阻塞的无锁实现,并且比较和分析这些不同的实现在ARM服务器平台上的性能。
声明
本文介绍的针对Ringbuffer实现的优化方法来自于Ola Liljedahl的研究。Ola现任ARM公司的架构师,长期专注于网络软件的性能优化设计,尤其是可扩展的多线程共享存储编程。针对ARM A系列处理的多线程性能,他开发了一个可扩展的多线程并行库progress64。
本文分析了Ola提出的主要优化手段,并且在ARM服务器上重现了Ola的优化以验证其实际效果。博客中使用的代码只是示例代码(https://github.com/wangeddie67/ringbuffer_opt_demo),其功能完备性和鲁棒性都不能满足实际应用的需求。
介绍
Ringbuffer通常由一个包含数据的数组和指向队列头(Head)和队尾(Tail)的指针构成(如图1左边所示)。指针只能按照一个方向移动。当指针移动到数组结尾时,指针会折回到队列开始。
Head指针表示下一个可以弹出的数据;当数据弹出队列(称为Dequeue)时,Head指针加1。Tail指针表示下一个可以压入数据的单元;当数据压入队列(称为Enqueue)时,Tail指针加1;
从逻辑上看,Ringbuffer中的数组形成了一个环(如图1右边所示)。头和尾指针在这个环上沿着相同的方向移动。这就是Ringbuffer名称的由来。
图1 Ringbuffer示意图
向Ringbuffer压入单元的程序称为生产者(Producer),从Ringbuffer弹出单元的程序称为消费者(Consumer),如图2所示。一个Ringbuffer通常具有很多的生产者和消费者,每个生产者和消费者都是不同的线程或进程。这些线程或进程会被调度到系统中的不同处理器核心上,而Ringbuffer则是不同处理器核心之间共享的数据。
图2 生产者和消费者
基于Mutex的Ringbuffer实现
为了保证共享数据的功能正确,通常使用mutex来进行保护Ringbuffer,以保证只有一个生产者或消费者能够访问或操作Ringbuffer。
基于Mutex的Ringbuffer声明
Ringbuffer的数据结构如代码块1所示。
// Entry in ring buffer
class BufferEntry
{
public:
void *mp_ptr; // Pointer to data entry.
};
class RingBuffer
{
public:
int m_mutex; // Mutex for shared pointers.
unsigned int m_size; // The number of physical entries.
unsigned int m_head; // Head pointer.
unsigned int m_tail; // Tail pointer.
BufferEntry *mp_entries; // Pointer to physical entries.
};
代码1 基于Mutex的Ringbuffer声明
数据数组中每个单元BufferEntry
只提供一个指针,指针指向实际需要进入Ringbuffer的数据结构。当Enqueue和Dequeue时,只需要复制数据结构的指针即可,而不需要复制数据,从而减少数据拷贝的开销。
基于Mutex的Ringbuffer的Enqueue和Dequeue函数
图3展示了Enqueue和Dequeue的流程图。图3左边展示的是阻塞操作的流程图;右边展示的是非阻塞操作的流程图。如果实现的是阻塞式的操作,那么函数一直重复检查空满条件,直到Ringbuffer的满足操作所需的空满条件;如果实现的是非阻塞式的操作,那么函数可以在检查空满条件失败后直接退出。
图3 基于Mutex的Ringbuffer实现流程图
循环队列的空满控制有很多种方法。在本系列文章的示例中采用逻辑单元和物理单元的控制方法。head和tail的范围并不只限于物理单元的数量,而是可以一直累加。访问单元时,将(head MOD size)或(tail MOD size)得到对应的物理单元。
- 对于Enqueue操作,需要Ringbuffer不为满,即
m_head+m_size!=m_tail
(head和tail之差不大于物理单元的数量)。 - 对于Dequeue操作,需要Ringbuufer不为空,即
m_head!=m_tail
(head和tail没有指向同一个单元)。
基于Mutex的Ringbuffer实现如下:
// Blocking Enqueue Function
int enqueue_ringbuf(RingBuffer *ring_buffer, void *entry)
{
// Lock mutex.
lock(&ring_buffer->m_mutex);
// Ringbuffer is full. Do nothing.
while (ring_buffer->m_head + ring_buffer->m_size == ring_buffer->m_tail);
// Enqueue entry.
int enq_ptr = ring_buffer->m_tail % ring_buffer->m_size;
ring_buffer->mp_entries[enq_ptr].mp_ptr = entry;
// Increase tail pointer.
ring_buffer->m_tail = ring_buffer->m_tail + 1;
// Unlock mutex.
unlock(&ring_buffer->m_mutex);
return 0;
}
// Blocking Dequeue Function
int dequeue_ringbuf(RingBuffer *ring_buffer, void **entry)
{
// Lock mutex.
lock(&ring_buffer->m_mutex);
// Ringbuffer is empty. Do nothing.
while (ring_buffer->m_head == ring_buffer->m_tail);
// Dequeue entry.
int deq_ptr = ring_buffer->m_head % ring_buffer->m_size;
*entry = ring_buffer->mp_entries[deq_ptr].mp_ptr;
ring_buffer->mp_entries[deq_ptr].mp_ptr = NULL;
// Increase head pointer.
ring_buffer->m_head = ring_buffer->m_head + 1;
// Unlock mutex.
unlock(&ring_buffer->m_mutex);
return 0;
}
代码2 基于Mutex的Ringbuffer实现
Lock和Unlock
Ringbuffer最多允许一个生产者或一个消费者对于Ringbuffer进行操作。这需要通过锁标志m_mutex
进行控制。考虑到pthread中的mutex API引入的系统调度开销过大,这里采用__sync_val_compare_and_swap
原语实现lock和unlock操作。
type __sync_val_compare_and_swap(type* ptr, type oldval, type newval);
__sync_val_compare_and_swap
原语提供了同步和原子CAS操作两个功能,其功能核心是CASAL指令。原语将ptr
指向的变量与oldval
比较,如果值相同则将ptr
指向的变量修改为newval
,并返回ptr
指向变量的旧值;反之,则不修改ptr
指向的变量,并返回ptr
指向变量的值。同时,原语保证了load/store的同步,即在指令序上先于原语的load/store已经完成,指令序上晚于原语的load/store不会执行。
用__sync_val_compare_and_swap
原语实现的lock和unlock功能如下:
// Lock function
void lock(int *mutex)
{
while (__sync_val_compare_and_swap(mutex, 0, 1) != 0)
{
}
}
// Unlock function
void unlock(int *mutex)
{
__sync_val_compare_and_swap(mutex, 1, 0);
}
代码3 lock和unlock函数的实现
mutex
变量为0表示mutex
释放;变量为1表示mutex
锁定。在lock函数中,如果当前mutex
为0(释放),则更新为1(锁定),并且lock函数返回0;反之则lock函数返回1。
基于mutex的ringbuffer实现的完整源码请参见。
测试框架
测试程序包含一个控制测试过程的主线程和多个充当生产者和消费者的子线程。主线程首先创建并初始化Ringbuffer,并且创建指定数量的子线程,最后等待子线程结束。子线程数量可以配置。
每个子线程即使生产者也是消费者。每次测试迭代从队列中读取一个单元,再将这个单元(不做额外操作)插入队尾,即一次Dequeue和一次Enqueue操作。整个测试过程需要执行的Enqueue和Dequeue操作数量相同。如果将一部分线程设置为生产者,另一部分线程设置为消费者,那么数据单元一定由生产者向消费者移动。这可能在互联结构中出现具有方向性的数据流,从而加强测试结果与线程调度的核心的物理位置的关系。
为了保证子线程同步启动测试,子线程首先会陷入barrier。当所有子程序都创建完成后,子线程从barrier退出。每个线程执行的测试迭代数量都相同。
图4 测试程序流程图
每个子线程独立统计测试阶段完成的时间,并且分别计算操作的吞吐率。这是为了避免统计过程引入新的共享数据。最后,主线程获取每个子线程统计的操作吞吐率,并且求和得到总吞吐率。吞吐率用每秒执行的Enqueue或Dequeue操作数量表示,即op/s。
由于Ringbuffer的性能与存储系统的性能有直接关联,因此子线程调度到的核心对于具体实现具有很大的关系。为了保证测试的稳定性,需要将子线程调度到指定的核心(绑核)。如果不做绑核,那么相同实现的相同配置的不同测试结果可能会大相径庭,测试结果偏差极大。
测试框架的源码请参见。
基于Mutex的Ringbuffer实现的测试结果
测试采用的ARM服务器具有32个N1处理器,使用CMN600作为互联结果。测试用的ARM服务器上并没有区分NUMA结点,因此采用自然映射的关系,即子线程0调度到CPU0;子线程1调度到CPU1,以此类推。
图5展示了基于Mutex的Ringbuffer的性能测试结果。显然,随着子线程数量的增加,Ringbuffer的吞吐率显著降低。当使用2个子线程时,吞吐率为3985230 op/s;当使用全部32个子线程时,吞吐率为323111 op/s。前者是后者的12.3倍。
图5 基于Mutex的Ringbuffer性能测试
图5展示的性能测试结果是典型的以多线程竞争为主导的性能曲线。随着参与竞争的子线程数增加,在竞争mutex上花费的时间就更多,导致性能快速下降。
使用perf工具对于启动16个线程的测试程序进行热点分析,热点分析结果如图6所示。图中橙色部分提示了lock
函数占用了程序执行时间的91%。因此,对于Ringbuffer进行优化的首要步骤就是采用无锁的实现方式。
图6 Perf热点分析结果
无锁Ringbuffer实现
对于多线程共享程序,需要使用mutex来保护对于共享变量的load-modify-store代码序列,从而保证功能正确性。但是mutex的竞争引入了非常明显的性能开销。因此,需要探索无锁的Ringbuffer实现。
用原子操作替代锁
除了mutex,另一种可以保护共享变量的方法是原子操作。单个原子操作就是由load-modify-store三个部分构成。原子操作保证,在完成load-modify-store序列之前,被访问的地址不会被其他指令访问。典型的原子操作包括Compare-and-swap (CAS),以及ARM提供的LDADD等原子操作。如果有多个PE针对同一个共享变量发起多个原子操作,那么这些操作只能按照某种顺序串行,而不能并行。执行顺序取决于硬件实现。
CAS保证了对于mutex的操作都是串行的。例如PE1执行原子操作CAS(x, 0, 1),同时PE2执行原子操作CAS(x, 0, 2):
- 如果PE1先读取到x的值,那么PE1的CAS操作成功;PE2读取到的x的值是PE1更新后的值=1,CAS操作失败。最后PE1中返回值是0;PE2中返回值是1;x的值是1。
- 如果PE2先读取到x的值,那么PE2的CAS操作成功;PE1读取到的x的值是PE2更新后的值=2,CAS操作失败。最后PE1的返回值是2;PE2的返回值是0;x的值是2。
在Ringbuffer实现中,特别需要保护的就是被所有生产者和消费者共享的head和tail指针。可以用__atomic_fetch_add
原语来实现指针的原子指令并且实现指针的累加。
__atomic_fetch_add
原语的声明如下,其核心指令是LDADD。原语读取ptr
指向的变量,然后将变量累加val
后的新值会写到原来的位置;同时,原语将ptr
指令的变量的旧值返回。
type __atomic_fetch_add (type *ptr, type val, int memorder)
将__atomic_fetch_add
原语应用到head或tail指针上,则可以将存储在内存中的指针变量加1,并且同时将加1前的旧值返回。这就相当于将目前head/tail指针指向的单元分配给了当前的Enqueue或Dequeue的生产者或消费者,同时与其他生产者或消费者同步了更新后的指针。
Enqueue和Dequeue
图7展示了Enqueue和Dequeue的流程图。
图7 无锁Ringbuffer实现流程图(使用原子操作替换mutex)
相对于基于mutex的Ringbuffer实现,无锁Ringbuffer实现从软件和硬件两方面的优化。
从软件的角度,无锁Ringbuffer的关键区要小很多。在基于mutex的Ringbuffer实现(图8左侧)中,锁保护了整个Enqueue和Dequeue过程,每个Enqueue和Dequeue过程都只能串行执行。在无锁Ringbuffer实现(图8右侧)中,原子指令只保护了对于head或tail指针的累加。除了__atomic_fetch_add
必须串行,其他部分都是可以并行的,从而增加了程序的并行度。
图8 基于mutex的Ringbuffer实现和无锁Ringbuffer实现的关键区(着色部分)对比
从硬件的角度,原子指令引入更少的snoop操作。使用常规的load指令读取数据后,在PE侧的缓存中的数据副本的状态为Share(共享)。所有访问这个数据的PE的缓存都会具有这个共享数据的副本。当共享数据被某个PE修改后,需要大量的snoop去失效其他PE中的所有副本。
使用原子指令读取数据之后,数据的缓存状态为Exclusive(独有),即只有一个PE具有这个共享数据的副本。当另一个PE也执行原子操作后,只需要一个snoop就可以将已经存在副本清除。
从带宽的角度,更少snoop数量意味着snoop通信占用的通信带宽更少,提供给数据通信的带宽就会更高。从延迟的角度,load/store操作需要等到其引起的所有snoop都完成才能被标识为完成。更少的snoop数量意味着等待snoop完成的时间更短。
无锁Ringbuffer同时从这两方面优化中受惠,暂时没有办法定量分析出哪一个优化的作用更大。一般来说,缩小关键区的作用更大。
逻辑单元序号
上述的无锁Ringbuffer仍然可能导致功能错误。例如一个Ringbuffer有16个物理单元。首先为生产者A分配的逻辑单元是16;随后,为生产者B分配的单元是32;这两个单元映射到相同的物理单元0。虽然单元分配和指针更新是串行的,但是对于数据单元的访问是并行的,因此并不能保证生产者B访问物理单元0时,生产者A一定完成了操作。可能生产者A和生产者B同时查询到单元是空闲,进而同时对这个物理单元进行写入。此时无法保证写入单元的究竟是生产者A的数据还是生产者B的数据。
为了解决这样的问题,Ola提出,给每个单元增加一个域来表示其对应的逻辑单元序号。只有当逻辑单元序号与head/tail指针一致的时候才能进行访问。当Dequeue单元的时候,需要更新逻辑单元序号,将单元的序列号增加物理单元的总数。
还是上面的例子,当生产者A和B同时查询到内存是空闲时,由于单元的序列号与生产者A分配的单元匹配,那么生产者A可以操作这个单元;生产者B则因为序列号不匹配而不能访问,需要继续等待。
增加逻辑单元序号后的Enqueue和Dequeue流程图如图9。
图9 无锁Ringbuffer实现流程图(使用原子操作替换mutex)
无锁Ringbuffer声明
无锁Ringbuffer的声明如下。Ringbuffer不需要mutex进行保护。
// Entry in ring buffer
class BufferEntry
{
public:
unsigned int m_sn; // Sequential number.
void *mp_ptr; // Pointer to data entry.
};
class RingBuffer
{
public:
unsigned int m_size; // The number of physical entries.
unsigned int m_head; // Head pointer.
unsigned int m_tail; // Tail pointer.
BufferEntry *mp_entries; // Pointer to physical entries.
}
代码3 无锁Ringbuffer声明
无锁Ringbuffer的Enqueue和Dequeue函数
Enqueue和Dequeue函数的实现如下。
// Blocking Enqueue Function
int enqueue_ringbuf(RingBuffer *ring_buffer, void *entry)
{
// Update the tail pointer.
int sn = __atomic_fetch_add(&ring_buffer->m_tail, 1, __ATOMIC_RELAXED);
int enq_ptr = sn % ring_buffer->m_size;
// Check the data entry until empty.
unsigned int entry_sn;
void *entry_ptr;
do
{
// Replace load by atomic load.
// entry_sn = __atomic_load_n(&ring_buffer->mp_entries[enq_ptr].m_sn, __ATOMIC_RELAXED);
// entry_ptr = __atomic_load_n(&ring_buffer->mp_entries[enq_ptr].mp_ptr, __ATOMIC_RELAXED);
entry_sn = ring_buffer->mp_entries[enq_ptr].m_sn;
entry_ptr = ring_buffer->mp_entries[enq_ptr].mp_ptr;
} while (entry_sn != sn || entry_ptr != NULL);
// Write entry.
// Replace store by atomic store.
// __atomic_store_n(&ring_buffer->mp_entries[enq_ptr].mp_ptr, entry, __ATOMIC_RELEASE);
ring_buffer->mp_entries[enq_ptr].mp_ptr = entry;
return 0;
}
// Blocking Dequeue Function
int dequeue_ringbuf(RingBuffer *ring_buffer, void **entry)
{
// Update the head pointer.
int sn = __atomic_fetch_add(&ring_buffer->m_head, 1, __ATOMIC_RELAXED);
int deq_ptr = sn % ring_buffer->m_size;
// Check the data entry until not empty.
unsigned int entry_sn;
void *entry_ptr;
do
{
// Replace load by atomic load.
// entry_sn = __atomic_load_n(&ring_buffer->mp_entries[deq_ptr].m_sn, __ATOMIC_RELAXED);
// entry_ptr = __atomic_load_n(&ring_buffer->mp_entries[deq_ptr].mp_ptr, __ATOMIC_ACQUIRE);
entry_sn = ring_buffer->mp_entries[deq_ptr].m_sn;
entry_ptr = ring_buffer->mp_entries[deq_ptr].mp_ptr;
} while (entry_ptr == NULL || entry_sn != sn);
// Read entry.
*entry = entry_ptr;
// Replace store by atomic store
// __atomic_store_n(&ring_buffer->mp_entries[deq_ptr].mp_ptr, 0, __ATOMIC_RELAXED);
ring_buffer->mp_entries[deq_ptr].mp_ptr = 0;
// Update sequence number.
// Replace store by atomic store
// __atomic_store_n(&ring_buffer->mp_entries[deq_ptr].m_sn, sn + ring_buffer->m_size, __ATOMIC_RELEASE);
ring_buffer->mp_entries[deq_ptr].m_sn = sn + ring_buffer->m_size;
return 0;
}
代码4 无锁Ringbuffer的Enqueue和Dequeue函数
Enqueue和Dequeue函数中对于数据单元的load/store操作可以进一步被原子操作__atomic_load_n
和__atomic_store_n
取代。代码4中的注释给出了可以替换的原子操作。
无锁ringbuffer实现的完整源码请参见。
使用原子操作数据单元的无锁Ringbuffer实现的完整源码请参见。
无锁Ringbuffer性能测试结果
无锁Ringbuffer的性能如图10所示。曲线mutex表示基于mutex的Ringbuffer实现;lockfree和atomic表示无锁Ringbuffer实现(lockfree使用普通load/store操作数据单元;atomic使用原子load/store操作数据单元)。
图10 无锁Ringbuffer性能测试
从图10中可以发现,相对于基于mutex的Ringbuffer实现,无锁Ringbuffer实现的性能得到了很大的提升。用atomic曲线和mutex曲线进行比较。当只有2个线程时,无锁Ringbuffer实现的吞吐率是基于mutex的Ringbuffer实现的吞吐率的1.86倍;当使用32个线程时,无锁Ringbuffer实现的吞吐率是基于mutex的Ringbuffer实现的吞吐率的18倍。
从图10中可以发现,对于数据单元的访问是否使用原子操作对于性能的影响并不明显。两条曲线基本上是重合的。这是因为测试的ringbuffer提供了足够的单元,多个线程共享Ringbuffer单元的竞争并不强烈。
无锁Ringbuffer实现的吞吐率还是会随着子线程数增加而降低,但是并不如基于mutex的Ringbuffer强烈。当使用2个子线程时,吞吐率为7394830 op/s;当使用全部32个子线程时,吞吐率为5800350 op/s。前者是后者的1.27倍。
利用Perf分析程序热点只能定位到特点在enqueue和dequeue函数,无法提供进一步的信息。因此使用Arm的top-down工具对于微架构执行情况进行详细分析,得到的结果如图11。
图11 利用Top-down对无锁Ringbuffer进行性能分析
图11中用红圈标注了主要注意的指标。首先,程序明确属于后端瓶颈的程序,后端阻塞周期高达97%。其次,L1、L2和LC(Last-level cache)明显高于其他异常情况,这表示程序中存在大量的数据竞争。最后L1、L2和LC的数据缺失率属于同一量级,这表示数据竞争是全局性的,需要通过系统的最后一级缓存才能得到解决。top-down的分析结果提示,无锁的ringbuffer实现仍然导致了大量的全局性的数据冲突。
无锁Ringbuffer的对齐改进
通过对于程序的分析,符合上述特征的共享数据只有head和tail指针。因为head和tail指针处于相同的缓存行。这就导致生产者和消费者仍然在竞争同一个缓存行。可以通过对与Ringbuffer的数据结构进行缓存行对齐,减少不同生产者和消费者竞争同一个缓存行的情况。
Ringbuffer实现的缓存行对齐包含两个部分:
head/tail指针的对齐。将head指针和tail指针分别放置到不同的缓存行。这可以通过C语言原语alignas实现。
class RingBuffer
{
public:
alignas(64) unsigned int m_size;
alignas(64) unsigned int m_head;
alignas(64) unsigned int m_tail;
alignas(64) BufferEntry *mp_entries;
}
代码5 对齐的无锁Ringbuffer的声明
这样只有生产者竞争tail指针所在的缓存行;消费者竞争head指针所在的缓存行。
数据单元的对齐。这是为了避免访问不同单元的线程因为单元处于相同缓存行而产生竞争。比如访问物理单元0-3的线程实际上在竞争同一个缓存行。
这一部分可以通过空间换效率的方式实现。一个缓存行(64B)中可以放置4个单元(每个单元16B,8B指针和8B序列号)。在初始化时分配4倍的单元,在Enqueue和Dequeue每次都只使用4对齐的序号。逻辑单元0对应到物理单元0;逻辑单元1对应到物理单元4;依次类推。
物理单元序号计算方法如下:
int enq_ptr = (sn % ring_buffer->m_size) * 4;
代码6 对齐的无锁Ringbuffer映射逻辑单元到物理单元
对齐后的无锁ringbuffer的内存分布如下。其中红色部分是实际访问的数据单元。绿色部分是没有使用到的数据单元。
图12 对齐后的无锁Ringbuffer的内存分布
需要说明的是,当物理数据单元数量远大于生产者和消费者数量时,数据单元的对齐对于性能的影响非常微小。因为共享一个缓存行的线程数量很小,最多只有4个。
缓存行对齐的无锁ringbuffer实现的完整源码请参见。
对齐的无锁Ringbuffer性能测试结果
缓存行对齐的无锁Ringbuffer的性能如图13所示。曲线mutex表示基于mutex的Ringbuffer实现;lockfree和atomic表示无锁Ringbuffer实现(lockfree使用普通load/store操作数据单元;atomic使用原子load/store操作数据单元);align表示缓存行对齐后的无锁Ringbuffer实现。
图13 对齐的无锁Ringbuffer性能测试
缓存行对齐的无锁Ringbuffer实现的性能曲线随着子线程数量的增加呈现增长的趋势。具体来说,
- 首先,当子线程数小于16时,吞吐率随着子线程数增加而趋向于一个稳定的值。这个稳定值受限于是互联网络能够提供的最大带宽。
- 其次,当子线程数大于16时,吞吐率随着子线程数增加而增加。这与互联结构的拓扑有关系。当子线程数大于32时,新的子线程被分配到互联结构的另一半部分,从而使得能够利用的带宽增加。
用align曲线和mutex曲线进行比较。当只有2个线程时,无锁Ringbuffer实现的吞吐率是基于mutex的Ringbuffer实现的吞吐率的3.31倍;当使用16个线程时,无锁Ringbuffer实现的吞吐率是基于mutex的Ringbuffer实现的吞吐率的33.8倍。当使用32个线程时,无锁Ringbuffer实现的吞吐率是基于mutex的Ringbuffer实现的吞吐率的110倍。
缓存行对齐的无锁Ringbuffer实现的性能曲线与基于mutex的Ringbuffer实现和无锁Ringbuffer的实现有本质上的不同,呈现了一种互联带宽测试的规律。这表示缓存行对齐的无锁Ringbuffer实现能够充分利用互联网络提供的通信性能,是一个非常有利于系统规模扩展的Ringbuffer实现。
非阻塞Ringbuffer实现
前文介绍的无锁Ringbuffer实现存在一个重大的功能缺陷,就是上述实现只能构建阻塞式的Ringbuffer。当生产者或消费者进入Enqueue或Dequeue函数后,生产者或消费者的程序会一直尝试操作,直到操作成功才能推出Enqueue或Dequeue函数。
在实际软件更加倾向于实现非阻塞的Ringbuffer。因为当Enqueue或Dequeue操作不成功时,软件可以主动进行线程调度或休眠,将处理器资源让渡给可以进行操作的线程或任务。这有利于提高资源利用率。
指针保护
这里分三个步骤来推演非阻塞Ringbuffer的实现。
第一步。为了能够实现非阻塞的Ringbuffer,那么仍然需要在Enqueue和Dequeue函数的开始的时候检查队列的空满情况。如果Ringbuffer不满足进行Enqueue或Dequeue的条件,则退出函数并且返回错误代码。
考虑到这一点,非阻塞Ringbuffer的Enqueue和Dequeue的流程应当如下图:
图14 非阻塞无锁Ringbuffer实现的流程图(第一步)
第二步。非阻塞实现要使用原子指令替代mutex保护指针。但是原子指令可以保护对于一个共享变量的load-modify-store序列,但是不能保证对于多个共享变量的load-modify-store之间的原子性。因此,Enqueue中只能用原子指令保护tail指针;Dequeue中只能用原子指令保护head指针。
考虑到这一点,非阻塞Ringbuffer的Enqueue和Dequeue的流程应当如下图:
图15 非阻塞无锁Ringbuffer实现的流程图(第二步)
以enqueue为例,函数首先读取指针判断空满条件。如果Ringbuffer不满,则执行CAS(tail_addr, tail, tail+1)。
如果CAS成功,则表示从判断空满条件到更新tail这段时间,没有其他线程修改了tail指针,那么可以将tail指向的单元分配给当前线程,并且更新CAS。
如果CAS失败,则表示从判断空间条件到更新tail这段时间,其他线程修改了tail指针。此时,需要返回函数开始的地方重新判断空满条件。
这种方法可以保证tail指针的更新是原子的,每个生产者都是在最新的tail值基础上进行更新。
这里使用的是__atomic_compare_exchange_n
原语。
bool __atomic_compare_exchange_n (type *ptr, type *expected, type desired, bool weak, int success_memorder, int failure_memorder)
__atomic_compare_exchange_n
读取ptr
指向的变量,与expected
指向的变量比较。如果两者数值相同,则更新ptr
指向的变量为desired
,并且返回真。如果数值不同,则将读取的数值赋值给expected
指向的变量,同时返回假。
第三步。从生产者的角度,Enqueue函数需要在检查空满条件后分配一个单元给当前线程,并且让其他生产者知道更新后的head或tail值。第二步中引入的CAS操作保证了这一点。另一方面,消费者需要等到分配的单元被填充后才能知道指针的更新。也就是说,生产者和消费者看到指针更新的时间不同。
因此,非阻塞无锁Ringbuffer引入了两组指针分别针对生产者和消费者,即prod_head/prod_tail,和cons_head/cons_tail,如图15所示。
图16 生产者和消费者指针示意图
对于生产者来说,其prod_tail指针一定是准确的,但是prod_head指针并不是最新的。这会导致生产者认为Ringbuffer中被占用的单元数多于实际被占用的单元数量,从而导致生产者错误地认为队列是满的,而放弃Enqueue操作。类似地,对于消费者来说,其cons_head指针一定是准确的,但是cons_tail指针并不是最新的。那么消费者可能认为空闲单元少于实际的空闲单元,从而导致消费者错误地认为队列是满的,而放弃Dequeue操作。这样的误导判断虽然会导致生产者和消费者对于Ringbuffer的竞争比较保守,结果是Ringbuffer效率降低,但是不会引起功能错误。
考虑到这一点,非阻塞Ringbuffer的Enqueue和Dequeue的流程应当如下图:
图17 非阻塞无锁Ringbuffer实现的流程图(第三步)
生产者和消费者的指针同步
当Enqueue操作完成时,需要将生产者视角的prod_tail指针同步到消费者视角的cons_tail指针,告知消费者这些单元已经完成了Enqueue操作;当Dequeue操作完成时,需要将消费者的cons_head指针同步到消费者的prod_head指针,告知生产者这些单元已经完成了Dequeue操作。
但是这并不能够天然保证的。因为除了针对prod_tail和cons_head指针的CAS操作,Enqueue和Dequeue函数的其他部分都是并行,如图18所示。因此,某个线程的分配单元完成Enqueue或Dequeue操作时,完全不能推测这个单元之前的其他单元是否完成了Enqueue或Dequeue的操作。
图18 非阻塞无锁Ringbuffer的执行时序示意图
指针同步方式有很多种实现方法,主要分为顺序释放和乱序释放两种实现方法。
顺序释放。要求同步到另一个视角的单元必须完成了Enqueue或Dequeue操作。这就需要在更新指针前对于单元进行检查,只能将操作完成的单元可以同步给另一个视角。
比如,某个线程的Enqueue函数分配的单元为16,此时cons_tail的值是10,则Enqueue函数不能更新cons_tail。只有cons_tail的值是15时才能更新cons_tail。这样才能保证之前的物理单元都已经完成了Enqueue操作。
为了保证指针操作的原子性,需要使用__atomic_compare_exchange_n
原语更新另一个视角的指针。
乱序释放。在同步不同视角的指针时,不检查单元是否已经完全写入完成,而是直接将指针加1。这就会导致在后续Enqueue或Dequeue种分配的单元不能满足操作的要求。
比如,某个线程的Enqueue函数分配的单元为16,此时cons_tail的值是10。当这个Enqueue函数完成时将cons_tail更新为11,但是此时对于物理单元11的操作状态未知。物理单元11有可能已经完成了Enqueue操作,也有可能物理单元11还是空的。
为了避免功能错误,则需要在操作分配的单元时进行检查。对于Enqueue,如果分配的单元不为空,则一直等待;类似地,对于Dequeue,如果分配的单元为空,则也要一直等待。
为了保证指针操作的原子性,需要使用__atomic_fetch_add
原语更新另一个视角的指针。
至此,我们推导出了,非阻塞Ringbuffer的Enqueue和Dequeue的完整流程,如图19所示。
图19 非阻塞无锁Ringbuffer实现的流程图
Ringbuffer声明
非阻塞的无锁Ringbuffer不需要序列号。每个数据单元中只有一个指向实际数据结构的指针。
// Entry in ring buffer
class BufferEntry
{
public:
void *mp_ptr; // Pointer to data entry.
};
class RingBuffer
{
public:
alignas(64) unsigned int m_size;
// Pointers in the view of producer.
alignas(64) unsigned int m_prod_head; // read pointer
alignas(64) unsigned int m_prod_tail; // write pointer
// Pointers in the view of consumer.
alignas(64) unsigned int m_cons_head; // write pointer
alignas(64) unsigned int m_cons_tail; // read pointer
alignas(64) BufferEntry *mp_entries;
};
代码7 非阻塞无锁Ringbuffer的声明
Enqueue和Dequeue
本文测试的非阻塞Ringbuffer实现采用了乱序释放的方法。
int enqueue_ringbuf(RingBuffer *ring_buffer, void *entry)
{
// Step 1: acquire slots
unsigned int tail = __atomic_load_n(&ring_buffer->m_prod_tail, __ATOMIC_RELAXED);
do
{
unsigned int head = __atomic_load_n(
&ring_buffer->m_prod_head, __ATOMIC_ACQUIRE);
if (ring_buffer->m_size + head == tail)
{
return 1;
}
} while (!__atomic_compare_exchange_n(&ring_buffer->m_prod_tail,
&tail, // Updated on failure
tail + 1,
/*weak=*/true,
__ATOMIC_RELAXED,
__ATOMIC_RELAXED));
// Step 2: Write slots
int enq_ptr = (tail % ring_buffer->m_size) * 8;
void *entry_ptr;
do
{
entry_ptr = __atomic_load_n(&ring_buffer->mp_entries[enq_ptr].mp_ptr, __ATOMIC_RELAXED);
} while (entry_ptr != NULL);
__atomic_store_n(&ring_buffer->mp_entries[enq_ptr].mp_ptr, entry, __ATOMIC_RELEASE);
// Finally make all released slots available for new acquisitions
__atomic_fetch_add(&ring_buffer->m_cons_tail, 1, __ATOMIC_RELEASE);
return 0;
}
int dequeue_ringbuf(RingBuffer *ring_buffer, void **entry)
{
// Step 1: acquire slots
unsigned int tail = __atomic_load_n(&ring_buffer->m_cons_head, __ATOMIC_RELAXED);
do
{
unsigned int head = __atomic_load_n(
&ring_buffer->m_cons_tail, __ATOMIC_ACQUIRE);
if (head == tail)
{
return 1;
}
} while (!__atomic_compare_exchange_n(&ring_buffer->m_cons_head,
&tail, // Updated on failure
tail + 1,
/*weak=*/true,
__ATOMIC_RELAXED,
__ATOMIC_RELAXED));
// Step 2: Write slots (write NIL for dequeue)
int deq_ptr = (tail % ring_buffer->m_size) * 8;
unsigned int entry_sn;
void *entry_ptr;
do
{
entry_ptr = __atomic_load_n(&ring_buffer->mp_entries[deq_ptr].mp_ptr, __ATOMIC_ACQUIRE);
} while (entry_ptr == NULL);
*entry = entry_ptr;
__atomic_store_n(&ring_buffer->mp_entries[deq_ptr].mp_ptr, 0, __ATOMIC_RELAXED);
// Finally make all released slots available for new acquisitions
__atomic_fetch_add(&ring_buffer->m_prod_head, 1, __ATOMIC_RELEASE);
return 0;
}
代码8 非阻塞无锁Ringbuffer的实现
非阻塞的无锁ringbuffer实现的完整源码请参见。
非阻塞Ringbuffer性能测试结果
非阻塞的无锁Ringbuffer的性能如图13所示。曲线mutex表示基于mutex的Ringbuffer实现;lockfree和atomic表示无锁Ringbuffer实现(lockfree使用普通load/store操作数据单元;atomic使用原子load/store操作数据单元);align表示缓存行对齐后的无锁Ringbuffer实现;buck表示的非阻塞无锁Ringbuffer实现。
图20 非阻塞无锁Ringbuffer性能测试
非阻塞的无锁Ringbuffer的性能曲线呈现两段趋势:
- 从2个线程到16个线程左右,吞吐率随着线程数增加而缓慢增加。
- 从16个线程左右到32个线程,吞吐率随着线程数增加而缓慢降低。
与基于mutex的Ringbuffer相比,非阻塞Ringbuffer的性能要远好于基于mutex的Ringbuffer的性能,并且在使用16-20个线程时达到最高。
另一方面,与缓存行对齐的无锁Ringbuffer实现相比,非阻塞Ringbuffer的性能差强人意。这是因为非阻塞的Ringbuffer引入了更多的指针竞争。阻塞的无锁Ringbuffer实现的Enqueue和Dequeue中只关注一个共享指针。但是非阻塞Ringbuffer实现的Enqueue和Dequeue则需要关注三个共享指针。这就导致出现了更多的共享变量的竞争,从而引起了性能下降。
取2个线程、16个线程和32个线程三个位置进行定量比较,得到下表。表中第一行是吞吐率数据(单元ops/s),第二行是相对于基准(基于mutex的Ringbuffer实现)的加速比。
线程数 | mutex | lockfree | atomic | align | buck |
---|---|---|---|---|---|
2 | 3985230 | 8793590 | 7394830 | 13200200 | 7209980 |
1.0x | 2.2x | 1.8x | 3.3x | 1.8x | |
16 | 624949 | 7374800 | 6957440 | 21125400 | 11110100 |
1.0x | 11.8x | 11.1x | 33.8x | 17.7x | |
32 | 323111 | 5787700 | 5800350 | 35671500 | 5418150 |
1.0x | 17.9x | 17.9x | 110.4x | 16.7x |
相对于基于mutex的Ringbuffer,非阻塞的无锁Ringbuffer可以取得超过10倍的吞吐率提升。这表示非阻塞的无锁Ringbuffer实现仍然是非常有效的,值得在实际应用中尝试。
与program64的性能比较
在文系列完整的开篇处已经声明,本系列文章介绍的所有优化来自于Ola的研究。Ola开发的program64库也是示例的Ringbuffer实现的基础。
无锁Ringbuffer实现对应于program64库中的p64_blkring实现。
非阻塞Ringbuffer实现对应于program64库中的p64_buckring实现。
这里将program64库的实现集成到我们的测试程序中,使用相同的测试环境和配置进行了测试。align和p64_blkring是一个对照组;buck和p64_buckring是一个对照组。
图21 在ARM平台上与Program64库对比性能
我们的示例代码的性能与program64的性能在趋势上是一致,并且在数值范围上基本重合。两者之间的差距则是由于实现的细节引入的。比如program64的Enqueue和Dequeue可以操作多个单元,而示例代码只能操作一个单元。这些差距并不影响关于Ringbuffer优化的结论。
结论
本文介绍了多种Ringbuffer实现,并且比较和分析了不同Ringbuffer实现的性能表现:
- 基于mutex的Ringbuffer性能主要受到多线程相互竞争mutex的影响。参与竞争的线程越多,Ringbuffer的吞吐率越低,非常不利于系统规模的扩展。
- 阻塞式的无锁Ringbuffer能够从原子操作和缓存行对齐两个优化中获得非常明显的收益,使得实现能够充分利用系统中互联系统提供的通信容量,是一种非常有利于系统规模扩展的Ringbuffer实现。
- 非阻塞Ringbuffer实现丰富并完善了Ringbuffer的功能,并且同样可以取得很好的性能优化。但是由于引入了更多的共享变量,非阻塞Ringbuffer实现的性能要弱于缓存行对齐的无锁Ringbuffer实现。