[Netty源码分析]ByteBuf(二)

  1. ByteBufAllocator
ByteBufAllocator是字节缓冲区分配器,根据Netty字节缓冲区的实现不同,分为两种不同的分配器:PooledByteBufAllocator和UnpooledByteBufAllocator。它们提供了不同ByteBuf的分配方法。
[Netty源码分析]ByteBuf(二)
文章图片
ByteBufAllocator 继承体系.png
API分类:
  1. buffer*前缀为分配普通 buffer;
  2. ioBuffer*前缀为分配适用于 I/O 操作的 buffer;
  3. directBuffer*前缀为分配直接内存 buffer;
  4. composite*前缀为分配合成 buffer;
Q:heapBuffer和directBuffer就是分配堆内内存和堆外内存了,为什么还要上面的buffer方法?A:后面的AbstractByteBufAllocator给出了解释:分配内存时先调用buffer方法,再在这个方法里根据directByDefault调用heapBuffer或directBuffer。
ByteBufAllocator的buffer:
/** * Allocate a {@link ByteBuf}. If it is a direct or heap buffer * depends on the actual implementation. */ ByteBuf buffer(); ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ //AbstractByteBufAllocator @Override public ByteBuf buffer() { if (directByDefault) { return directBuffer(); } return heapBuffer(); }→→→→→ directBuffer具体实现 @Override public ByteBuf directBuffer() { return directBuffer(DEFAULT_INITIAL_CAPACITY, Integer.MAX_VALUE); }↓↓↓↓↓↓↓↓↓↓ @Override public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newDirectBuffer(initialCapacity, maxCapacity); }↓↓↓↓↓↓↓↓↓↓ /** * 依赖底层实现. */ protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity); →→→→→ heapBuffer具体实现 @Override public ByteBuf heapBuffer() { return heapBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY); }↓↓↓↓↓↓↓↓↓↓ @Override public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf; } validate(initialCapacity, maxCapacity); return newHeapBuffer(initialCapacity, maxCapacity); }↓↓↓↓↓↓↓↓↓↓↓ /** * 依赖底层实现. */ protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);

  2. PooledByteBufAllocator和UnpooledByteBufAllocator
  1. Heap内存分配
// UnpooledByteBufAllocator: newDirectBuffer and newHeapBuffer
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf;
    // Whether Unsafe is usable depends on the underlying JDK; if an Unsafe object can be obtained, use it.
    if (PlatformDependent.hasUnsafe()) {
        buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    // Netty 5 does not distinguish between the safe and the Unsafe heap buffer.
    return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    // The Netty 4 implementation is:
    // return PlatformDependent.hasUnsafe()
    //         ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
    //         : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

// ↓↓↓ Netty 5 implementation of UnpooledHeapByteBuf
public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);

    checkNotNull(alloc, "alloc");

    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = alloc;
    setArray(allocateArray(initialCapacity));
    setIndex(0, 0);
}

// ↓↓↓ AbstractReferenceCountedByteBuf
protected AbstractReferenceCountedByteBuf(int maxCapacity) {
    super(maxCapacity);
    // The reference count is set to 1 at construction.
    refCntUpdater.set(this, 1);
}

// ↓↓↓ Netty 4 implementation of UnpooledUnsafeHeapByteBuf
// UnpooledUnsafeHeapByteBuf
UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(alloc, initialCapacity, maxCapacity);
}

// ↓↓↓ UnpooledHeapByteBuf
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
}

// Jump back to the original UnpooledHeapByteBuf
private UnpooledHeapByteBuf(
        ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {
    // omitted
}

// ↓↓↓ Store the byte data and the reader/writer indices
private void setArray(byte[] initialArray) {
    array = initialArray;
    tmpNioBuf = null;
}

@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
    if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {
        throw new IndexOutOfBoundsException(String.format(
                "readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
                readerIndex, writerIndex, capacity()));
    }
    setIndex0(readerIndex, writerIndex);
    return this;
}

// Safe vs. Unsafe access is decided here. newHeapBuffer differs between Netty 5 and Netty 4;
// the examples below are from Netty 4.
// UnpooledHeapByteBuf
@Override
protected byte _getByte(int index) {
    return HeapByteBufUtil.getByte(array, index);
}

// ↓↓↓ HeapByteBufUtil
static byte getByte(byte[] memory, int index) {
    return memory[index];
}

// UnpooledUnsafeHeapByteBuf
@Override
protected byte _getByte(int index) {
    return UnsafeByteBufUtil.getByte(array, index);
}

// ↓↓↓ UnsafeByteBufUtil
// This is the (byte[], int) overload that the call above resolves to; the getByte(long address)
// overload is the one used for direct buffers.
static byte getByte(byte[] array, int index) {
    return PlatformDependent.getByte(array, index);
}

  2. direct内存分配
//UnpooledDirectByteBuf public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity); } if (maxCapacity < 0) { throw new IllegalArgumentException("maxCapacity: " + maxCapacity); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); }this.alloc = alloc; setByteBuffer(ByteBuffer.allocateDirect(initialCapacity)); }↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ private void setByteBuffer(ByteBuffer buffer) { ByteBuffer oldBuffer = this.buffer; if (oldBuffer != null) { if (doNotFree) { doNotFree = false; } else { freeDirect(oldBuffer); } } this.buffer = buffer; tmpNioBuf = null; capacity = buffer.remaining(); }→→→→→→JDK底层方法ByteBuffer.allocateDirect(initialCapacity) //ByteBuffer public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); }// public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) { super(maxCapacity); if (alloc == null) { throw new NullPointerException("alloc"); } if (initialCapacity < 0) { throw new IllegalArgumentException("initialCapacity: " + initialCapacity); } if (maxCapacity < 0) { throw new IllegalArgumentException("maxCapacity: " + maxCapacity); } if (initialCapacity > maxCapacity) { throw new IllegalArgumentException(String.format( "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity)); }this.alloc = alloc; //allocateDirect(initialCapacity)同调用JDK的API setByteBuffer(allocateDirect(initialCapacity), false); }↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ //非安全的区别在于通过反射UnSafe算出内存地址保存memoryAddress final void setByteBuffer(ByteBuffer buffer, boolean tryFree) { if (tryFree) { ByteBuffer oldBuffer = this.buffer; if (oldBuffer != null) { if (doNotFree) { doNotFree = false; } else { freeDirect(oldBuffer); 
} } } this.buffer = buffer; memoryAddress = PlatformDependent.directBufferAddress(buffer); tmpNioBuf = null; capacity = buffer.remaining(); }→→→→→→→→→→→→ //getByte的时候通过地址去找 @Override protected byte _getByte(int index) { return UnsafeByteBufUtil.getByte(addr(index)); }

    推荐阅读