Netty使用案例|Netty使用案例 -发送队列积压导致内存泄漏(二)


  • 发送队列积压导致内存泄漏
    • 其他可能导致发送消息队列积压的因素
    • 客户端代码改造
    • Netty消息发送工作机制和源码分析
      • WriteAndFlushTask原理和源码分析
    • 写入发送源码分析
      • 发送次数限制
      • 不同消息发送策略
      • 已发送消息内存释放
      • 写半包
    • 消息发送高水位控制

发送队列积压导致内存泄漏 上个文章模拟高并发发送消息导致内存泄漏,分析了没有设置高水位导致的内存泄漏,其实并不是在高并发时候才会导致积压,在别的场景下也会导致积压。
其他可能导致发送消息队列积压的因素 【Netty使用案例|Netty使用案例 -发送队列积压导致内存泄漏(二)】在一些场景下,尽管系统流量不大,但仍然可能导致消息积压,可能的场景如下:
  • 网络瓶颈导致积压,当发送速度超过网络链接处理能力,会导致发送队列积压。
  • 当对端读取速度小于己方发送速度,导致自身TCP发送缓冲区满,频繁发生write 0字节时,待发送消息会在Netty发送队列中排队。
客户端代码改造 客户端每1ms发送一条消息,服务端不读取网络消息会导致客户端的发送队列积压。
public class LoadRunnerSleepClientHandler extends ChannelInboundHandlerAdapter {private final ByteBuf firstMessage; Runnable loadRunner; static final int SIZE = Integer.parseInt(System.getProperty("size", "10240")); public LoadRunnerSleepClientHandler() { firstMessage = Unpooled.buffer(SIZE); for (int i = 0; i < firstMessage.capacity(); i ++) { firstMessage.writeByte((byte) i); } }@Override public void channelActive(final ChannelHandlerContext ctx) { loadRunner = new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(30); } catch (InterruptedException e) { e.printStackTrace(); } ByteBuf msg = null; while(true) { byte [] body = new byte[SIZE]; msg = Unpooled.wrappedBuffer(body); ctx.writeAndFlush(msg); try { //模拟每1ms发送一条消息 TimeUnit.MILLISECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }; new Thread(loadRunner, "LoadRunner-Thread").start(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ReferenceCountUtil.release(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }public class LoadRunnerClient { static final String HOST = System.getProperty("host", ""); static final int PORT = Integer.parseInt(System.getProperty("port", "8080")); @SuppressWarnings({"unchecked", "deprecation"}) public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) //设置请求的高水位 .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 1024 * 1024) .handler(new ChannelInitializer>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new LoadRunnerSleepClientHandler()); } }); ChannelFuture f = b.connect(HOST, PORT).sync();; } finally { group.shutdownGracefully(); } } }

public final class LoadRunnerServer { static final int PORT = Integer.parseInt(System.getProperty("port", "8080")); public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap();, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler()); } }); ChannelFuture f = b.bind(PORT).sync();; } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 发生异常关闭连接 cause.printStackTrace(); ctx.close(); } }

利用netstat -ano等命令可以监控到某个端口的TCP接收和发送队列的积压情况,一旦发现自己的发送队列有大量的积压,说明消息的收发存在瓶颈,需要及时解决,防止因Netty发送队列积压导致内存泄漏,在日常监控中,需要将Netty的链路数,网络读写速度等指标纳入监控系统,发现问题之后需要及时告警。
Netty消息发送工作机制和源码分析 业务调用write方法后,经过ChannelPipeline职责链处理。消息被投递到发送缓存区待发送,调用flush之后会执行真正的发送操作,底层通过调用Java NIO的SocketChannel进行非阻塞write操作,将消息发送到网络上。
  • 线程切换,
  • 消息队列,
  • 高低水位和半包消息,
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { //这里生成write任务 AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); }else { task = WriteTask.newInstance(next, m, promise); } //依靠NioEventLoop的execute去执行(若是外部线程存储,会唤醒正在阻塞的selector,如果是第一次被调用,则会启动一个本地线程做为nioeventloop的载体) safeExecute(executor, task, promise, m); } }

//这里只是添加可执行任务 public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } //构建当前线程的EventLoop boolean inEventLoop = inEventLoop(); //添加任务队列 addTask(task); //如果创建不成功,重新启动 if (!inEventLoop) { startThread(); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }

@Override public final void run() { try { // 检查是否为null,因为如果通道已经关闭,它可以被设置为null if (ESTIMATE_TASK_SIZE_ON_SUBMIT) { ctx.pipeline.decrementPendingOutboundBytes(size); } write(ctx, msg, promise); } finally { // 设置为null被GC在年轻代回收回收 ctx = null; msg = null; promise = null; handle.recycle(this); } }

public final void write(Object msg, ChannelPromise promise) { assertEventLoop(); //ChannelOutboundBuffer是保存待发送的数据 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; }int size; try { //如果是ByteBuf尝试把其包装成directByteBuf,如果是FileRegion直接发送其他的都不会发送 msg = filterOutboundMessage(msg); //获得要发送数据的大小 size = pipeline.estimatorHandle().size(msg); //大小小于0把他置位0 if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } //把当前的msg加入outboundBuffer的内部存储链表 outboundBuffer.addMessage(msg, size, promise); }

  • 多个业务线程并发调用write相关方法是线程安全的,Netty会将发送消息封装成Task,由I/O线程异步执行。
  • 由于单个Channel由其对应的NioEventLoop线程执行,如果并行调用某个Channel的write操作超出对应的NioEventLoop线程的执行能力,会导致WriteTask积压。
  • NioEventLoop线程需要处理网络读写操作,以及注册到NioEventLoop上的各种Task,两者相互影响,如果网络读写任务较重,或者注册的Task太多,都会导致对方延迟执行,引发性能问题。
写入发送源码分析 对于ChannelOutboundBuffer可以自行看看。
//NioSocketChannel protected void doWrite(ChannelOutboundBuffer in) throws Exception { SocketChannel ch = javaChannel(); int writeSpinCount = config().getWriteSpinCount(); do { if (in.isEmpty()) { // All written so clear OP_WRITE clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; }// Ensure the pending writes are made of ByteBufs only. int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); //获取待发送消息的ByteBuffer数 int nioBufferCnt = in.nioBufferCount(); // Always us nioBuffers() to workaround data-corruption. // See switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. writeSpinCount -= doWrite0(in); break; case 1: { // Only one ByteBuf so use non-gathering write // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. //直接通过nioBuffers[0]获取待发送消息 ByteBuffer buffer = nioBuffers[0]; int attemptedBytes = buffer.remaining(); //完成消息发送 final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } default: { // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need // to check if the total size of all the buffers is non-zero. // We limit the max amount to int above so cast is safe long attemptedBytes = in.nioBufferSize(); final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above. 
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, maxBytesPerGatheringWrite); in.removeBytes(localWrittenBytes); --writeSpinCount; break; } } //这里判断是否写入TCP缓冲区为0 } while (writeSpinCount > 0); incompleteWrite(writeSpinCount < 0); }

  • 如果待发送消息的ByteBuffer数量等于1,则直接通过nioBuffers[0]获取待发送消息的ByteBuffer,通过JDK的SocketChannel直接完成消息发送,以上代码的case 1;
  • 如果待发送消息的ByteBuffer数量大于1,则调用SocketChannel的批量发送接口,将nioBuffers数组写入TCP发送缓冲区;以上代码default
  • 如果待发送的消息包含的JDK原生ByteBuffer数为0,则调用父类AbstractNioByteChannel的doWrite0方法,将Netty的ByteBuf发送到TCP缓冲区。以上代码case 0;
  • 如果发送对象是JDK的ByteBuffer,根据发送的字节数计算需要被释放的发送对象的个数,代码如ChannelOutboundBuffer
public void removeBytes(long writtenBytes) { for (; ; ) { Object msg = current(); if (!(msg instanceof ByteBuf)) { assert writtenBytes == 0; break; } final ByteBuf buf = (ByteBuf) msg; //可读开始 final int readerIndex = buf.readerIndex(); //可读字节数 final int readableBytes = buf.writerIndex() - readerIndex; //发送的字节数大于可读字节数,当前ByteBuffer已经完全发送出去, if (readableBytes <= writtenBytes) { if (writtenBytes != 0) { //更新ChannelOutboundBuffer的发送进度信息 progress(readableBytes); //发送减去一条消息的字节数,循环判断后续的消息,直到所有的消息都被删除 writtenBytes -= readableBytes; } remove(); } else { // readableBytes > writtenBytes if (writtenBytes != 0) { buf.readerIndex(readerIndex + (int) writtenBytes); progress(writtenBytes); } break; } } clearNioBuffers(); }

  • 发送对象是Netty的ByteBuf,通过判断当前的ByteBuf的isReadable来获取消息发送结果,如果发送完成,则调用ChannelOutboundBuffer的remove方法删除并释放ByteBuf,代码AbstractNioByteChannel类
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception { //判断是netty的ByteBuf if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; //判断当前BuyBuf的方法来获得发送结果,这里返回处理 if (!buf.isReadable()) { //删除释放 in.remove(); return 0; }final int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount > 0) { in.progress(localFlushedAmount); if (!buf.isReadable()) { in.remove(); } return 1; } } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; if (region.transferred() >= region.count()) { in.remove(); return 0; } // long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount > 0) { in.progress(localFlushedAmount); if (region.transferred() >= region.count()) { in.remove(); } return 1; } } else { // Should not reach here. throw new Error(); } return WRITE_STATUS_SNDBUF_FULL; }

//循环执行write操作 final int localWrittenBytes = ch.write(buffer); if (localWrittenBytes <= 0) { incompleteWrite(true); return; } // 循环riteSpinCount 代码省略 --writeSpinCount;

//AbstractNioChannel.setOpWrite 注册SelectionKey.OP_WRITE相关的操作 protected final void setOpWrite() { final SelectionKey key = selectionKey(); // Check first if the key is still valid as it may be canceled as part of the deregistration // from the EventLoop // See if (!key.isValid()) { return; } final int interestOps = key.interestOps(); if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } }

消息发送高水位控制 为了对发送速度和消息积压数进行控制,Netty提供了高低水位机制,当消息队列中积压的待发送消息总字节数达到了高水位时,修改Channel的状态为不可写。
private void incrementPendingOutboundBytes(long size, boolean invokeLater) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); //这里我们之前的代码设置过 if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { //设置为不可写 setUnwritable(invokeLater); } }

private void fireChannelWritabilityChanged(boolean invokeLater) { final ChannelPipeline pipeline = channel.pipeline(); if (invokeLater) { Runnable task = fireChannelWritabilityChangedTask; if (task == null) { fireChannelWritabilityChangedTask = task = new Runnable() { @Override public void run() { //同步可写状态 pipeline.fireChannelWritabilityChanged(); } }; } channel.eventLoop().execute(task); } else { //同步可写状态 pipeline.fireChannelWritabilityChanged(); } }

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { if (size == 0) { return; }long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); //当前积压的待发送字节数达到或者低于低水位,则修改Channel状态为可写 if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { setWritable(invokeLater); } }

