Netty使用案例|Netty使用案例 -发送队列积压导致内存泄漏(二)


文章目录

  • 发送队列积压导致内存泄漏
    • 其他可能导致发送消息队列积压的因素
    • 客户端代码改造
    • Netty消息发送工作机制和源码分析
      • WriteAndFlushTask原理和源码分析
    • 写入发送源码分析
      • 发送次数限制
      • 不同消息发送策略
      • 已发送消息内存释放
      • 写半包
    • 消息发送高水位控制

发送队列积压导致内存泄漏 上个文章模拟高并发发送消息导致内存泄漏,分析了没有设置高水位导致的内存泄漏,其实并不是在高并发时候才会导致积压,在别的场景下也会导致积压。
其他可能导致发送消息队列积压的因素 【Netty使用案例|Netty使用案例 -发送队列积压导致内存泄漏(二)】在一些场景下,尽管系统流量不大,但任然可能导致消息积压,可能的场景如下:
  • 网络瓶颈导致积压,当发送速度超过网络链接处理能力,会导致发送队列积压。
  • 当对端读取速度小于乙方发送速度,导致自身TCP发送缓冲区满,频繁发生write 0字节时,待发送消息会在Netty发送队列中排队。
当出现大量排队时,很容易导致Netty的直接内存泄漏,对案例中的代码做改造,模拟直接内存泄漏。
客户端代码改造 客户端每1ms发送一条消息,服务端不读取网络消息会导致客户端的发送队列积压。
客户端代码改造如下:
public class LoadRunnerSleepClientHandler extends ChannelInboundHandlerAdapter {private final ByteBuf firstMessage; Runnable loadRunner; static final int SIZE = Integer.parseInt(System.getProperty("size", "10240")); public LoadRunnerSleepClientHandler() { firstMessage = Unpooled.buffer(SIZE); for (int i = 0; i < firstMessage.capacity(); i ++) { firstMessage.writeByte((byte) i); } }@Override public void channelActive(final ChannelHandlerContext ctx) { loadRunner = new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } ByteBuf msg = null; while(true) { byte [] body = new byte[SIZE]; msg = Unpooled.wrappedBuffer(body); ctx.writeAndFlush(msg); try { //模拟每1ms发送一条消息 TimeUnit.MILLISECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }; new Thread(loadRunner, "LoadRunner-Thread").start(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }public class LoadRunnerClient { static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8080")); @SuppressWarnings({"unchecked", "deprecation"}) public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) //设置请求的高水位 .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 1024 * 1024) .handler(new ChannelInitializer>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoadRunnerSleepClientHandler()); } }); ChannelFuture f = b.connect(HOST, PORT).sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }

服务端代码
服务端在消息接收处Debug,模拟服务端处理慢,不读网络消息。由于服务端不读取网络消息,会导致客户端的发送队列积压。
public final class LoadRunnerServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8080")); public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler()); } }); ChannelFuture f = b.bind(PORT).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 发生异常关闭连接 cause.printStackTrace(); ctx.close(); } }

服务端在chnnelRead中设置断点,模拟阻塞NioEventLoop线程,因为Netty在发送消息时会把堆内存转化成直接内存,通过对内存监控无法直接看到直接内存的分配和使用情况,运行一段时间之后可以在客户端AbstractChannel.AbstractUnsafe的write处设置断点,查看发送队列ChannelOutboundBuffer堆积情况。
Netty使用案例|Netty使用案例 -发送队列积压导致内存泄漏(二)
文章图片

利用netstat -ano等命令可以监控到某个端口的TCP接收和发送队列的积压情况,一旦发现自己的发送队列有大量的积压,说明消息的收发存在瓶颈,需要及时解决,防止因Netty发送队列积压导致内存泄漏,在日常监控中,需要将Netty的链路数,网络读写速速等指标纳入监控系统,发现问题之后需要及时告警。
Netty消息发送工作机制和源码分析 业务调用write方法后,经过ChnnelPipline职责链处理。消息被投递到发送缓存区待发送,调用flush之后会执行真正的发送操作,底层通过调用JavaNIO的SocketChannel进行非阻塞write操作,将消息发送到网络上。
Netty使用案例|Netty使用案例 -发送队列积压导致内存泄漏(二)
文章图片
Netty的消息发送涉及以下考虑,实现比较复杂。
  • 线程切换,
  • 消息队列,
  • 高低水位和半包消息,
WriteAndFlushTask原理和源码分析
为了尽可能提升性能,Netty采用了串行无锁涉及,在I/O线程内部进行串行操作,避免多线程竞争导致性能下降,从表面看,串行化涉及的CPU利用率似乎不高,并发程度不够,但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程涉及相比“一个队列对应多个工作线程”模型性能更优。
Netty使用案例|Netty使用案例 -发送队列积压导致内存泄漏(二)
文章图片

当用户线程发起write操作时,netty会判断,如果发现不是NioEventLoop(I/O线程),则将发送消息封装成WriteTask任务,放入NioEventLoop的任务队列,由NioEventLoop线程执行,代码如果AbstractChannelHandlerContext类:
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { //这里生成write任务 AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); }else { task = WriteTask.newInstance(next, m, promise); } //依靠NioEventLoop的execute去执行(若是外部线程存储,会唤醒正在阻塞的selector,如果是第一次被调用,则会启动一个本地线程做为nioeventloop的载体) safeExecute(executor, task, promise, m); } }

Netty的NioEventLoop线程内部维护了一个QueuetaskQueue,除了处理网络I/O读写操作,同事还负责执行网络读写相关的Task,代码如(SingleThreadEventExecutor类)
//这里只是添加可执行任务 public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //构建当前线程的EventLoop boolean inEventLoop = inEventLoop(); //添加任务队列 addTask(task); //如果创建不成功,重新启动 if (!inEventLoop) { startThread(); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }

NioEventLoop遍历taskQueue,执行消息发送任务,类AbstractWriteTask类
@Override public final void run() { try { // 检查是否为null,因为如果通道已经关闭,它可以被设置为null if (ESTIMATE_TASK_SIZE_ON_SUBMIT) { ctx.pipeline.decrementPendingOutboundBytes(size); } write(ctx, msg, promise); } finally { // 设置为null被GC在年轻代回收回收 ctx = null; msg = null; promise = null; handle.recycle(this); } }

在处理
public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); //ChannelOutboundBuffer是保存待发送的数据 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; }int size; try { //如果是ByteBuf尝试把其包装成directByteBuf,如果是FileRegion直接发送其他的都不会发送 msg = filterOutboundMessage(msg); //获得要发送数据的大小 size = pipeline.estimatorHandle().size(msg); //大小小于0把他置位0 if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } //把当前的msg加入outboundBuffer的内部存储链表 outboundBuffer.addMessage(msg, size, promise); }

经过一些系统处理操作,最终会调用ChannelOutboundBuffer的AddMessage方法,将发送消息加入发送链表队列。通过上边的分析,可以得出结论:
  • 多个业务线程并发调用write相关方法是线程安全的,Netty会将发送消息封装成Task,由I/O线程异步执行。
  • 由于单个Channel由其对应的NioEventLoop线程执行,如果并行调用某个Channel的write操作超时对应的NioEventLoop线程的执行能力会导致WriteTask积压。
  • NioEventLoop线程需要处理网络读写操作,以及注册到NioEventLoop上的各种Task,两者相互影响,如果网络读写任务较重,或者注册的Task太多,都会导致对方延迟执行,引发性能问题。
写入发送源码分析 对于ChannelOutboundBuffer可以自行看看。
发送次数限制
当SocketChannel无法一次将所有待发送的ByteBuf/ButeBuffer写入网络时,需要决定是注册WRITE在下一次Selector轮训时继续发送,还是在当前位置循环发送,等到所有消息都发送完成再返回。频繁注册会影响性能,如果TCP的发送缓存区已满,TCP处于KEEP-ALIVE状态,消息无法发送出去,如果不对循环发送次数进行控制,就会长时间处于发送状态,Reactor线程无法计数读取其他消息和排队的task任务,所以netty采取了折中的方式,
如果本次发送的字节数大于0,但是消息尚未发送完,则循环发送,一旦发现write字节数为0,说明TCP缓冲区已满,此时继续发送没有意义,注册SelectKey.OP_WRITE并退出循环,在下一个SelectionKey轮训周期继续发送。
//NioSocketChannel protected void doWrite(ChannelOutboundBuffer in) throws Exception { SocketChannel ch = javaChannel(); int writeSpinCount = config().getWriteSpinCount(); do { if (in.isEmpty()) { // All written so clear OP_WRITE clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; }// Ensure the pending writes are made of ByteBufs only. int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); //获取待发送消息的ByteBuffer数 int nioBufferCnt = in.nioBufferCount(); // Always us nioBuffers() to workaround data-corruption. // See https://github.com/netty/netty/issues/2761 switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. writeSpinCount -= doWrite0(in); break; case 1: { // Only one ByteBuf so use non-gathering write // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. //直接通过nioBuffers[0]获取待发送消息 ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); //完成消息发送 final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } default: { // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. // We limit the max amount to int above so cast is safe long attemptedBytes = in.nioBufferSize(); final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above. adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } } //这里判断是否写入TCP缓冲区为0 } while (writeSpinCount > 0); incompleteWrite(writeSpinCount < 0); }

不同消息发送策略
消息发送有三种策略
  • 如果待发送消息的ByteBuffer数量等于1,则直接通过nioBuffers[0]获取待发送消息的ByteBuffer,通过JDK的SocketChannel直接完成消息发送,以上代码的case 1;
  • 如果待发送消息的ByteBuffer数量大于1,则调用SocketChannel的批量发送接口,将nioBuffers数组写入TCP发送缓冲区;以上代码default
  • 如果待发送的消息包含的JDK原生ByteBuffer数为0,则调用父类AbstractNioByteChannel的doWrite0方法,将Netty的Bytebuf发送到TCP缓冲区。以上代码case 0;
已发送消息内存释放
如果消息发送成功,Netty会释放已发送消息的内存,发送对象不同,释放策略也不同,
  • 如果发送对象是JDK的ByteBuffer,跟进发送的字节数计算需要被释放的发送对象的个数,代码如ChannelOutBoundBuffer
public void removeBytes(long writtenBytes) { for (; ; ) { Object msg = current(); if (!(msg instanceof ByteBuf)) { assert writtenBytes == 0; break; } final ByteBuf buf = (ByteBuf) msg; //可读开始 final int readerIndex = buf.readerIndex(); //可读字节数 final int readableBytes = buf.writerIndex() - readerIndex; //发送的字节数大于可读字节数,当前ByteBuffer已经完全发送出去, if (readableBytes <= writtenBytes) { if (writtenBytes != 0) { //更新ChannelOutboundBuffer的发送进度信息 progress(readableBytes); //发送减去一条消息的字节数,循环判断后续的消息,直到所有的消息都被删除 writtenBytes -= readableBytes; } remove(); } else { // readableBytes > writtenBytes if (writtenBytes != 0) { buf.readerIndex(readerIndex + (int) writtenBytes); progress(writtenBytes); } break; } } clearNioBuffers(); }

  • 发送对象是Netty的ByteBuf,通过判断当前的ByteBuf的isReadable来获取消息发送结果,如果发送完成,则调用ChannelOutbounfBuffer的remove方法删除并释放ByteBuf,代码AbstractNioByteChannel类
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception { //判断是netty的ByteBuf if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; //判断当前BuyBuf的方法来获得发送结果,这里返回处理 if (!buf.isReadable()) { //删除释放 in.remove(); return 0; }final int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount > 0) { in.progress(localFlushedAmount); if (!buf.isReadable()) { in.remove(); } return 1; } } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; if (region.transferred() >= region.count()) { in.remove(); return 0; } // long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount > 0) { in.progress(localFlushedAmount); if (region.transferred() >= region.count()) { in.remove(); } return 1; } } else { // Should not reach here. throw new Error(); } return WRITE_STATUS_SNDBUF_FULL; }

写半包
如果一次无法将待发送的消息全部写入TCP缓冲区,缓存writeSpinCount次仍未发送完,或者在发送过程中出现了TCP写入的字节数为0,则进入“写半包”模式,目的是在消息发送慢的时候不要死循环发送,这回阻塞NioEventLoop线程,注册SelectionKey.OP_WRITE到对应的Selector,对出循环,在下一次Selector轮询过程中解析执行write操作,上边代码NioSocketChannel.write();
//循环执行write操作 final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } // 循环riteSpinCount 代码省略 --writeSpinCount;

//AbstractNioChannel.setOpWrite 注册SelectionKey.OP_WRITE相关的操作 protected final void setOpWrite() { final SelectionKey key = selectionKey(); // Check first if the key is still valid as it may be canceled as part of the deregistration // from the EventLoop // See https://github.com/netty/netty/issues/2104 if (!key.isValid()) { return; } final int interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } }

消息发送高水位控制 为了对发送速度和消息积压数进行控制,Netty提供了高低水位机制,当消息队列中积压的待发送消息总字节数达到了高水位时,修改Channel的状态为不可写。
具体代码在ChannelOutboundBuffer.incrementPendingOutboundBytes
private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); //这里我们之前的代码设置过 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { //设置为不可写 setUnwritable(invokeLater); } }

修改Channel状态后,调用ChannelPipeline发送通知事件,业务可以监听该事件及获取链路可写状态,代码ChannelOutboundBuffer.fireChannelWritabilityChanged
private void fireChannelWritabilityChanged(boolean invokeLater) { final ChannelPipeline pipeline = channel.pipeline(); if (invokeLater) { Runnable task = fireChannelWritabilityChangedTask; if (task == null) { fireChannelWritabilityChangedTask = task = new Runnable() { @Override public void run() { //同步可写状态 pipeline.fireChannelWritabilityChanged(); } }; } channel.eventLoop().execute(task); } else { //同步可写状态 pipeline.fireChannelWritabilityChanged(); } }

消息发送完成后,对低水位进行判断,如果当前积压的待发送字节数达到或者低于低水位,则修改Channel状态为可写,并发通知事件,代码ChannelOutboundBuffer.decrementPendingOutboundBytes
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; }long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); //当前积压的待发送字节数达到或者低于低水位,则修改Channel状态为可写 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { setWritable(invokeLater); } }

利用Netty的高低水位机制,可以防止在发送队列处于高水位时继续发送消息导致积压,甚至发生内存泄漏,在业务中合理利用Netty的高水位机制,可以提升系统的可靠性。

    推荐阅读