【Netty】二、ByteBuf

一、ByteBuf 1.1 ByteBuf介绍
NIO的ByteBuffer大家比较熟悉,它其实就是一个字节容器,但是使用过ByteBuffer的童鞋都知道,ByteBuffer使用较为复杂,例如写模式完成之后需要手动调用flip()方法切换到读模式,再进行相应的读取。
Netty里的ByteBuf是ByteBuffer的升级版,给开发者提供了更方便的API,并拓展了一些其他特性,接下来就来介绍一下ByteBuf
先看一下ByteBuf相关类的类图,对其有个整体的认识,类图如下
【Netty】二、ByteBuf
文章图片

可以看到ByteBuf实现了ReferenceCounted接口,这是因为ByteBuf使用了引用计数,当ByteBuf的引用数为0,则表示该ByteBuf已不可用,需要显式释放。
而ByteBuf下面的AbstractByteBuf,实现了ByteBuf的一些常用方法,比如isReadable、isWritable、readableBytes、writableBytes等。继续往下看是AbstractReferenceCountedByteBuf,它在AbstractByteBuf的基础上完成了引用计数的相关操作。
而最下面的几个类可以分为3种类型的ByteBuf,分别是PooledByteBuf(池化)、UnpooledByteBuf(非池化)及CompositeByteBuf(复合),其中PooledByteBuf又分为PooledHeapByteBuf(池化堆内存)和PooledDirectByteBuf(池化直接内存),UnpooledByteBuf同理。
ByteBuf主要提供了如下操作

方法 描述
alloc() 返回创建该ByteBuf的ByteBufAllocator分配器
capacity() ByteBuf可存储的最大字节数
hasArray() 如果ByteBuf内部是一个字节数组,返回true
array() 返回ByteBuf内部的字节数组,通常与hasArray()配合使用
readerIndex() 返回ByteBuf当前的读取索引
writerIndex() 返回ByteBuf当前的写入索引
isReadable() 是否有字节可供读取
isWritable() 是否有字节可供写入
readableBytes() 可读取的字节数
writableBytes() 可写入的字节数
getByte(int) 返回给定索引处的字节,读索引保持不变,还提供了与getByte类似方法,如getInt()、getLong、getShort
setByte(int index, int value) 设定给定索引出的字节值,写索引保持不变,还提供了与setByte类似方法,如setInt、setLong、setShort
readByte() 返回当前读取索引readerIndex处的字节,并将readerIndex加1
writeByte() 在当前写入索引writerIndex处写入一个字节值,并将writerIndex加1
可以通过readerIndex()、writerIndex()方法看出,ByteBuf拥有两个索引,一个用于读取,一个用于写入,所以它可以写完之后直接进行读取,不需要像ByteBuffer那样调用flip()方法来切换为读模式,如下图:
【Netty】二、ByteBuf
文章图片

1.2 引用计数
上面提到ByteBuf实现了ReferenceCounted接口,ReferenceCounted接口里主要有refCnt()引用数量、retain()增加引用数量、release()减少引用数量等操作,如下:
// 返回引用数量 int refCnt(); // 引用数量加1 ReferenceCounted retain(); // 引用数量加increment ReferenceCounted retain(int increment); // 引用数量减1 boolean release(); // 引用数量减decrement boolean release(int decrement);

ByteBuf具体引用计数的相关操作是在AbstractReferenceCountedByteBuf中完成的,接下来看下AbstractReferenceCountedByteBuf的源码,部分源码如下:
public int refCnt() {return updater.refCnt(this); } public ByteBuf retain() {return updater.retain(this); } public ByteBuf retain(int increment) {return updater.retain(this, increment); } public boolean release() {return handleRelease(updater.release(this)); } public boolean release(int decrement) {return handleRelease(updater.release(this, decrement)); }

可以看到这些方法内部都是通过updater更新器来实现的,所以来看下这个updater更新器是什么?
private static final long REFCNT_FIELD_OFFSET = ReferenceCountUpdater.getUnsafeOffset(AbstractReferenceCountedByteBuf.class, "refCnt"); private static final AtomicIntegerFieldUpdater AIF_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt"); private static final ReferenceCountUpdater updater = new ReferenceCountUpdater() { @Override protected AtomicIntegerFieldUpdater updater() { return AIF_UPDATER; } @Override protected long unsafeOffset() { return REFCNT_FIELD_OFFSET; } };

如源码所示,updater更新器是ReferenceCountUpdater类型,其内部就是通过update()方法获取到的AtomicIntegerFieldUpdater来原子地更新 引用数量字段,在这里引用数量字段是refCnt。
ReferenceCountUpdate的计数方式与普通计数有些不同
  1. refCnt初始值是2,代表实际引用数量为1
  2. 每增加一个引用,refCnt对应值会加2,而减少一个引用,refCnt对应值减2
  3. 如果释放了最后一个引用,则refCnt对应值会变为1
所以如果refCnt值为1,代表实际引用数量为0,否则实际引用数量 = refCnt/2
1.3 PooledByteBuf 和 UnpooledByteBuf
PooledByteBuf(池化)和UnpooledByteBuf(非池化)是两种类型的ByteBuf,顾名思义,一个是从内存池中分配的ByteBuf,使用完之后归还至池中,而另一个则是从内存中直接分配的ByteBuf,使用完即回收。
UnpooledByteBuf又分为UnpooledHeapByteBuf(非池化堆内存)和UnpooledDirectByteBuf(非池化直接内存)。UnpooledHeapByteBuf和UnpooledDirectByteBuf的实现较为简单,UnpooledHeapByteBuf内部是使用字节数组来实现,而UnpooledDirectByteBuf内部是通过NIO的DirectByteBuffer来实现的。
1.3.1 PooledByteBuf 每个线程绑定一个PoolThreadCache,它包含了一个heapArena(堆内存)和一个directArena(直接内存),这两个都是PoolArena类型,每个线程通过PooledByteBufAllocator来分配内存时,会在其绑定的PoolThreadCache里进行分配。PoolArena的结构图如下
【Netty】二、ByteBuf
文章图片

【【Netty】二、ByteBuf】下面是如何分配一个池化的PooledByteBuf,源码细节较多,这里只是大概介绍一下
入口在PooledByteBufAllocator的 newHeapBuffer 和 newDirectBuffer 方法,这里以newHeapBuffer举例,如下
@Override protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) { // 获取当前线程的PoolThreadCache PoolThreadCache cache = threadCache.get(); // 获取PoolThreadCache种的heapArena PoolArena heapArena = cache.heapArena; final ByteBuf buf; if (heapArena != null) { // 使用heapArena进行分配ByteBuf buf = heapArena.allocate(cache, initialCapacity, maxCapacity); } else { // 如果没有heapArena,分配非池化的ByteBuf buf = PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity); } return toLeakAwareBuffer(buf); }

调用heapArena的allocate方法进行分配,先通过所需要的内存大小来计算sizeIdx,再判断sizeIdx,分为3种情况 small、normal、huge。如下:
private void allocate(PoolThreadCache cache, PooledByteBuf buf, final int reqCapacity) { // 根据需要分配的内存大小来计算sizeIdx final int sizeIdx = size2SizeIdx(reqCapacity); if (sizeIdx <= smallMaxSizeIdx) {// 如果sizeIdx <= smallMaxSizeIdx,说明是所需内存较小,主要使用PoolSubpage来分配 tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx); } else if (sizeIdx < nSizes) {// 如果sizeIdx <= nSizes,说明所需内存中等,主要使用PoolChunk来分配 tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx); } else { // 否则当sizeIdx大于等于nSizes,说明所需内存较大,则不在池里分配,直接分配内存 int normCapacity = directMemoryCacheAlignment > 0 ? normalizeSize(reqCapacity) : reqCapacity; allocateHuge(buf, normCapacity); } }

1.4 总结
ByteBuf是netty的基础组件,相对于ByteBuffer,ByteBuf不仅使用更为方便,提供了更全面的API,并使用了池化技术来优化内存的分配效率。

    推荐阅读