netty ByteBuf对象池和内存泄漏检测实现走读

ByteBuf存放在堆外内存中,采用引用计数法的方式进行内存回收,具体的实现在AbstractReferenceCountByteBuf中。

privatestatic final AtomicIntegerFieldUpdater refCntUpdater; static { AtomicIntegerFieldUpdater updater = PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); if (updater == null) { updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); } refCntUpdater = updater; }private volatile int refCnt = 1;

其中refCntUpdater中作为一个静态变量,并没有具体的存放该ByteBuf对应的堆外内存的引用计数,而是封装了通过unsafe来操作成员变量refCnt的操作。
而refCnt,在初始化的之后就为1,默认为该堆外内存原始的ByteBuf就存在一次引用。
UnsafeAtomicIntegerFieldUpdater(Unsafe unsafe, Class tClass, String fieldName) throws NoSuchFieldException { Field field = tClass.getDeclaredField(fieldName); if (!Modifier.isVolatile(field.getModifiers())) { throw new IllegalArgumentException("Must be volatile"); } this.unsafe = unsafe; offset = unsafe.objectFieldOffset(field); }@Override public boolean compareAndSet(T obj, int expect, int update) { return unsafe.compareAndSwapInt(obj, offset, expect, update); }

以上是UnsafeAtomicIntegerFieldUpdater中对于refCnt的操作的实现。
而AbstractReferenceCountByteBuf正是通过UnsafeAtomicIntegerFieldUpdater来进行引用计数的增加和减少,达到避免内存泄露的目的。
@Override public boolean release(int decrement) { return release0(checkPositive(decrement, "decrement")); }private boolean release0(int decrement) { for (; ; ) { int refCnt = this.refCnt; if (refCnt < decrement) { throw new IllegalReferenceCountException(refCnt, -decrement); }if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) { if (refCnt == decrement) { deallocate(); return true; } return false; } } }

在初始化后的AbstractReferenceCountByteBuf的refCnt为1,只有通过调用其release()方法,才会通过UnsafeAtomicIntegerFieldUpdater来对refCnt进行减1,当refCnt与相减的值的差为0时,将会调用deallocate()方法对堆外内存进行回收,deallocate()的实现就会根据具体场景实现。对于非池化且没有采用nocleaner的ByteBuf,当调用到deallocate()的时候,将会把这块堆外内存直接释放,在忘记release的情况下,也会在gc中凭借cleaner被回收掉,而对于池化的ByteBuf(必定采用nocleaner策略),调用了deallocate()的时候,将会把这块内存回收到内存对象池recycler中,而不是直接释放,存在内存泄漏的风险。


而如果需要增加计数,则可以通过retain()方法进行增加计数,这样即使调用了一次release(1),也不会导致维护的这块堆外内存被释放。

关于池化ButeBuf的内存对象池Recycler的实现。
内存对象池的核心主要是两个数据结构,一个是每个线程通过ThreadLocal实现的私有的堆栈Stack,用来存放由该线程申请并又由该线程释放的内存对象,可以有效减少对象的申请和回收的开销。

而另一个对象,则是其他线程回收别的线程对象的专属队列。
private static final FastThreadLocal> DELAYED_RECYCLED = new FastThreadLocal>() { @Override protected Map, WeakOrderQueue> initialValue() { return new WeakHashMap, WeakOrderQueue>(); } };

WeakOrderQueue则是一个链表,其中每个节点都是一个与Stack中堆栈实现方式一样的堆栈节点,当别的线程回收该线程的申请的对象的时候,将不会将该对象直接放到其Stack中,避免回收对象时候遇到的线程竞争,而是直接申请一个当前线程私有的目标线程的链表队列,将回收对象存放到对应的队列中,避免回收过程中的并发竞争。

说起来比较复杂绕口,举个例子。

举个例子,线程A申请了一个对象,当线程B需要将其回收时,将会根据这个对象的申请线程A在线程B的私有空间中寻找对应专属线程A的回收队列,如果没有,则创建,并将该队列作为线程A回收链表队列的首节点,如果有则直接把需要回收的对象存放到该队列中,由于该队列为线程B私有专门针对线程A的,所以不存在资源竞争。
当A本身的Stack不存在对象时,将会依次从其回收链表的首队列 中将对象放回到自己持有的Stack中,达到对象池的目的。


Netty本身对ByteBuf的内存泄漏,进行了检测手段。
在默认的场景下,会以一定的比例对ByteBuf进行采样,对于采样到的ByteBuf增加一个虚引用,以便进行内存泄漏的检测。
private final class DefaultResourceLeak extends PhantomReference implements ResourceLeak
【netty ByteBuf对象池和内存泄漏检测实现走读】核心在于,将会把需要进行内存检测的ByteBuf增加一个虚引用PhantomReference,这样便可以通过定时轮询引用队列的方式,当对象即将被回收而出现引用队列中的时候,判断是否已经将对应的堆外内存释放,即可判断是否出现了内存泄漏。同时leak可以在相应的buffer引用技术发生改变的时候将改动记录在其堆栈中,方便内存对象完整的调用链路展现。

    推荐阅读