Reading the Disruptor Series: Source Code Walkthrough (2), the Producer

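Previously we analyzed Disruptor's initialization and startup code together; next we look at the code a producer uses to publish. If you are not yet familiar with Disruptor, I suggest reading my earlier translation of, and guide to, the Disruptor design material first. In particular, make sure you know what the key terms refer to.
1 The producer thread
The producer is usually one of our application threads. When publishing, it typically uses an EventTranslator to move data onto the RingBuffer. Because a translator involves no shared data or instance variables, the same EventTranslator instance is normally reused for every publish. (Note: "translate" usually means converting between languages, but it also has the sense "move from one place or condition to another", that is, to transfer or convert.)
Depending on how many arguments are passed in for one event, you can pick a different translator interface to receive them.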

/**
 * When publishing an event, the producer uses a translator to copy the raw value
 * into the preallocated event object in the RingBuffer.
 */
static class IntToExampleEventTranslator implements EventTranslatorOneArg<ExampleEvent, Integer> {

    static final IntToExampleEventTranslator INSTANCE = new IntToExampleEventTranslator();

    @Override
    public void translateTo(ExampleEvent event, long sequence, Integer arg0) {
        event.data = arg0;
        System.err.println("put data " + sequence + ", " + event + ", " + arg0);
    }
}

// Producer thread 0
Thread produceThread0 = new Thread(new Runnable() {
    @Override
    public void run() {
        int x = 0;
        while (x++ < events / 2) {
            // Only the next line matters here; everything else is scaffolding
            disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
        }
    }
});

// Producer thread 1
Thread produceThread1 = new Thread(new Runnable() {
    @Override
    public void run() {
        int x = 0;
        while (x++ < events / 2) {
            disruptor.publishEvent(IntToExampleEventTranslator.INSTANCE, x);
        }
    }
});

produceThread0.start();
produceThread1.start();

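In the demo we create and start two threads that produce events and place them into the Disruptor.
Now let's follow the source code step by step.
2 The overall logic of producing an event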
// Disruptor.java
/**
 * Publish an event to the ring buffer, using the given event translator.
 *
 * @param eventTranslator the translator that will load data into the event.
 * @param arg             A single argument to load into the event
 */
public <A> void publishEvent(final EventTranslatorOneArg<T, A> eventTranslator, final A arg) {
    ringBuffer.publishEvent(eventTranslator, arg);
}

// As mentioned before, Disruptor is a helper class; publishing is actually delegated to the RingBuffer.
// RingBuffer.publishEvent() has roughly two steps. Step 1 takes ownership of a free position on the
// RingBuffer, which we call "claiming the slot". Step 2 puts the data into that position, which we
// call "filling the slot".

// RingBuffer.java
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
    final long sequence = sequencer.next();          // step 1: claim the slot
    translateAndPublish(translator, sequence, arg0); // step 2: fill the slot
}

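In step 2, once the slot has been filled, the publish method of the Sequencer interface still has to be called to make the event visible to the outside. Why? Let's leave that as an open question for now.
In step 1, the slot is claimed by calling Sequencer.next(), which returns the next usable sequence number of the RingBuffer instance.
AbstractSequencer is an abstract class that implements the Sequencer interface; it is the parent of both the single-producer Sequencer and the multi-producer Sequencer.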
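The claim-then-fill flow described above can be sketched with a toy, single-threaded ring. This is only an illustration under simplifying assumptions: `ToyRing`, `Slot` and the use of `BiConsumer` as the translator are hypothetical names that are not part of Disruptor, and the sketch has no consumer tracking or wrap protection.

```java
import java.util.function.BiConsumer;

/** Toy single-threaded ring: a sketch of claim/fill/publish, not the Disruptor implementation. */
final class ToyRing {
    static final class Slot {
        int data;
    }

    private final Slot[] slots;
    private final int mask;
    private long nextValue = -1L; // last claimed sequence
    private long cursor = -1L;    // last published sequence

    ToyRing(int sizePowerOfTwo) {
        slots = new Slot[sizePowerOfTwo];
        for (int i = 0; i < slots.length; i++) {
            slots[i] = new Slot(); // events are preallocated, as in the RingBuffer
        }
        mask = sizePowerOfTwo - 1;
    }

    long next() {
        return ++nextValue; // step 1: claim the slot
    }

    Slot get(long sequence) {
        return slots[(int) sequence & mask];
    }

    void publish(long sequence) {
        cursor = sequence; // make the filled slot visible
    }

    long cursor() {
        return cursor;
    }

    void publishEvent(BiConsumer<Slot, Integer> translator, int arg) {
        long sequence = next();
        try {
            translator.accept(get(sequence), arg); // step 2: fill the slot
        } finally {
            publish(sequence); // always publish, even if the translator throws
        }
    }

    public static void main(String[] args) {
        ToyRing ring = new ToyRing(8);
        ring.publishEvent((slot, value) -> slot.data = value, 42);
        System.out.println("cursor=" + ring.cursor() + ", data=" + ring.get(0).data);
    }
}
```

Publishing inside `finally` mirrors the shape of `translateAndPublish`: a claimed sequence is never left unpublished, so later sequences are not held up behind it.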
3 The core of Disruptor: the Sequencer interface
Why call the Sequencer the core of Disruptor? That is not my own claim: the Introduction page of the official Disruptor wiki describes the Sequencer as the real core of the Disruptor.
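The Sequencer is what guarantees that data passes between producers and consumers correctly and quickly. Let's first look at what the Sequencer does from the producer's point of view.
Here is a class diagram first.
[Figure: Sequencer class diagram]
Below are the definitions of the Sequencer interface and its parent interfaces, Cursored and Sequenced.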
// Sequencer.java
/**
 * Coordinates claiming sequences for access to a data structure while tracking dependent {@link Sequence}s
 */
public interface Sequencer extends Cursored, Sequenced {
    /**
     * Set to -1 as sequence starting point.
     * Note: this is the initial sequence value.
     */
    long INITIAL_CURSOR_VALUE = -1L;

    /**
     * Claim a specific sequence. Only used if initialising the ring buffer to
     * a specific value.
     * Note: rarely needed in practice.
     *
     * @param sequence The sequence to initialise too.
     */
    void claim(long sequence);

    /**
     * Confirms if a sequence is published and the event is available for use; non-blocking.
     *
     * @param sequence of the buffer to check
     * @return true if the sequence is available for use, false if not
     */
    boolean isAvailable(long sequence);

    /**
     * Add the specified gating sequences to this instance of the Disruptor. They will
     * safely and atomically added to the list of gating sequences.
     * Note: gating sequences are the consumer sequences; the producer uses them
     * to avoid running into the back of a consumer.
     *
     * @param gatingSequences The sequences to add.
     */
    void addGatingSequences(Sequence... gatingSequences);

    /**
     * Remove the specified sequence from this sequencer.
     *
     * @param sequence to be removed.
     * @return true if this sequence was found, false otherwise.
     */
    boolean removeGatingSequence(Sequence sequence);

    /**
     * Create a new SequenceBarrier to be used by an EventProcessor to track which messages
     * are available to be read from the ring buffer given a list of sequences to track.
     * Note: used by consumers; the tracked sequences are usually those of the
     * previous group of consumers.
     *
     * @param sequencesToTrack
     * @return A sequence barrier that will track the specified sequences.
     * @see SequenceBarrier
     */
    SequenceBarrier newBarrier(Sequence... sequencesToTrack);

    /**
     * Get the minimum sequence value from all of the gating sequences
     * added to this ringBuffer.
     *
     * @return The minimum gating sequence or the cursor sequence if
     * no sequences have been added.
     */
    long getMinimumSequence();

    /**
     * Get the highest sequence number that can be safely read from the ring buffer. Depending
     * on the implementation of the Sequencer this call may need to scan a number of values
     * in the Sequencer. The scan will range from nextSequence to availableSequence. If
     * there are no available values >= nextSequence the return value will be
     * nextSequence - 1. To work correctly a consumer should pass a value that
     * is 1 higher than the last sequence that was successfully processed.
     *
     * @param nextSequence      The sequence to start scanning from.
     * @param availableSequence The sequence to scan to.
     * @return The highest value that can be safely read, will be at least nextSequence - 1.
     */
    long getHighestPublishedSequence(long nextSequence, long availableSequence);

    <T> EventPoller<T> newPoller(DataProvider<T> provider, Sequence... gatingSequences);
}

// Cursored.java
/**
 * Implementors of this interface must provide a single long value
 * that represents their current cursor value. Used during dynamic
 * add/remove of Sequences from a
 * {@link SequenceGroups#addSequences(Object, java.util.concurrent.atomic.AtomicReferenceFieldUpdater, Cursored, Sequence...)}.
 * Note: the cursor interface, used to read the producer's current cursor position.
 */
public interface Cursored {
    /**
     * Get the current cursor value.
     *
     * @return current cursor value
     */
    long getCursor();
}

// Sequenced.java
public interface Sequenced {
    /**
     * The capacity of the data structure to hold entries.
     *
     * @return the size of the RingBuffer.
     */
    int getBufferSize();

    /**
     * Has the buffer got capacity to allocate another sequence. This is a concurrent
     * method so the response should only be taken as an indication of available capacity.
     *
     * @param requiredCapacity in the buffer
     * @return true if the buffer has the capacity to allocate the next sequence otherwise false.
     */
    boolean hasAvailableCapacity(final int requiredCapacity);

    /**
     * Get the remaining capacity for this sequencer.
     *
     * @return The number of slots remaining.
     */
    long remainingCapacity();

    /**
     * Claim the next event in sequence for publishing.
     * Note: called by the producer to request the next sequence number when publishing.
     *
     * @return the claimed sequence value
     */
    long next();

    /**
     * Claim the next n events in sequence for publishing. This is for batch event producing. Using batch producing
     * requires a little care and some math.
     * <pre>
     * int n = 10;
     * long hi = sequencer.next(n);
     * long lo = hi - (n - 1);
     * for (long sequence = lo; sequence <= hi; sequence++) {
     *     // Do work.
     * }
     * sequencer.publish(lo, hi);
     * </pre>
     *
     * @param n the number of sequences to claim
     * @return the highest claimed sequence value
     */
    long next(int n);

    /**
     * Attempt to claim the next event in sequence for publishing. Will return the
     * number of the slot if there is at least requiredCapacity slots
     * available.
     * Note: the non-blocking variant of next().
     *
     * @return the claimed sequence value
     * @throws InsufficientCapacityException
     */
    long tryNext() throws InsufficientCapacityException;

    /**
     * Attempt to claim the next n events in sequence for publishing. Will return the
     * highest numbered slot if there is at least requiredCapacity slots
     * available. Have a look at {@link Sequencer#next()} for a description on how to
     * use this method.
     * Note: the non-blocking variant of next(n).
     *
     * @param n the number of sequences to claim
     * @return the claimed sequence value
     * @throws InsufficientCapacityException
     */
    long tryNext(int n) throws InsufficientCapacityException;

    /**
     * Publishes a sequence. Call when the event has been filled.
     *
     * @param sequence
     */
    void publish(long sequence);

    /**
     * Batch publish sequences. Called when all of the events have been filled.
     *
     * @param lo first sequence number to publish
     * @param hi last sequence number to publish
     */
    void publish(long lo, long hi);
}
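The "little care and some math" that the Javadoc of next(int n) asks for comes down to recovering the lowest claimed sequence from the highest one. The sketch below works through that arithmetic with a hypothetical `BatchClaim` class; it only counts sequences and does not check capacity against any consumer, so it should not be read as how the real sequencers are implemented.

```java
/** Sketch of batch-claim arithmetic only; no capacity checks, not thread-safe. */
final class BatchClaim {
    private long nextValue = -1L; // same starting point as INITIAL_CURSOR_VALUE

    /** Claims n sequences and returns the highest one, like Sequenced#next(int). */
    long next(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be > 0");
        }
        nextValue += n;
        return nextValue;
    }

    /** Lowest sequence of a batch of n whose highest sequence is hi. */
    static long lowest(long hi, int n) {
        return hi - (n - 1);
    }

    public static void main(String[] args) {
        BatchClaim claim = new BatchClaim();
        int n = 10;
        long hi = claim.next(n);
        long lo = lowest(hi, n);
        for (long sequence = lo; sequence <= hi; sequence++) {
            // fill the event at this sequence here
        }
        System.out.println("claimed [" + lo + ", " + hi + "]");
    }
}
```

Because next(n) returns the highest sequence rather than the lowest, forgetting the `- (n - 1)` shifts the whole batch and touches slots that were never claimed.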

3.1 Publishing events with a single producer
Let's first see how the single-producer SingleProducerSequencer actually claims a slot.
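// SingleProducerSequencer.java
@Override
public long next() {
    return next(1);
}

/**
 * @see Sequencer#next(int)
 */
@Override
public long next(int n) {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }
    // Copy the sequence value reached by the previous claim
    long nextValue = this.nextValue;
    // Add n to get the sequence being claimed now (n is 1 for a single publish); this is the value to validate
    long nextSequence = nextValue + n;
    // The point at which wrapping could occur: the claimed value minus one lap of the ring
    long wrapPoint = nextSequence - bufferSize;
    // The cached smallest gating sequence, i.e. the slowest consumer
    long cachedGatingSequence = this.cachedValue;

    // When wrapPoint is greater than cachedGatingSequence the producer would wrap around the ring
    // and overwrite, from behind, events that have not been consumed yet.
    // So the producer waits if it is about to lap the slowest consumer (the sequence being claimed is
    // a full lap ahead of it), or if the cached consumer sequence is ahead of the producer. The second
    // case presumably should not happen in normal operation.
    // An analogy for these values: on a 400 m track (bufferSize), Xiao Ming has run 599 m (nextSequence)
    // and Xiao Hong, the slowest consumer, has run 200 m (cachedGatingSequence). If Xiao Hong stands
    // still, the point at which Xiao Ming would run into her from behind is the wrap point.
    // No free slot: enter the waiting loop.
    if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
        cursor.setVolatile(nextValue);  // StoreLoad fence

        long minSequence;
        // The loop exits only after the consumers have consumed and moved forward.
        // The outer check used the cached minimum consumer sequence; here the real consumer sequences
        // are read, and the fresh result is cached once the while loop exits.
        while (wrapPoint > (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) {
            // Wake up waiting consumers
            waitStrategy.signalAllWhenBlocking();
            LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin?
        }
        // The consumers have advanced: update the cached minimum sequence
        this.cachedValue = minSequence;
    }
    // Store the successfully claimed sequence in the instance variable
    this.nextValue = nextSequence;
    return nextSequence;
}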
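The wrap-point test in next(n) above is easy to get off by one, so here is a small worked version of just that comparison. `WrapPoint.mustWait` is a hypothetical helper written for this illustration; it reproduces only the `wrapPoint > minGatingSequence` part of the condition and ignores caching and waiting.

```java
/** Sketch of the wrap-point comparison only; the helper name is hypothetical. */
final class WrapPoint {
    /**
     * True when claiming n more sequences after nextValue would land on a slot
     * that the slowest consumer (minGatingSequence) has not finished with yet.
     */
    static boolean mustWait(long nextValue, int n, int bufferSize, long minGatingSequence) {
        long nextSequence = nextValue + n;
        long wrapPoint = nextSequence - bufferSize;
        return wrapPoint > minGatingSequence;
    }

    public static void main(String[] args) {
        // Ring of 8 slots, sequences 0..7 already claimed, nothing consumed yet
        System.out.println(mustWait(7, 1, 8, -1));
        // Same ring after the consumer has finished sequence 0
        System.out.println(mustWait(7, 1, 8, 0));
    }
}
```

With a ring of 8, sequence 8 reuses the slot of sequence 0, so it may be claimed as soon as the slowest consumer has processed sequence 0, which is exactly when wrapPoint (0) stops being greater than the gating sequence (0).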

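When next() succeeds in claiming a slot it returns the slot's sequence number. Control then returns to RingBuffer.publishEvent, which calls translateAndPublish to fill the slot and publish it.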
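// RingBuffer.java
// This is the EventTranslator overload; the EventTranslatorOneArg overload used by the demo
// has the same shape and additionally passes arg0 through to translateTo.
private void translateAndPublish(EventTranslator<E> translator, long sequence) {
    try {
        translator.translateTo(get(sequence), sequence);
    } finally {
        sequencer.publish(sequence);
    }
}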

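The translator parameter is the user-defined implementation of the EventTranslator interface.
EventTranslator was introduced above. Besides EventTranslator there are also EventTranslatorOneArg, EventTranslatorTwoArg, EventTranslatorThreeArg and EventTranslatorVararg. They all copy the given data into the object at the claimed slot (the RingBuffer has preallocated those objects) and differ only in how many arguments they take. Here is a quick look at the definition of EventTranslatorOneArg.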
public interface EventTranslatorOneArg<T, A> {
    /**
     * Translate a data representation into fields set in given event
     *
     * @param event    into which the data should be translated.
     * @param sequence that is assigned to event.
     * @param arg0     The first user specified argument to the translator
     */
    void translateTo(final T event, long sequence, final A arg0);
}

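Once the data is in place, the sequencer's publish method can be called to publish the object. It first updates the current cursor and then notifies any waiting consumers, which resume consuming. The consumers' wait strategies are covered in a later post.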
// SingleProducerSequencer.java
@Override
public void publish(long sequence) {
    // Publishing this position as available means updating the Sequencer's internal cursor and,
    // when a blocking wait strategy is in use, notifying consumers that are waiting for events
    cursor.set(sequence);
    // For wait strategies other than the blocking ones, signalAllWhenBlocking is an empty implementation
    waitStrategy.signalAllWhenBlocking();
}

// BlockingWaitStrategy.java
@Override
public void signalAllWhenBlocking() {
    lock.lock();
    try {
        processorNotifyCondition.signalAll();
    } finally {
        lock.unlock();
    }
}
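To make the "update the cursor, then signal" order above concrete, here is a minimal sketch of a blocking hand-off built on ReentrantLock and Condition. `ToyBlockingCursor` and its methods are hypothetical names for this illustration; the real BlockingWaitStrategy works with Sequence objects and barriers, which this sketch leaves out.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch of a blocking wait on a published cursor; not the Disruptor classes. */
final class ToyBlockingCursor {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition published = lock.newCondition();
    private volatile long cursor = -1L;

    /** Producer side: advance the cursor first, then wake every waiting consumer. */
    void publish(long sequence) {
        cursor = sequence;
        lock.lock();
        try {
            published.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Consumer side: block until the cursor has reached the requested sequence. */
    long waitFor(long sequence) throws InterruptedException {
        lock.lock();
        try {
            while (cursor < sequence) {
                published.await(); // releases the lock while waiting
            }
            return cursor;
        } finally {
            lock.unlock();
        }
    }

    /** Runs one consumer thread against one publish and returns what the consumer saw. */
    static long demo() {
        ToyBlockingCursor toy = new ToyBlockingCursor();
        long[] seen = {-2L};
        Thread consumer = new Thread(() -> {
            try {
                seen[0] = toy.waitFor(0L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        toy.publish(0L);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println("consumer observed cursor " + demo());
    }
}
```

The consumer re-checks the cursor inside a loop while holding the lock, so a signal that arrives before it starts waiting is not lost: it simply sees the updated cursor and never blocks.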

3.2 An aside: Sequence, Disruptor's efficient AtomicLong
Look closely at that cursor. It is not a plain long but an instance of Sequence, a class implemented inside Disruptor.
class LhsPadding {
    protected long p1, p2, p3, p4, p5, p6, p7;
}

class Value extends LhsPadding {
    // Seven long fields sit before value and seven after it as cache-line padding. With seven on each
    // side, however a 64-byte cache line is aligned when value is loaded, no other variable shares
    // that line, which removes the false-sharing problem.
    protected volatile long value;
}

class RhsPadding extends Value {
    protected long p9, p10, p11, p12, p13, p14, p15;
}

/**
 * Concurrent sequence class used for tracking the progress of
 * the ring buffer and event processors. Support a number
 * of concurrent operations including CAS and order writes.
 *
 * Also attempts to be more efficient with regards to false
 * sharing by adding padding around the volatile field.
 * Note: Sequence can be thought of as an AtomicLong, except that it eliminates
 * false sharing and is therefore more efficient.
 */
public class Sequence extends RhsPadding {
    static final long INITIAL_VALUE = -1L;
    private static final Unsafe UNSAFE;
    private static final long VALUE_OFFSET;

    static {
        UNSAFE = Util.getUnsafe();
        try {
            VALUE_OFFSET = UNSAFE.objectFieldOffset(Value.class.getDeclaredField("value"));
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Create a sequence initialised to -1.
     */
    public Sequence() {
        this(INITIAL_VALUE);
    }

    /**
     * Create a sequence with a specified initial value.
     *
     * @param initialValue The initial value for this sequence.
     */
    public Sequence(final long initialValue) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, initialValue);
    }

    /**
     * Perform a volatile read of this sequence's value.
     *
     * @return The current value of the sequence.
     */
    public long get() {
        return value;
    }

    /**
     * Perform an ordered write of this sequence. The intent is
     * a Store/Store barrier between this write and any previous
     * store.
     * Note: equivalent to AtomicLong#lazySet(long newValue). Compared with a plain volatile
     * write to value it avoids the Store/Load barrier and is cheaper, but other threads may
     * see the new value slightly later.
     *
     * @param value The new value for the sequence.
     */
    public void set(final long value) {
        UNSAFE.putOrderedLong(this, VALUE_OFFSET, value);
    }

    /**
     * Performs a volatile write of this sequence. The intent is
     * a Store/Store barrier between this write and any previous
     * write and a Store/Load barrier between this write and any
     * subsequent volatile read.
     *
     * @param value The new value for the sequence.
     */
    public void setVolatile(final long value) {
        UNSAFE.putLongVolatile(this, VALUE_OFFSET, value);
    }

    /**
     * Perform a compare and set operation on the sequence.
     *
     * @param expectedValue The expected current value.
     * @param newValue      The value to update to.
     * @return true if the operation succeeds, false otherwise.
     */
    public boolean compareAndSet(final long expectedValue, final long newValue) {
        return UNSAFE.compareAndSwapLong(this, VALUE_OFFSET, expectedValue, newValue);
    }

    /**
     * Atomically increment the sequence by one.
     *
     * @return The value after the increment
     */
    public long incrementAndGet() {
        return addAndGet(1L);
    }

    /**
     * Atomically add the supplied value.
     *
     * @param increment The value to add to the sequence.
     * @return The value after the increment.
     */
    public long addAndGet(final long increment) {
        long currentValue;
        long newValue;
        do {
            currentValue = get();
            newValue = currentValue + increment;
        } while (!compareAndSet(currentValue, newValue));
        return newValue;
    }

    @Override
    public String toString() {
        return Long.toString(get());
    }
}

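This Sequence is essentially an AtomicLong; the biggest difference is that Sequence addresses the false-sharing problem. In addition, Sequence#set corresponds to AtomicLong#lazySet.
That completes the flow for publishing an event with a single producer.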
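The correspondence between Sequence and AtomicLong stated above can be written down directly. The following `ToySequence` is a hypothetical wrapper for illustration only: it maps each Sequence operation onto the AtomicLong call with the matching memory semantics, and it deliberately has no cache-line padding, so it shows the API mapping and not the false-sharing protection.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Sketch: Sequence-style operations expressed with AtomicLong; unpadded, for illustration only. */
final class ToySequence {
    private final AtomicLong value = new AtomicLong(-1L);

    long get() {
        return value.get();          // volatile read
    }

    void set(long newValue) {
        value.lazySet(newValue);     // ordered write, the counterpart of Sequence#set
    }

    void setVolatile(long newValue) {
        value.set(newValue);         // full volatile write
    }

    boolean compareAndSet(long expected, long newValue) {
        return value.compareAndSet(expected, newValue);
    }

    long addAndGet(long increment) {
        long current;
        long next;
        do {
            current = get();
            next = current + increment;
        } while (!compareAndSet(current, next)); // retry if another thread won the race
        return next;
    }

    public static void main(String[] args) {
        ToySequence sequence = new ToySequence();
        System.out.println(sequence.addAndGet(1L));
    }
}
```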
3.3 Publishing events with multiple producers
With multiple producers, the slot is claimed by calling MultiProducerSequencer.next().
// MultiProducerSequencer.java
@Override
public long next() {
    return next(1);
}

/**
 * @see Sequencer#next(int)
 */
@Override
public long next(int n) {
    if (n < 1) {
        throw new IllegalArgumentException("n must be > 0");
    }

    long current;
    long next;
    do {
        current = cursor.get(); // current cursor value; -1 right after initialisation
        next = current + n;
        long wrapPoint = next - bufferSize;
        long cachedGatingSequence = gatingSequenceCache.get();
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current) {
            long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
            if (wrapPoint > gatingSequence) {
                waitStrategy.signalAllWhenBlocking();
                LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
                continue;
            }
            gatingSequenceCache.set(gatingSequence);
        } else if (cursor.compareAndSet(current, next)) {
            break;
        }
    } while (true);
    return next;
}
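The part of the loop above that makes concurrent claims safe is the compareAndSet on the cursor. The sketch below isolates it: several threads claim sequences through a CAS loop and the result is checked for duplicates. `CasClaim` and `distinctClaims` are hypothetical names, and the capacity check against gating sequences is intentionally omitted.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Sketch of CAS-based claiming by several producers; no wrap protection. */
final class CasClaim {
    private final AtomicLong cursor = new AtomicLong(-1L);

    /** Claims n sequences and returns the highest one; retries when another thread got in first. */
    long next(int n) {
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + n;
        } while (!cursor.compareAndSet(current, next));
        return next;
    }

    /** Lets several threads claim sequences and returns how many distinct values were handed out. */
    static int distinctClaims(int threads, int perThread) {
        CasClaim claim = new CasClaim();
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int k = 0; k < perThread; k++) {
                    seen.add(claim.next(1));
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctClaims(2, 1000) + " distinct sequences claimed");
    }
}
```

Note the consequence for the multi-producer cursor: it advances at claim time, before the slot is filled, so it cannot also serve as the "published up to here" marker the way it does in the single-producer case.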

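As you can see, claiming a slot and placing the data work much the same in multi-producer mode as in single-producer mode. The main difference is the final step, the publish call that makes the slot available.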
// MultiProducerSequencer.java
private static final Unsafe UNSAFE = Util.getUnsafe();
// Offset of the first element of an int[] from the start of the array object
private static final long BASE = UNSAFE.arrayBaseOffset(int[].class);
// Space taken by each element (may also return 0). BASE and SCALE both exist to address availableBuffer.
private static final long SCALE = UNSAFE.arrayIndexScale(int[].class);

private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);

// availableBuffer tracks the state of each ringbuffer slot
// see below for more details on the approach
private final int[] availableBuffer; // initially all -1
private final int indexMask;
private final int indexShift;

@Override
public void publish(final long sequence) {
    setAvailable(sequence);
    // Only notifies when a BlockingWaitStrategy is in use; otherwise this does nothing
    waitStrategy.signalAllWhenBlocking();
}

@Override
public void publish(long lo, long hi) {
    for (long l = lo; l <= hi; l++) {
        setAvailable(l);
    }
    waitStrategy.signalAllWhenBlocking();
}

/**
 * Sets the availability flag in availableBuffer.
 * The prime reason is to avoid a shared sequence object between publisher threads.
 * The difference between the cursor and the minimum gating sequence should never be larger than
 * the size of the RingBuffer (so that a fast producer cannot overwrite data not yet consumed).
 */
private void setAvailable(final long sequence) {
    // calculateIndex takes the remainder (%), calculateAvailabilityFlag takes the quotient (/)
    setAvailableBufferValue(calculateIndex(sequence), calculateAvailabilityFlag(sequence));
}

private void setAvailableBufferValue(int index, int flag) {
    // The field is updated through Unsafe, which works on raw memory, so the memory position
    // bufferAddress of the element has to be computed
    long bufferAddress = (index * SCALE) + BASE;
    // availableBuffer is an int array that marks available positions, initially all -1. As sequence
    // keeps rising, the flag at a fixed position (the quotient of sequence divided by bufferSize)
    // keeps growing.
    UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}

private int calculateAvailabilityFlag(final long sequence) {
    // The quotient sequence / bufferSize, where bufferSize = 2^indexShift
    return (int) (sequence >>> indexShift);
}

private int calculateIndex(final long sequence) {
    // The position is the remainder: AND the sequence with the mask (a power of two minus 1, i.e. all
    // ones in binary). Equivalent to sequence % bufferSize, where bufferSize = indexMask + 1.
    return ((int) sequence) & indexMask;
}

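Compared with SingleProducerSequencer.publish, MultiProducerSequencer.publish does not set the cursor; it sets the matching position of the internal availableBuffer array instead. availableBuffer is an array recording the state of each RingBuffer slot: the sequence value modulo the RingBuffer size gives the slot index, and the sequence value divided by the RingBuffer size gives the lap that the sequence belongs to, which is the value stored. Neither the modulo nor the division is computed directly; the more efficient bitwise AND and right shift are used instead.
The remaining operations of MultiProducerSequencer are similar to those of SingleProducerSequencer, so I won't repeat them.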
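The index and flag arithmetic described above can be tried out on a plain int array. This is a sketch under stated assumptions: `AvailableBufferMath` is a hypothetical class, it uses ordinary array writes where the real code goes through Unsafe, it derives indexShift with Integer.numberOfTrailingZeros as a stand-in for a log2 of the power-of-two buffer size, and its isAvailable is a plain-array analogue of the check declared in the Sequencer interface.

```java
import java.util.Arrays;

/** Sketch of slot index and lap flag bookkeeping on a plain int array; single-threaded. */
final class AvailableBufferMath {
    private final int[] availableBuffer;
    private final int indexMask;
    private final int indexShift;

    AvailableBufferMath(int bufferSize) { // bufferSize must be a power of two
        availableBuffer = new int[bufferSize];
        Arrays.fill(availableBuffer, -1); // nothing published yet
        indexMask = bufferSize - 1;
        indexShift = Integer.numberOfTrailingZeros(bufferSize); // log2 of a power of two
    }

    int calculateIndex(long sequence) {
        return ((int) sequence) & indexMask;     // same as sequence % bufferSize
    }

    int calculateAvailabilityFlag(long sequence) {
        return (int) (sequence >>> indexShift);  // same as sequence / bufferSize, the lap number
    }

    void setAvailable(long sequence) {
        availableBuffer[calculateIndex(sequence)] = calculateAvailabilityFlag(sequence);
    }

    boolean isAvailable(long sequence) {
        return availableBuffer[calculateIndex(sequence)] == calculateAvailabilityFlag(sequence);
    }

    public static void main(String[] args) {
        AvailableBufferMath math = new AvailableBufferMath(8);
        math.setAvailable(10L);
        System.out.println("index=" + math.calculateIndex(10L)
                + ", flag=" + math.calculateAvailabilityFlag(10L)
                + ", available=" + math.isAvailable(10L));
    }
}
```

Storing the lap number rather than a boolean is what lets a consumer tell sequence 2 from sequence 10 even though both live in slot 2 of an 8-slot ring.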
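4 Dissecting the design of SingleProducerSequencer
We have now walked through Disruptor's main event-publishing flow. Curious as you are, you surely want more. If you don't, the fault lies with my explanation and not with how interesting Disruptor itself is.
Next, a few words on the design of SingleProducerSequencer, where we can see the actual code Disruptor uses to deal with false sharing.
SingleProducerSequencer extends the abstract class SingleProducerSequencerFields, which in turn extends the abstract class SingleProducerSequencerPad. SingleProducerSequencerFields is where the instance variables that actually matter are declared.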
// SingleProducerSequencer.java
abstract class SingleProducerSequencerPad extends AbstractSequencer {
    protected long p1, p2, p3, p4, p5, p6, p7;

    public SingleProducerSequencerPad(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }
}

abstract class SingleProducerSequencerFields extends SingleProducerSequencerPad {
    public SingleProducerSequencerFields(int bufferSize, WaitStrategy waitStrategy) {
        super(bufferSize, waitStrategy);
    }

    /**
     * Set to -1 as sequence starting point
     */
    // The next sequence value claimed by the producer
    protected long nextValue = Sequence.INITIAL_VALUE;
    // Caches the smaller of the gating sequences and next from the last comparison
    // (the sequence value of the slowest consumer)
    protected long cachedValue = Sequence.INITIAL_VALUE;
}

/**
 * Coordinator for claiming sequences for access to a data structure while tracking dependent {@link Sequence}s.
 * Not safe for use from multiple threads as it does not implement any barriers.
 *
 * Note on {@link Sequencer#getCursor()}: With this sequencer the cursor value is updated after the call
 * to {@link Sequencer#publish(long)} is made.
 */
public final class SingleProducerSequencer extends SingleProducerSequencerFields {
    protected long p1, p2, p3, p4, p5, p6, p7;
    // ... omitted
}

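As you can see, there are seven long fields before the two instance variables and seven after them. Why? Readers who know about CPU caches will have guessed already: it is there to deal with false sharing.
When the CPU loads memory into a cache line, the line that holds these two fields contains no other live variable, only padding. That keeps writes to unrelated variables from invalidating the line, which is the performance cost that false sharing causes.
To make this clearer, let's look at the memory layout of a SingleProducerSequencer instance.
Using HSDB (the HotSpot Debugger, which can be started on Windows with java -cp .;"%JAVA_HOME%/lib/sa-jdi.jar" sun.jvm.hotspot.HSDB), attach to the HotSpot process of the demo while it is stopped at a breakpoint, filter the SingleProducerSequencer instance out of the Object Histogram, and examine it with the Inspector tool.
In this example, 0x00000000828026f8 is the starting address of the com.lmax.disruptor.SingleProducerSequencer instance in the JVM. Starting from that address, the mem command is used to view the following 30 memory words. Why 30? Actually 20 is enough: the Object Histogram shows the instance size as 160 bytes, and each line printed by mem is one machine word, which is 8 bytes on my 64-bit machine, so any length of at least 160/8 = 20 shows the complete memory layout of the instance.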
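[Figure: HSDB screenshot, with the memory dump on the left and the Inspector on the right]
The addresses 0x0000000082802750 and 0x0000000082802758 in the red box on the left correspond to the instance variables nextValue and cachedValue in the red box on the right. Before and after them there are seven consecutive long zeros on each side. The CPU loads contiguous memory into its cache in units of cache lines, and a cache line is typically 64 bytes, so the padding lets the real variables have a cache line to themselves. That is how the false-sharing problem is resolved.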
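Why seven longs on each side, and not six, can be checked with a little address arithmetic. The sketch below assumes 64-byte cache lines, 8-byte longs and 8-byte-aligned fields; `PaddingCheck` and its method names are hypothetical, and the addresses are just numbers, not real memory accesses.

```java
/** Sketch: checks by arithmetic whether padding isolates two adjacent long fields. */
final class PaddingCheck {
    static final int CACHE_LINE = 64; // assumed cache line size in bytes
    static final int LONG_BYTES = 8;

    /**
     * Layout starting at padStart: padLongs longs, nextValue, cachedValue, padLongs longs.
     * True when the cache lines holding the two real fields lie entirely inside that region.
     */
    static boolean isolated(long padStart, int padLongs) {
        long nextValue = padStart + (long) padLongs * LONG_BYTES;
        long cachedValue = nextValue + LONG_BYTES;
        long padEnd = cachedValue + LONG_BYTES + (long) padLongs * LONG_BYTES;
        return lineInside(nextValue, padStart, padEnd) && lineInside(cachedValue, padStart, padEnd);
    }

    private static boolean lineInside(long address, long start, long end) {
        long lineStart = address / CACHE_LINE * CACHE_LINE;
        return lineStart >= start && lineStart + CACHE_LINE <= end;
    }

    /** Tries every 8-byte alignment of the region relative to a cache-line boundary. */
    static boolean isolatedForEveryAlignment(int padLongs) {
        for (long offset = 0; offset < CACHE_LINE; offset += LONG_BYTES) {
            if (!isolated(4096 + offset, padLongs)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // 0x82802718 is where the first padding long starts in the dump discussed in this article
        System.out.println(isolated(0x82802718L, 7));
        System.out.println(isolatedForEveryAlignment(7));
        System.out.println(isolatedForEveryAlignment(6));
    }
}
```

Seven longs are 56 bytes, so however the object happens to be aligned, the 64-byte line around either field cannot reach past the padding; with six longs there is an alignment where it can.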
Checking the cache line size: on Linux, the following command shows it.
cat /sys/devices/system/cpu/cpu0/cache/index0/coherency_line_size
On Windows, CPU-Z can be used.
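[Figure: CPU-Z screenshot]
Appendix: Java object memory layout
To finish, a few words on the memory layout of Java objects. This is only loosely related to the topic of the article and can be skipped.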
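HotSpot object memory layout:
In HotSpot, the memory layout of an object (not an array) is roughly: object header (Mark Word + klass pointer) + instance data + padding to keep 8-byte alignment. The Mark Word and the klass pointer in the header are each one machine word long, that is, 32 bits (4 bytes) on a 32-bit machine and 64 bits (8 bytes) on a 64-bit machine. If a 64-bit JVM has pointer compression enabled, the klass pointer is compressed to 4 bytes.
To check whether pointer compression is enabled:
jinfo -flag UseCompressedOops pid
If it returns -XX:+UseCompressedOops, compression is enabled. Alternatively, jinfo -flags pid lists all options.
In this example it returned -XX:+UseCompressedOops, so pointer compression is enabled (it is on by default in JDK 1.8). Ordinary object pointers are then compressed to 4 bytes.
Let's go through a concrete example using SingleProducerSequencer.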
Fields of SingleProducerSequencer
[Figure: field list of SingleProducerSequencer]
Inspecting the instance with the HSDB Inspector.
[Figure: HSDB Inspector view of the instance]
Memory contents of the object:
hsdb> mem 0x00000000828026f8 20
0x00000000828026f8: 0x0000000000000009 // mark word: runtime data of the object, such as hash code, GC generational age, lock state flags, lock held by a thread, biased thread ID, bias timestamp
0x0000000082802700: 0x000000082000de38 // high 4 bytes (82802704~82802707): int bufferSize 8; low 4 bytes (82802700~82802703): 2000de38. With pointer compression enabled, the low 4 bytes are the klass pointer; on JDK 1.8 the klass metadata is kept in Metaspace.
0x0000000082802708: 0x828028e082809e98 // high 4 bytes: ref cursor; low 4 bytes: ref waitStrategy
0x0000000082802710: 0x000000008284b390 // ref gatingSequences ObjArray
0x0000000082802718: 0x0000000000000000 // this line and the 6 after it (7 lines in total): p1~p7 defined in SingleProducerSequencerPad
0x0000000082802720: 0x0000000000000000
0x0000000082802728: 0x0000000000000000
0x0000000082802730: 0x0000000000000000
0x0000000082802738: 0x0000000000000000
0x0000000082802740: 0x0000000000000000
0x0000000082802748: 0x0000000000000000
0x0000000082802750: 0x0000000000000001 // nextValue 1
0x0000000082802758: 0xffffffffffffffff // cachedValue -1
0x0000000082802760: 0x0000000000000000 // p1~p7 defined in SingleProducerSequencer
0x0000000082802768: 0x0000000000000000
0x0000000082802770: 0x0000000000000000
0x0000000082802778: 0x0000000000000000
0x0000000082802780: 0x0000000000000000
0x0000000082802788: 0x0000000000000000
0x0000000082802790: 0x0000000000000000
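Computing the Shallow Heap size and the Retained Heap size of this object:
The object occupies 20 * 8 = 160B of memory in total, and this value is its Shallow Heap size. It can also be computed by hand: mark_word[8] + klass_pointer[4] + 2 * ref[4] + ObjArray_ref[8] + 16 * long[8] + int[4] = 160B (the gatingSequences slot takes 8 bytes in the dump, presumably a 4-byte compressed reference plus 4 bytes of alignment padding).
The Retained Heap size = Shallow Heap size + the Shallow Heap size of the objects referenced by the current object, excluding those that are also reachable from GC roots.
The references involved here are: cursor 0x00000000828028e0, waitStrategy 0x0000000082809e98, and gatingSequences 0x000000008284b390.
Running the revptrs command on each of them to find reverse references shows that only gatingSequences is referenced exclusively by this object, so we compute the Shallow Heap size of gatingSequences (com.lmax.disruptor.Sequence[1]) = 12 + 4 + 1 * 4 + 4 = 24B. Because compressed pointers are enabled, each reference takes 4B; that adds up to 20B, and 4B of padding brings it to 24B. The Retained Heap size of the object is therefore 160 + 24 = 184B.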
hsdb> mem 0x000000008284b390 3
0x000000008284b390: 0x0000000000000009
0x000000008284b398: 0x000000012000e08d
0x000000008284b3a0: 0x000000008284abc0
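Shallow Heap size of an array object = object header (12 bytes) + space storing the array length (4 bytes) + array length * size of each element slot (a 4-byte compressed reference in this example) + padding
One last question. As we know, starting with Java 8, Metaspace replaced the former PermGen as the place where class metadata is stored. HSDB on Java 7 can show PermGen information through the universe command, whereas on Java 8 the Metaspace information does not show up: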
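The size calculations in this appendix can be reproduced with a few lines of arithmetic. The sketch below hard-codes the assumptions used in the article (64-bit HotSpot, compressed pointers, a 12-byte header, 4-byte references, 8-byte alignment); `ShallowSize` and its methods are hypothetical helpers, not a JVM API, and they do not measure real objects.

```java
/** Sketch of the appendix arithmetic under the article's layout assumptions. */
final class ShallowSize {
    /** Rounds a byte count up to the next multiple of 8. */
    static long align8(long bytes) {
        return (bytes + 7) & ~7L;
    }

    /** Reference array: 12-byte header + 4-byte length + 4 bytes per element, then padding. */
    static long referenceArray(int length) {
        return align8(12 + 4 + 4L * length);
    }

    /** The hand calculation for SingleProducerSequencer given in the article. */
    static long singleProducerSequencer() {
        long markWord = 8;
        long klassPointer = 4;
        long twoRefs = 2 * 4;         // cursor and waitStrategy
        long gatingSequencesSlot = 8; // as counted in the article
        long sixteenLongs = 16 * 8;   // 14 padding longs + nextValue + cachedValue
        long bufferSize = 4;          // the int field
        return align8(markWord + klassPointer + twoRefs + gatingSequencesSlot + sixteenLongs + bufferSize);
    }

    public static void main(String[] args) {
        long shallow = singleProducerSequencer();
        long retained = shallow + referenceArray(1);
        System.out.println("shallow=" + shallow + "B, retained=" + retained + "B");
    }
}
```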
Heap Parameters:
ParallelScavengeHeap [ PSYoungGen [
eden = [0x00000000d6300000,0x00000000d66755d0,0x00000000d8300000] ,
from = [0x00000000d8300000,0x00000000d8300000,0x00000000d8800000] ,
to = [0x00000000d8800000,0x00000000d8800000,0x00000000d8d00000] ]
PSOldGen [ [0x0000000082800000,0x00000000829d79c0,0x0000000084a00000] ] ]
That is all for the producer-side source code of Disruptor. A later post will dig into the consumer.
