[Netty源码分析]ByteBuf(二)
- ByteBufAllocator
文章图片
ByteBufAllocator 继承体系.png
API 分类:
- buffer*前缀为分配普通 buffer;
- ioBuffer*前缀为分配适用于 I/O 操作的 buffer;
- directBuffer*前缀为分配直接内存 buffer;
- composite*前缀为分配合成 buffer;
ByteBufAllocator的buffer:
/**
* Allocate a {@link ByteBuf}. Whether it is a direct or a heap buffer
* depends on the actual implementation.
*
* @return the allocated buffer
*/
ByteBuf buffer();
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//AbstractByteBufAllocator
/**
 * Allocates a buffer of the kind this allocator prefers: the result of
 * {@code directBuffer()} when {@code directByDefault} is set, otherwise the
 * result of {@code heapBuffer()}.
 *
 * @return the allocated buffer
 */
@Override
public ByteBuf buffer() {
    return directByDefault ? directBuffer() : heapBuffer();
}→→→→→ directBuffer具体实现
/**
 * Allocates a direct buffer with the default initial capacity and the default
 * maximum capacity by delegating to {@code directBuffer(int, int)}.
 *
 * @return the allocated direct buffer
 */
@Override
public ByteBuf directBuffer() {
    // Use the named default limit, exactly as the no-arg heapBuffer() does,
    // instead of a hard-coded literal, so both entry points share one limit.
    return directBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}↓↓↓↓↓↓↓↓↓↓
/**
 * Allocates a direct buffer with the given capacities.
 *
 * @param initialCapacity the requested initial capacity
 * @param maxCapacity     the requested maximum capacity
 * @return {@code emptyBuf} when both capacities are zero; otherwise the buffer
 *         created by {@code newDirectBuffer} after the arguments pass {@code validate}
 */
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    final boolean zeroSized = initialCapacity == 0 && maxCapacity == 0;
    if (!zeroSized) {
        validate(initialCapacity, maxCapacity);
        return newDirectBuffer(initialCapacity, maxCapacity);
    }
    // Both capacities are zero: return emptyBuf without validating or allocating.
    return emptyBuf;
}↓↓↓↓↓↓↓↓↓↓
/**
* Creates the direct buffer; the actual allocation is left to the concrete
* subclass implementation.
*
* @param initialCapacity the initial capacity passed on from {@code directBuffer(int, int)}
* @param maxCapacity the maximum capacity passed on from {@code directBuffer(int, int)}
* @return the newly created buffer
*/
protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);
→→→→→ heapBuffer具体实现
/**
* Allocates a heap buffer with the default initial capacity and the default
* maximum capacity by delegating to {@code heapBuffer(int, int)}.
*
* @return the allocated heap buffer
*/
@Override
public ByteBuf heapBuffer() {
return heapBuffer(DEFAULT_INITIAL_CAPACITY, DEFAULT_MAX_CAPACITY);
}↓↓↓↓↓↓↓↓↓↓
/**
 * Allocates a heap buffer with the given capacities.
 *
 * @param initialCapacity the requested initial capacity
 * @param maxCapacity     the requested maximum capacity
 * @return {@code emptyBuf} when both capacities are zero; otherwise the buffer
 *         created by {@code newHeapBuffer} after the arguments pass {@code validate}
 */
@Override
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
    final boolean zeroSized = initialCapacity == 0 && maxCapacity == 0;
    if (!zeroSized) {
        validate(initialCapacity, maxCapacity);
        return newHeapBuffer(initialCapacity, maxCapacity);
    }
    // Both capacities are zero: return emptyBuf without validating or allocating.
    return emptyBuf;
}↓↓↓↓↓↓↓↓↓↓↓
/**
* Creates the heap buffer; the actual allocation is left to the concrete
* subclass implementation.
*
* @param initialCapacity the initial capacity passed on from {@code heapBuffer(int, int)}
* @param maxCapacity the maximum capacity passed on from {@code heapBuffer(int, int)}
* @return the newly created buffer
*/
protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);
- PooledByteBufAllocator和UnpooledByteBufAllocator
- Heap内存分配
// newDirectBuffer and newHeapBuffer of UnpooledByteBufAllocator
/**
 * Creates an unpooled direct buffer, choosing the unsafe variant when
 * {@code PlatformDependent.hasUnsafe()} reports true, and passes the result
 * through {@code toLeakAwareBuffer} before returning it.
 *
 * @param initialCapacity the initial capacity handed to the buffer constructor
 * @param maxCapacity     the maximum capacity handed to the buffer constructor
 * @return the value returned by {@code toLeakAwareBuffer} for the new buffer
 */
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf;
    // Whether Unsafe is available is determined by the underlying JDK;
    // when an Unsafe object can be obtained, the unsafe variant is used.
    if (PlatformDependent.hasUnsafe()) {
        buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return toLeakAwareBuffer(buf);
}

/**
 * Creates an unpooled heap buffer (Netty 5 form).
 *
 * @param initialCapacity the initial capacity handed to the buffer constructor
 * @param maxCapacity     the maximum capacity handed to the buffer constructor
 * @return a new {@code UnpooledHeapByteBuf}
 */
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    // Netty 5 does not distinguish between safe and unsafe heap buffers.
    return new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
    // The Netty 4 implementation is shown below for comparison. Both lines of
    // the statement are commented out so that it is not parsed as live code.
    // return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
    //         : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}↓↓↓↓↓↓↓↓↓↓↓↓↓Netty5对UnpooledHeapByteBuf的实现
public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
checkNotNull(alloc, "alloc");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
setArray(allocateArray(initialCapacity));
setIndex(0, 0);
}↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//AbstractReferenceCountedByteBuf
// Forwards maxCapacity to the superclass constructor, then uses refCntUpdater
// to set this object's counter to 1 (presumably the initial reference count,
// going by the class and field names).
protected AbstractReferenceCountedByteBuf(int maxCapacity) {
super(maxCapacity);
refCntUpdater.set(this, 1);
}↓↓↓↓↓↓↓↓↓↓↓↓↓Netty4对UnpooledUnsafeHeapByteBuf的实现
//UnpooledUnsafeHeapByteBuf
// Adds nothing of its own: all three arguments are forwarded unchanged to the
// superclass constructor.
UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}↓↓↓↓↓↓↓↓↓↓↓↓↓
//UnpooledHeapByteBuf
// Allocates a fresh byte[initialCapacity] and delegates to the array-based
// constructor with readerIndex and writerIndex both 0.
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
}// jumps back to the original UnpooledHeapByteBuf
// Array-based constructor that the (alloc, initialCapacity, maxCapacity)
// constructor delegates to; its body is not reproduced in this excerpt.
private UnpooledHeapByteBuf(
ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {
// omitted
}↓↓↓↓↓↓↓↓↓↓↓↓↓保存字节数据和指针
/**
 * Stores {@code initialArray} as the backing array and resets {@code tmpNioBuf} to null.
 */
private void setArray(byte[] initialArray) {
    this.array = initialArray;
    this.tmpNioBuf = null;
}

/**
 * Sets both indices at once after checking
 * {@code 0 <= readerIndex <= writerIndex <= capacity()}.
 *
 * @param readerIndex the new reader index
 * @param writerIndex the new writer index
 * @return this buffer
 * @throws IndexOutOfBoundsException if the indices violate the ordering above
 */
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
    final boolean inBounds =
            readerIndex >= 0 && readerIndex <= writerIndex && writerIndex <= capacity();
    if (!inBounds) {
        final String message = String.format(
                "readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
                readerIndex, writerIndex, capacity());
        throw new IndexOutOfBoundsException(message);
    }
    setIndex0(readerIndex, writerIndex);
    return this;
}// The safe/unsafe difference shows up below; newHeapBuffer differs between Netty5 and Netty4, and the following examples are from Netty4:
//UnpooledHeapByteBuf
// Safe heap variant: reads the byte at index from the backing array via HeapByteBufUtil.
@Override
protected byte _getByte(int index) {
return HeapByteBufUtil.getByte(array, index);
}↓↓↓↓↓↓↓↓↓
//HeapByteBufUtil
// Plain array access: returns the element of memory at position index.
static byte getByte(byte[] memory, int index) {
return memory[index];
}//UnpooledUnsafeHeapByteBuf
// Unsafe heap variant: same backing array and index, but the read goes
// through UnsafeByteBufUtil instead of HeapByteBufUtil.
@Override
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(array, index);
}↓↓↓↓↓↓↓↓↓↓↓↓
//UnsafeByteBufUtil
// Address-based overload: reads one byte at the given address through PlatformDependent.
// NOTE(review): the unsafe heap _getByte in this article calls getByte(array, index),
// i.e. a (byte[], int) overload that is not reproduced in this excerpt. The (long)
// overload shown here is the one matched by the direct variant's call
// getByte(addr(index)) — confirm against the Netty source.
static byte getByte(long address) {
return PlatformDependent.getByte(address);
}
- direct内存分配
//UnpooledDirectByteBuf
public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
}
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}this.alloc = alloc;
setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
/**
 * Replaces the current buffer with {@code buffer}.
 * If a previous buffer exists it is released with {@code freeDirect}, unless
 * {@code doNotFree} is set, in which case the flag is cleared instead.
 * Afterwards {@code tmpNioBuf} is reset and {@code capacity} is taken from
 * {@code buffer.remaining()}.
 */
private void setByteBuffer(ByteBuffer buffer) {
    final ByteBuffer previous = this.buffer;
    if (previous != null) {
        if (!doNotFree) {
            freeDirect(previous);
        } else {
            // The previous buffer is kept alive this one time; clear the flag.
            doNotFree = false;
        }
    }

    this.buffer = buffer;
    this.tmpNioBuf = null;
    this.capacity = buffer.remaining();
}→→→→→→JDK底层方法ByteBuffer.allocateDirect(initialCapacity)
//ByteBuffer
// JDK factory method as quoted by the article: returns a new DirectByteBuffer
// of the requested capacity.
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}// UnpooledUnsafeDirectByteBuf
public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
}
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}this.alloc = alloc;
//allocateDirect(initialCapacity)同调用JDK的API
setByteBuffer(allocateDirect(initialCapacity), false);
}↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
// The unsafe variant differs in that it works out the buffer's memory address
// (through Unsafe, obtained by reflection) and stores it in memoryAddress.
/**
 * Replaces the current buffer with {@code buffer} and records its address.
 *
 * @param buffer  the new buffer
 * @param tryFree when true, a previous buffer (if any) is released with
 *                {@code freeDirect}, unless {@code doNotFree} is set, in which
 *                case the flag is cleared instead; when false the previous
 *                buffer is not touched
 */
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
    final ByteBuffer previous = tryFree ? this.buffer : null;
    if (previous != null) {
        if (!doNotFree) {
            freeDirect(previous);
        } else {
            // The previous buffer is kept alive this one time; clear the flag.
            doNotFree = false;
        }
    }

    this.buffer = buffer;
    this.memoryAddress = PlatformDependent.directBufferAddress(buffer);
    this.tmpNioBuf = null;
    this.capacity = buffer.remaining();
}→→→→→→→→→→→→
// When reading a byte, the lookup goes through the address: index is converted
// with addr(index) and the result is passed to UnsafeByteBufUtil.getByte.
@Override
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(addr(index));
}
推荐阅读
- 如何寻找情感问答App的分析切入点
- D13|D13 张贇 Banner分析
- 自媒体形势分析
- 2020-12(完成事项)
- Android事件传递源码分析
- Python数据分析(一)(Matplotlib使用)
- Quartz|Quartz 源码解析(四) —— QuartzScheduler和Listener事件监听
- 泽宇读书会——如何阅读一本书笔记
- Java内存泄漏分析系列之二(jstack生成的Thread|Java内存泄漏分析系列之二:jstack生成的Thread Dump日志结构解析)
- [源码解析]|[源码解析] NVIDIA HugeCTR,GPU版本参数服务器---(3)