宝剑锋从磨砺出,梅花香自苦寒来。这篇文章主要讲述优化技术专题「高性能框架」终极关注Disruptor源码和Java8的@Contended伪共享相关的知识,希望能为你提供帮助。
Disruptor原理分析
启动Disruptorstart() ->
开启 Disruptor,运行事件处理器。
public RingBuffer<
T>
start(){
checkOnlyStartedOnce();
//在前面 handleEventsWith() 方法里添加的 handler 对象会加入到 consumerRepository 里,这里遍历 consumerRepository 开启消费者线程
for (final ConsumerInfo consumerInfo : consumerRepository){
//从线程池中获取一个线程来开启消费事件处理器。(消费者开启监听,一旦有生产者投递,即可消费)
//这里开启的线程对象为BatchEventProcessor的实例
consumerInfo.start(executor)。
}
return ringBuffer。
}
关联事件handleEventsWith() -> createEventProcessors()调用的核心方法,作用是创建事件处理器。
@SafeVarargs
public final EventHandlerGroup<
T>
handleEventsWith(final EventHandler<
? super T>
... handlers){
return createEventProcessors(new Sequence[0], handlers);
}
存储事件将EventHandler对象绑定存储到consumerRepository内部,并且交由BatchEventProcessor处理器进行代理执行。
EventHandlerGroup<
T>
createEventProcessors(
final Sequence[] barrierSequences,
final EventHandler<
? super T>
[] eventHandlers){
...
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
//创建 sequence 序号栅栏
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences)。
for (int i = 0, eventHandlersLength = eventHandlers.length。i <
eventHandlersLength。i++){
final EventHandler<
? super T>
eventHandler = eventHandlers[i];
final BatchEventProcessor<
T>
batchEventProcessor = new BatchEventProcessor<
>
(ringBuffer, barrier, eventHandler)。
...
//这里将消费者加入到 consumerRepository 中---ConsumerRepository
consumerRepository.add(batchEventProcessor, eventHandler, barrier)。
processorSequences[i] = batchEventProcessor.getSequence()。
}
...
}
- handleEventsWith() 方法中,可以看到构建了一个 BatchEventProcessor(继承了 Runnable 接口)对象,start()方法启动的同样也是这个对象的实例。
- 这个对象继承自 EventProcessor ,EventProcessor 是 Disruptor 里非常核心的一个接口,它的实现类的作用是轮询接收RingBuffer提供的事件,并在没有可处理事件是实现等待策略。
- 这个接口的实现类必须要关联一个线程去执行,通常我们不需要自己去实现它。
- Sequence :维护当前消费者消费的 ID。
- SequenceBarrier:序号屏障,协调消费者的消费 ID,主要作用是获取消费者的可用序号,并提供等待策略的执行。
- EventHandler<
? super T>
:消费者的消费逻辑(我们实现的业务逻辑)。
- DataProvider :获取消费对象。RingBuffer 实现了此接口,主要是提供业务对象。
- processEvents():由于 BatchEventProcessor 继承自 Runnable 接口,所以在前面启动它后,run() 方法会执行,而 run() 方法内部则会调用此方法。
private void processEvents()
{
T event = null。
获取当前消费者维护的序号中并+1,即下一个消费序号
long nextSequence = sequence.get() + 1L。
while (true) {
try {
//获取可执行的最大的任务 ID,如果没有,waitFor() 方法内会进行等待
final long availableSequence = sequenceBarrier.waitFor(nextSequence)。
if (batchStartAware != null &
&
availableSequence >
= nextSequence) {
batchStartAware.onBatchStart(availableSequence - nextSequence + 1)。
}
//不断获取对应位置的任务进行消费 直到上面查询到的 availableSequence 消费完
while (nextSequence <
= availableSequence) {
event = dataProvider.get(nextSequence)。
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence)。
nextSequence++。
}
sequence.set(availableSequence)。
}
...
}
}
- 消费者事件处理器的核心代码,sequenceBarrier.waitFor(nextSequence) 方法内部,会比较当前消费者序号与可用序号的大小:
- 当可用序号(availableSequence)大于当前消费者序号(nextSequence),再获取到当前可用的最大的事件序号 ID(waitFot()方法内部调用 sequencer.getHighestPublishedSequence(sequence, availableSequence)),进行循环消费。
- 可用序号是维护在 ProcessingSequenceBarrier 里的,ProcessingSequenceBarrier 是通过 ringBuffer.newBarrier() 创建出来的。
文章图片
多消费事件和单消费事件在dependentSequence 上的处理有一些不同,可以看下 ProcessingSequenceBarrier 的 dependentSequence 的赋值以及 get() 方法 (Util.getMinimumSequence(sequences))。
启动过程分析之生产者
首先调用了 ringBuffer.next() 方法,获取可用序号,再获取到该序号下事先通过 Eventfactory 创建好的空事件对象,在我们对空事件对象进行赋值后,再调用 publish 方法将事件发布,则消费者就可以获取进行消费了。
生产者这里的核心代码如下,这里我截取的是多生产者模式下的代码:
public long next(int n){
if (n <
1 || n >
bufferSize) {
throw new IllegalArgumentException("n must be >
0 and <
bufferSize")。
}
long current。
long next。
do{
//cursor 为生产者维护的 sequence 序列,获取到当前可投递的的下标,即当前投递到该位置
current = cursor.get()。
//再+n获取下一个下标,即下一次投递的位置。
next = current + n。
long wrapPoint = next - bufferSize。
//目的:也是实现快读的读写。gatingSequenceCache独占缓存行
long cachedGatingSequence = gatingSequenceCache.get()。
if (wrapPoint >
cachedGatingSequence || cachedGatingSequence >
current){
//获取消费者最小序号
long gatingSequence = Util.getMinimumSequence(gatingSequences, current)。
if (wrapPoint >
gatingSequence) {
//如果不符合,则阻塞线程 1ns(park()不会有死锁的问题)
LockSupport.parkNanos(1)。
// TODO, should we spin based on the wait strategy?
continue。
}
gatingSequenceCache.set(gatingSequence)。
}
//多个生产者时要保证线程安全(这里更新的 cursor 同时也是等待策略里的 waitFor() 方法的 cursor 参数,因此这里更新成功后,则等待策略会通过,表示有新的任务进来,就会消费)
else if (cursor.compareAndSet(current, next)){
break。
}
}while (true);
return next。
}
- 从图里可以看出,Sequence 继承以及间接继承了 RhsPadding 和 LhsPadding 类,而这俩个类都各定义了 7 个 long 类型的成员变量。
- 而 Sequence 的 get() 方法返回的也是一个 long 类型的值 value。这是上一篇文章介绍的充缓存行,消除伪共享。
- 在 64 位的计算机中,单个缓存行一般占 64 个字节,当 cpu 从换存里取数据时,会将该相关数据的其它数据取出来填满一个缓存行,这时如果其它数据更新,则缓存行缓存的该数据也会失效,当下次需要使用该数据时又需要重新从内存中提取数据。
- ArrayBlockingQueue 获取数据时,很容易碰到伪共享导致缓存行失效,而 Disruptor这里当在 value 的左右各填充 7 个 long 类型的数据时,每次取都能确保该数据独占缓存行,也不会有其他的数据更新导致该数据失效。避免了伪共享的问题( jdk 的并发包下也有一些消除伪共享的设计)。
文章图片
RingBuffer:它是一个首尾相接的环状的容器,用来在多线程中传递数据。第一张图里面创建 Disruptor 的多个参数其实都是用来创建 RingBuffer 的,比如生产者类型(单 or 多)、实例化工厂、容器长度、等待策略等。
简单分析,多个生产者同时向 ringbuffer 投递数据,假设此时俩个生产者将 ringbuffer 已经填满,因为 sequence 的序号是自增+1(若不满足获取条件则循环挂起当前线程),所以生产的时候能保证线程安全,只需要一个 sequence 即可。
【优化技术专题「高性能框架」终极关注Disruptor源码和Java8的@Contended伪共享】当多消费者来消费的时候,因为消费速度不同,例如消费者 1 来消费 0、1,消费者 2 消费 2、4,消费者 3 消费 3。
当消费者消费完 0 后,消费者 2 消费完 2 后,消费者 3 消费完 3 后,生产者再往队列投递数据时,其他位置还未被消费,会投递到第 0 个位置, 此时再想投递数据时,虽然消费 2 的第二个位置空缺、消费者 3 的第三个位置空缺,消费者还在消费 1 时,无法继续投递。因为是通过比较消费者自身维护的 sequence 的最小的序号,来进行比较。
Util.getMinimumSequence(gatingSequences, current) 方法也就无需再多说,它就是为了获取到多个消费者的最小序号,判断当前 ringBuffer 中的剩余可用序号是否大于消费者最小序号,是的话,则不能投递,需要阻塞当前线程(LockSupport.parkNanos(1))。
- BlockingWaitStrategy 使用了锁,低效的策略。
- SleepingWaitStrategy 对生产者线程的影响最小,适合用于异步日志类似的场景。(不加锁空等)
- YieldingWaitStrategy 性能最好,适合用于低延迟的系统,在要求极高性能且之间处理线数小于 cpu 逻辑核心数的场景中,推荐使用。
@Override
public long waitFor(
final long sequence, Sequence cursor, final Sequence dependentSequence, final SequenceBarrier barrier)
throws AlertException, InterruptedException{
long availableSequence。
int counter = SPIN_TRIES。//100
while ((availableSequence = dependentSequence.get()) <
sequence){
counter = applyWaitMethod(barrier, counter)。
}
return availableSequence。
}
private int applyWaitMethod(final SequenceBarrier barrier, int counter)
throws AlertException
{
barrier.checkAlert()。if (0 == counter)
{
Thread.yield()。
}
else
{
--counter。
}
return counter。
}
java 8 Contended注解
- 在Java 8中,可以采用@Contended在类级别上的注释,来进行缓存行填充。这样,可以解决多线程情况下的伪共享冲突问题。
- Contended可以用于类级别的修饰,同时也可以用于字段级别的修饰,当应用于字段级别时,被注释的字段将和其他字段隔离开来,会被加载在独立的缓存行上。在字段级别上,@Contended还支持一个“contention group”属性(Class-Level不支持),同一group的字段们在内存上将是连续(64字节范围内),但和其他他字段隔离开来。
@Contended
public static class ContendedTest2 {
private Object plainField1;
private Object plainField2;
private Object plainField3;
private Object plainField4;
}
将使整个字段块的两端都被填充:(以下是使用 –XX:+PrintFieldLayout的输出)
TestContended$ContendedTest2: field layout
Entire class is marked contended
@140 --- instance fields start ---
@140 "plainField1" Ljava.lang.Object;
@144 "plainField2" Ljava.lang.Object;
@148 "plainField3" Ljava.lang.Object;
@152 "plainField4" Ljava.lang.Object;
@288 --- instance fields end ---
@288 --- instance ends ---
在字段上应用Contended:
public static class ContendedTest1 {
@Contended
private Object contendedField1;
private Object plainField1;
private Object plainField2;
private Object plainField3;
private Object plainField4;
}
将导致该字段从连续的字段块中分离开来并高效的添加填充:
TestContended$ContendedTest1: field layout
@ 12 --- instance fields start ---
@ 12 "plainField1" Ljava.lang.Object;
@ 16 "plainField2" Ljava.lang.Object;
@ 20 "plainField3" Ljava.lang.Object;
@ 24 "plainField4" Ljava.lang.Object;
@156 "contendedField1" Ljava.lang.Object;
(contended, group = 0)
@288 --- instance fields end ---
@288 --- instance ends ---
注解多个字段使他们分别被填充:
public static class ContendedTest4 {
@Contended
private Object contendedField1;
@Contended
private Object contendedField2;
private Object plainField3;
private Object plainField4;
}
被注解的2个字段都被独立地填充:
TestContended$ContendedTest4: field layout
@ 12 --- instance fields start ---
@ 12 "plainField3" Ljava.lang.Object;
@ 16 "plainField4" Ljava.lang.Object;
@148 "contendedField1" Ljava.lang.Object;
(contended, group = 0)
@280 "contendedField2" Ljava.lang.Object;
(contended, group = 0)
@416 --- instance fields end ---
@416 --- instance ends ---
在有些cases中,你会想对字段进行分组,同一组的字段会和其他字段有访问冲突,但是和同一组的没有。例如,(同一个线程的)代码同时更新2个字段是很常见的情况。
public static class ContendedTest5 {
@Contended("updater1")
private Object contendedField1;
@Contended("updater1")
private Object contendedField2;
@Contended("updater2")
private Object contendedField3;
private Object plainField5;
private Object plainField6;
}
内存布局是:
TestContended$ContendedTest5: field layout
@ 12 --- instance fields start ---
@ 12 "plainField5" Ljava.lang.Object;
@ 16 "plainField6" Ljava.lang.Object;
@148 "contendedField1" Ljava.lang.Object;
(contended, group = 12)
@152 "contendedField2" Ljava.lang.Object;
(contended, group = 12)
@284 "contendedField3" Ljava.lang.Object;
(contended, group = 15)
@416 --- instance fields end ---
@416 --- instance ends ---
@Contended在字段级别,并且带分组的情况下,是否能解决伪缓存问题。
import sun.misc.Contended;
public class VolatileLong {
@Contended("group0")
public volatile long value1 = 0L;
@Contended("group0")
public volatile long value2 = 0L;
@Contended("group1")
public volatile long value3 = 0L;
@Contended("group1")
public volatile long value4 = 0L;
}
用2个线程来修改字段
- 测试1:线程0修改value1和value2;线程1修改value3和value4;他们都在同一组中。
- 测试2:线程0修改value1和value3;线程1修改value2和value4;他们在不同组中。
public final class FalseSharing implements Runnable {
public final static long ITERATIONS = 500L * 1000L * 1000L;
private static Volatile Long volatileLong;
private String groupId;
public FalseSharing(String groupId) {
this.groupId = groupId;
}
public static void main(final String[] args) throws Exception {
// Thread.sleep(10000);
System.out.println("starting....");
volatileLong = new VolatileLong();
final long start = System.nanoTime();
runTest();
System.out.println("duration = " + (System.nanoTime() - start));
}private static void runTest() throws InterruptedException {
Thread t0 = new Thread(new FalseSharing("t0"));
Thread t1 = new Thread(new FalseSharing("t1"));
t0.start();
t1.start();
t0.join();
t1.join();
}
public void run() {
long i = ITERATIONS + 1;
if (groupId.equals("t0")) {
while (0 != --i) {
volatileLong.value1 = i;
volatileLong.value2 = i;
}
} else if (groupId.equals("t1")) {
while (0 != --i) {
volatileLong.value3 = i;
volatileLong.value4 = i;
}
}
}
}public void run() {
long i = ITERATIONS + 1;
if (groupId.equals("t0")) {
while (0 != --i) {
volatileLong.value1 = i;
volatileLong.value3 = i;
}
} else if (groupId.equals("t1")) {
while (0 != --i) {
volatileLong.value2 = i;
volatileLong.value4 = i;
}
}
}
推荐阅读
- uni-app技术分享| 用uni-app实现拖动的诀窍
- 木棉花(手机游戏——黑白翻棋)
- 短代码未在Divi Builder中执行
- 以编程方式设置WordPress主菜单
- 以编程方式设置WordPress语言()
- 设置WordPress主题-发布内容可使屏幕滚动
- 在wp_enqueue_style中为不同的媒体设置多CSS文件
- WP主题选项中带有容器的单独字段数组(Carbon Fields)
- 使用wp_login()钩子设置Cookie