优化技术专题「线程间的高性能消息框架」再次细节领略Disruptor的底层原理和优势分析

大道之行,天下为公。这篇文章主要讲述优化技术专题「线程间的高性能消息框架」再次细节领略Disruptor的底层原理和优势分析相关的知识,希望能为你提供帮助。
Disruptor原理
首先Disruptor是为了解决高并发缓存的队列,为线程间通讯提供高效的性能,它是如何做到无阻塞、多生产、多消费的?

优化技术专题「线程间的高性能消息框架」再次细节领略Disruptor的底层原理和优势分析

文章图片

上图简单的画了一下构建Disruptor的各个参数以及 ringBuffer 的构造,下面简单的说一下。
生产者需要组件
  • Event模型:从生产者传递给消费者的数据单位,完全由用户定义其类型。
@Data public class SampleEvent { private Long id。 private String sampleDataStr。 }

  • EventFactory:创建事件(任务)的工厂类。(这里任务会创建好,保存在内存中,可以看做是一个空任务)。
public class SampleEventFactory implements EventFactory< SampleEvent> { @Override public SampleEvent newInstance() { // 实例化数据(建好空数据,等待后面初始化) return new SampleEvent()。 } }

  • RingBuffer:环形缓冲区通常被认为是Disruptor的主要实现,当前版本即3.0版本之后,RingBuffer仅负责存储和更新通过Disruptor的数据(Event)。
    • ringBufferSize:容器的长度。( Disruptor 的核心容器是 ringBuffer,环转数组,有限长度)。
  • ProductType:生产者类型:单生产者、多生产者。
    • Sequencer:Sequencer是Disruptor的核心API。该接口的2个实现类(SingleProducer,MultiProducer)实现了所有并发算法,用于在生产者和消费者之间快速,正确地传递数据。
  • WaitStrategy:等待策略。(当队列里的数据都被消费完之后,消费者和生产者之间的等待策略),等待策略确定消费者如何等待生产者将事件放入Disruptor。
  • RingBuffer:存放数据的容器。
@Data @AllArgsConstructor public class SampleEventProducer { private RingBuffer< OrderEvent> ringBuffer。 public void sendData(long id) { //获取下一个可用序号 long sequence = ringBuffer.next()。 try { //获取一个空对象(没有填充值) SampleEvent sampleEent = ringBuffer.get(sequence)。 }finally { //提交 ringBuffer.publish(sequence)。 } } }

消费者需要组件
  • Executor:消费者线程池,执行任务的线程。(每一个消费者都需要从线程池里获得线程去消费任务)。
  • EventProcessor:用于处理来自Disruptor的事件的主事件循环,并具有消费者序列的所有权。有一个名为 BatchEventProcessor表示,它包含事件循环的有效实现,并将回调到使用的提供的EventHandler接口实现。
  • EventHandler:事件处理器,由用户实现并代表Disruptor的使用者的接口,用户客户端实现消息的处理机制,由客户端具体实现。
    public class SampleEventHandler implements EventHandler< SampleEvent> {/** * 事件驱动监听--消费者消费的主体 */ @Override public void onEvent(SampleEventevent, long sequence, boolean endOfBatch) throws Exception { System.out.println(event.getSampleDataStr() + "" +Thread.currentThread().getName())。 } }

算法核心Sequence序号
  • Sequence:Disruptor使用Sequences作为识别特定组件所在位置的方法。
    • 每个消费者(EventProcessor)都像Disruptor本身一样维护一个Sequence。大多数并发代码依赖于这些Sequence值的变化或者叫移动,因此Sequence支持AtomicLong的许多当前功能。
    • 事实上,唯一真正的区别是Sequence包含额外的功能,以防止序列和其他值之间的错误共享。
  • Sequence Barrier:序列屏障由Sequencer产生,包含对Sequencer中主要发布的sequence和任何依赖性消费者的序列的引用。它包含确定是否有任何可供消费者处理的事件的逻辑。
Disruptor的优点:
  1. 多线程之间没有竞争即没有锁。
  2. 所有访问者都记录自己的序号的实现方式,允许多个生产者与多个消费者共享相同的数据结构。
  3. 每个对象中都能跟踪序列号(ring buffer, claim strategy,生产者和消费者),加上神奇的缓存行填充,就意味着没有伪共享和非预期的竞争。
下面再简单介绍下RingBuffer核心实现,来看看队列的实现细节。
优化技术专题「线程间的高性能消息框架」再次细节领略Disruptor的底层原理和优势分析

文章图片

其为环形队列,有点像一致性Hash算法中的闭环,但完全不一样。
底层的话是一个固定大小的数组结构,相比于队列来说,其只有一个下标指针cursor,如果槽的个数是2的N次方更有利于基于二进制的计算机进行计算。如果看过HashMap源码应该知道,HashMap定位元素槽时使用了一种巧妙的方式,hash& (length-1)。
优化技术专题「线程间的高性能消息框架」再次细节领略Disruptor的底层原理和优势分析

文章图片

RingBuffer同样是相同的计算方式,sequence& (length-1),当然你可以进行取模操作。
  • 取模操作在寄存器中的计算,需要多次的迭代加操作进行的,所以相对于计算速度来说,对于计算机进行位运算效率绝对是高于取模操作的,尤其是对于高并发状况下的计算,能够节省很多单位cpu开销。
一般实现线性存储有两种实现方式:
  • 一种是基于连续内存分配的HashTable
  • 一种是基于随机内存分配的迭代指针。
为什么RingBuffer选用数组作为存储结构,而不选用链表存储?
缓存或者程序的局部性原理
  • (Good)数组内存属是连续分配内存的预读策略,也就是内存加载时,会将部分连续内存地址预先加载到高速缓存中,即认为你可能会使用,上面我们分析了操作系统中的cpu操作数据的流程,可以看出这种设计是为了不用反复从内存中加载。
  • (Bad)链表的内存分配是碎片化的所以其存储地址不是连续的,导致每次都会cpu都会重新计算下一个链表位置的地址,并从内存中加载相关的数据,数据量小的情况下并不能看出性能的优劣,但是当数据量大的情况下,这种极小的消耗,会对整体的运行效率产生影响。
伪共享内存以高速缓存行的形式存储在高速缓存系统中。高速缓存行是2的N次方个连续字节,其大小通常为32-256,最常见的缓存行大小为64字节。
  • 首先我们知道对于锁来说是关中断实现,锁定bus消息总线实现,而对于共享内存,计算机使用的是缓存行,共享变量的多个线程,共享相同的缓存行。
  • 实现线程数量的线性可伸缩性,我们必须确保没有两个线程写入同一个变量或缓存行。而当使用volatile的时候,我们读取直接共享变量从主内存或者叫共享内存中读取变量的值,其本质是使计算机缓存行失效。
在CPU核心A运行的线程想要更新变量X,而CPU核心B上的线程想要更新变量Y。
这两个热变量位于同一缓存行中。每个线程都将竞争缓存行的所有权,以便他们可以更新它。如果核心A获得所有权,那么MESI/MOSI缓存子系统将需要使核心B的相应缓存行无效。反之也是一样,极大地影响性能。如果竞争核心在不同的套接字上并且还必须跨越套接字互连,则缓存行问题将进一步加剧。
  • 特别是不同的线程操作同一个缓存行,需要发出RFO(Request for Owner)信号锁定缓存行,保证写操作的原子性,此时其他线程不能操作这个缓存行,这将对效率有极大的影响。
缓存行填充的概念下面是缓存行实现,另外缓存行填充有一个前提同时分配的对象往往位于同一位置。
public long p1, p2, p3, p4, p5, p6, p7; // cache line padding private volatile long cursor = INITIAL_CURSOR_VALUE; public long p8, p9, p10, p11, p12, p13, p14; // cache line padding

【优化技术专题「线程间的高性能消息框架」再次细节领略Disruptor的底层原理和优势分析】如果有不同的消费者往不同的字段写入,你需要确保各个字段间不会出现伪共享。
/** * 数组保存了VolatileLongPadding,其中数组中一个long类型保存数组长度,算上 * 自身long类型value,需要再填充6个long类型,就能将数组中的对象填充满一个缓存行。 * 注意:这里使用继承的方式实现缓存行对齐,因为java编译器会优化无效的字段。 */ class CacheLinePadding { // 如果不需要填充,只需要注释掉这段代码即可 public volatile long p1, p2, p3, p4, p5, p6; } class CacheLinePaddingObject extends CacheLinePadding { //实际操作的值 public volatile long value = https://www.songbingjia.com/android/0L; }


    推荐阅读