亦余心之所善兮,虽九死其犹未悔。这篇文章主要讲述分布式技术专题线程间的高性能消息框架-深入浅出Disruptor的使用和原理相关的知识,希望能为你提供帮助。
前提概要
简单回顾 jdk 里的队列:
阻塞队列:入队操作:
- 操作不阻塞:
- add:添加失败,则会直接进行返回。
- offer:添加失败后(满了)直接抛出异常,注意:offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
- 操作阻塞:
- put:满了,通过Condition:notFull.await()阻塞当前数据信息,当出队和删除元素时唤醒 put 操作。
- 操作不阻塞:
- poll:当空时直接返回 null。poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间,超时还没有数据可取,返回失败。
- remove:删除元素情况相关元素信息控制,InterruptException异常
- 操作阻塞:
- take:当空时,notEmpty.await()(当有元素入队时唤醒)。
- drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
- getDelay() 延迟时间。
- compareTo() 通过该方法比较从PriorityQueue里取值。
- 如果队列里无数据,元素入队时会被唤醒。
- 如果队列里有数据,会阻塞至时间满足。
- take-阻塞:
- poll-满足队列有数据并且 delay 时间小于0时候会取出元素,否则立即返回 null 可能会抢占成为 leader。
- 延时任务:设置任务延迟多久执行;需要设置过期值的处理,例如缓存过期。
- 实现方式:每次 getDelay() 方法提供一个缓存创建时间与当前时间的差值,出队时 compareTo() 方法取差值最小的。每次入队时都会重新取出队列里差值最小的值进行处理。
- 使用队列更多的是像生产者、消费者这种场景,这种场景大多数情况又对处理速度有着要求,所以我们会使用多线程技术。
- 使用多线程就可能会出现并发,为了避免出错,我们会选择线程安全的队列。
- ArrayBlockingQueue、LinkedBlockingQueue 或者是 ConcurrentLinkedQueue。前俩者是通过加锁取实现,后面一种是通过 cas 去实现线程安全。
- 要考虑到生产者过快可能造出的内存溢出的问题,所以看起来 ArrayBlockingQueue 是最符合要求的。
- ArrayBlockingQueue、LinkedBlockingQueue 或者是 ConcurrentLinkedQueue。前俩者是通过加锁取实现,后面一种是通过 cas 去实现线程安全。
- Disruptor的源码Git仓库地址:https://github.com/LMAX-Exchange/disruptor
- Disruptor的概念定义:异步体系的线程间的高性能消息框架
- Disruptor的核心思想:把多线程并发写的线程安全问题转化为线程本地写,即:不需要做同步,不许要进行加锁操作。
- 非常轻量,但性能却非常强悍,得益于其优秀的设计和对计算机底层原理的运用
- 单线程每秒能处理超600W的数据(Disruptor能在1秒内将600W数据发送给消费者,现在的硬件水平会远远在这个水平之上了!)
- 基于事件驱动模型,不用消费者主动拉取消息
- 比JDK的ArrayBlockingQueue性能高一个数量级
- 无锁序号栅栏
- 缓存行填充,消除伪共享
- 内存预分配
- 环形队列RingBuffer
- RingBuffer(环形队列):基于数组的内存级别缓存,是创建sequencer(序号)与定义WaitStrategy(拒绝策略)的入口。
- Disruptor(总体执行入口):对RingBuffer的封装,持有RingBuffer、消费者线程池Executor、消费之集合ConsumerRepository等引用。
- Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进度,同时还能消除伪共享。
- Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法
- SequenceBarrier(消费者屏障):用于控制RingBuffer的Producer和Consumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑。
- WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略,分别是:
- BlockingWaitStrategy(最稳定的策略):阻塞方式,效率较低,但对cpu消耗小,内部使用的是典型的锁和条件变量机制(java的ReentrantLock),来处理线程的唤醒,这个策略是disruptor等待策略中最慢的一种,但是是最保守使用消耗cpu的一种用法,并且在不同的部署环境下最能保持性能一致。 但是,随着我们可以根据部署的服务环境优化出额外的性能。
- BusySpinWaitStrategy(性能最好的策略):自旋方式,无锁,BusySpinWaitStrategy是性能最高的等待策略,但是受部署环境的制约依赖也越强。 仅当event处理线程数少于物理核心数的时候才应该采用这种等待策略。 例如,超线程不可开启。
- LiteBlockingWaitStrategy(几乎不用,最接近原生的策略机制):BlockingWaitStrategy的变体版本,目前感觉不建议使用
- LiteTimeoutBlockingWaitStrategy:LiteBlockingWaitStrategy的超时版本
- PhasedBackoffWaitStrategy(最低CPU配置的策略):自旋 + yield + 自定义策略,当吞吐量和低延迟不如CPU资源重要,CPU资源紧缺,可以使用此策略。
- SleepingWaitStrategy:自旋休眠方式(无锁),性能和BlockingWaitStrategy差不多,但是这个对生产者线程影响最小,它使用一个简单的loop繁忙等待循环,但是在循环体中间它调用了LockSupport.parkNanos(1)。
- 一般情况在linux系统这样会使得线程停顿大约60微秒。不过这样做的好处是,生产者线程不需要额外的累加计数器,也不需要产生条件变量信号量开销。
- 负面影响是,在生产者线程与消费者线程之间传递event数据的延迟变高。所以SleepingWaitStrategy适合在不需要低延迟, 但需要很低的生产者线程影响的情形。一个典型的案例是异步日志记录功能。
- TimeoutBlockingWaitStrategy:BlockingWaitStrategy的超时阻塞方式
- YieldingWaitStrategy(充分进行实现CPU吞吐性能策略):自旋线程切换竞争方式(Thread.yield()),最快的方式,适用于低延时的系统,在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中推荐使用此策略,它会充分使用压榨cpu来达到降低延迟的目标。
- 通过不断的循环等待sequence去递增到合适的值。 在循环体内,调用Thread.yield()来允许其他的排队线程执行。 这是一种在需要极高性能并且event handler线程数少于cpu逻辑内核数的时候推荐使用的策略。
- 这里说一下YieldingWaitStrategy使用要小心,不是特别要求性能的情况下,要谨慎使用,否则会引起服务起cpu飙升的情况,因为他的内部实现是在线程做100次递减然后Thread.yield(),可能会压榨cpu性能来换取速度。
- Event:从生产者到消费者过程中所处理的数据单元,Event由使用者自定义。
- EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor中的一个消费者的接口。
- EventProcessor:这是个事件处理器接口,实现了Runnable,处理主要事件循环,处理Event,拥有消费者的Sequence,这个接口有2个重要实现:
- WorkProcessor:多线程处理实现,在多生产者多消费者模式下,确保每个sequence只被一个processor消费,在同一个WorkPool中,确保多个WorkProcessor不会消费同样的sequence
- BatchEventProcessor:单线程批量处理实现,包含了Event loop有效的实现,并回调到了一个EventHandler接口的实现对象,这接口实际上是通过重写run方法,轮询获取数据对象,并把数据经过等待策略交给消费者去处理。
文章图片
- 构建 Disruptor 的各个参数以及 ringBuffer 的构造:
- EventFactory:创建事件(任务)的工厂类。
- ringBufferSize:容器的长度。
- Executor:消费者线程池,执行任务的线程。
- ProductType:生产者类型:单生产者、多生产者。
- WaitStrategy:等待策略。
- RingBuffer:存放数据的容器。
- EventHandler:事件处理器。
<
dependency>
<
groupId>
com.lmax<
/groupId>
<
artifactId>
disruptor<
/artifactId>
<
version>
3.4.2<
/version>
<
/dependency>
生产单消费简单模式案例: Event数据模型
import lombok.Data;
@Data
public class SampleEventModel {
private String data;
}
Event事件模型Factory工厂类
import com.lmax.disruptor.EventFactory;
/**
* 消息对象生产工厂
*/
public class SampleEventModelFactory implements EventFactory<
SampleEventModel>
{
@Override
public SampleEventModel newInstance() {
//返回空的消息对象数据Event
return new SampleEventModel();
}
}
EventHandler处理器操作
import com.lmax.disruptor.EventHandler;
/**
* 消息事件处理器
*/
public class SampleEventHandler implements EventHandler<
SampleEventModel>
{
/**
* 事件驱动模式
*/
@Override
public void onEvent(SampleEventModel event, long sequence, boolean endOfBatch) throws Exception {
// do ...
System.out.println("消费者消费处理数据:" + event.getData());
}
}
EventProducer工厂生产者服务处理器操作
import com.lmax.disruptor.RingBuffer;
/**
* 消息发送
*/
public class SampleEventProducer {
private RingBuffer<
SampleEventModel>
ringBuffer;
public SampleEventProducer(RingBuffer<
SampleEventModel>
ringBuffer) {
this.ringBuffer = ringBuffer;
}
/**
* 发布数据信息
* @param data
*/
public void publish(String data){
//从ringBuffer获取可用sequence序号
long sequence = ringBuffer.next();
try {
//根据sequence获取sequence对应的Event
//这个Event是一个没有赋值具体数据的对象
TestEvent testEvent = ringBuffer.get(sequence);
testEvent.setData(data);
} finally {
//提交发布
ringBuffer.publish(sequence);
}
}
}
EventProducer工厂生产者服务处理器操作
public class TestMain {
public static void main(String[] args) {
SampleEventModelFactory eventFactory = new SampleEventModelFactory();
int ringBufferSize = 1024 * 1024;
//这个线程池最好自定义
ExecutorService executor = Executors.newCachedThreadPool();
//实例化disruptor
Disruptor<
SampleEventModel>
disruptor = new Disruptor<
SampleEventModel>
(
eventFactory,//消息工厂
ringBufferSize,//ringBuffer容器最大容量长度
executor,//线程池,最好自定义一个
ProducerType.SINGLE,//单生产者模式
new BlockingWaitStrategy()//等待策略
);
//添加消费者监听 把TestEventHandler绑定到disruptor
disruptor.handleEventsWith(new SampleEventHandler());
//启动disruptor
disruptor.start();
//获取实际存储数据的容器RingBuffer
RingBuffer<
SampleEventModel>
ringBuffer = disruptor.getRingBuffer();
//生产发送数据
SampleEventProducer producer = new SampleEventProducer(ringBuffer);
for(long i = 0;
i <
100;
i ++){
producer.publish(i);
}
disruptor.shutdown();
executor.shutdown();
}
}
Disruptor的原理分析
- 使用循环数组代替队列生产者消费者模型自然是离不开队列的,使用预先填充数据的方式来避免 GC;
- 使用CPU缓存行填充的方式来避免极端情况下的数据争用导致的性能下降;
- 多线程编程中尽量避免锁争用的编码技巧。
文章图片
循环数组
- 设定一个数字标志表示当前的可用的位置(可以从0开始)。
- 头标志位表示下一个可以插入的位置。
- 头标志位不能大于尾标志位一个数组长度(因为这样就插入的位置和读取的位置就重叠了会导致数据丢失)。
- 尾标志位表示下一个可以读取的位置。
- 尾标志位不能等于头标志位(因为这样读取的数据实际上是上一轮的旧数据) 预先填充提高性能,我们知道在java中如果创造大量的对象使用后弃用,JVM 会在适当的时候进行GC操作。
- 头标志位表示下一个可以插入的位置。
- 当这个数字标志不断增长到大于数组长度时进行与数组长度的并运算,得到的新数字依然在数组的长度范围内,就又可以插入。
- 这样就好像一直插入直到数组末尾又再次从头开始,故而称之为循环数组。 一般的循环数组有头尾两个标志位。这点和队列很像。
只要争用的情况存在,并且线程较多,都会出现对资源的不断消耗。争用的对象越多,争用中消耗掉的资源也就越多。
为了避免这样的情况,减少争用的资源就是一个手段。比如在循环数组中只保留一个标志位,也就是下一个可以写入数据位置的标志位。而尾部标志位则在各个消费者线程中保存(具体的编程手法后续细讲)。
循环数组在单线程
- 循环数组在单线程中的使用,如果确定只有一个生产者,也就是说只有一个写线程。则在循环数组中的使用会更加简化。
- 具体来说单线程更新数组上的标志位,那这种情况,标志位就无需采用CAS写的方式来确定下一个可写入的位置,直接就是在单线程内进行普通的更新即可。
- 循环数组在多线程中的使用,如果存在多个生产者,则可写入的标志位需要用CAS 算法来进行争夺,避免锁的使用。
- 多个线程通过CAS得到唯一的不冲突的下一个可写序号,由于需要获得序号后才能进行写入,而写入完成才可以让消费者线程进行消费。
- 所以才获得序号后,完成写入前,必须有一种方式让消费者检测是否完成。
- 以避免消费者拿到还未填入输入的数组位。 为了达到这个目标,存在简单—效率低和复杂—效率高两种方式。
- prePut:表示下一个可以供生产者放入的位置;
- 多个生产者通过 CAS 获得 prePut 的不同的值,在获得的序号并且完成数据写入后,将 put 的值以 CAS 方式递增(比如获得的序号是7,只有 put 是6的时候才允许设置成功),称之为发布。
- 这种方式存在一个缺点,如果多个线程并发写入,获取 prePut 的值不会堵塞,假设其中一个生产者在写入数据的时候稍慢,则其他的线程写入完毕也无法完成发布。就会导致循环等待,浪费了 CPU 性能。
- put:表示最后一个生产者已经放入的位置。
- 复杂但是可能效率高的方式,在上面的方式中,主要的争夺环节集中在多线程发布中,序号大的线程发布需要等到序号小的线程发布完成后才能发布。那我们的优化的点也在这个地方。
- 这样就可以避免发布的争夺。 但是又来带来一个问题,用什么数字来表示是否已经发布完成?如果只是0和1,那么写过1轮以后,标志数组位上就都是1了。又无法区分。
- 所以标志数组上的数字应该在循环数组的每一轮循环的值都不同。
- CPU 为了更快的执行代码。于是当从内存中读取数据时,并不是只读自己想要的部分。而是读取足够的字节来填入高速缓存行。根据不同的 CPU ,高速缓存行大小不同。如 X86 是 32BYTES ,而 ALPHA 是 64BYTES 。并且始终在第 32 个字节或第 64 个字节处对齐。这样,当 CPU 访问相邻的数据时,就不必每次都从内存中读取,提高了速度。 因为访问内存要比访问高速缓存用的时间多得多。
- 这个缓存是CPU内部自己的缓存,内部的缓存单位是行,叫做缓存行。在多核环境下会出现CPU之间的内存同步问题(比如一个核加载了一份缓存,另外一个核也要用到同一份数据),如果每个核每次需要时都往内存中存取(一个在读缓存,一个在写缓存时,造成数据不一致),这会带来比较大的性能损耗。
- 数据在缓存中不是以独立的项来存储的,如不是一个单独的变量,也不是一个单独的指针。缓存是由缓存行组成的,通常是64字节(译注:这篇文章发表时常用处理器的缓存行是64字节的,比较旧的处理器缓存行是32字节),并且它有效地引用主内存中的一块地址。一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量。
- 当数组中的一个值被加载到缓存中,它会额外加载另外7个。因此你能非常快地遍历这个数组。事实上,你可以非常快速的遍历在连续的内存块中分配的任意数据结构。
- 因此如果你数据结构中的项在内存中不是彼此相邻的,你将得不到免费缓存加载所带来的优势。并且在这些数据结构中的每一个项都可能会出现缓存未命中。
- 设想你的long类型的数据不是数组的一部分。设想它只是一个单独的变量。让我们称它为head,这么称呼它其实没有什么原因。然后再设想在你的类中有另一个变量紧挨着它。让我们直接称它为tail。现在,当你加载head到缓存的时候,你也免费加载了tail
推荐阅读
- Linux 6
- 测试平台系列(51) 编写数据库连接相关方法
- Java架构师-十项全能 百度网盘
- Java技术指南「并发编程专题」CompletionService框架基本使用和原理探究(基础篇
- 循环神经网络LSTM RNN回归(sin曲线预测)
- Kibana系列--安装与配置
- flink sql 知其所以然(flink sql tumble window 的奇妙解析之路)
- 搭建PG数据库一些归档的配置
- CentOS7.3在终端输入字母时出现了大小写乱跳