Netty使用案例|Netty使用案例 -发送队列积压导致内存泄漏(二)
文章目录
- 发送队列积压导致内存泄漏
- 其他可能导致发送消息队列积压的因素
- 客户端代码改造
- Netty消息发送工作机制和源码分析
- WriteAndFlushTask原理和源码分析
- 写入发送源码分析
- 发送次数限制
- 不同消息发送策略
- 已发送消息内存释放
- 写半包
- 消息发送高水位控制
发送队列积压导致内存泄漏 上个文章模拟高并发发送消息导致内存泄漏,分析了没有设置高水位导致的内存泄漏,其实并不是在高并发时候才会导致积压,在别的场景下也会导致积压。
其他可能导致发送消息队列积压的因素 【Netty使用案例|Netty使用案例 -发送队列积压导致内存泄漏(二)】在一些场景下,尽管系统流量不大,但任然可能导致消息积压,可能的场景如下:
- 网络瓶颈导致积压,当发送速度超过网络链接处理能力,会导致发送队列积压。
- 当对端读取速度小于乙方发送速度,导致自身TCP发送缓冲区满,频繁发生write 0字节时,待发送消息会在Netty发送队列中排队。
客户端代码改造 客户端每1ms发送一条消息,服务端不读取网络消息会导致客户端的发送队列积压。
客户端代码改造如下:
public class LoadRunnerSleepClientHandler extends ChannelInboundHandlerAdapter {private final ByteBuf firstMessage;
Runnable loadRunner;
static final int SIZE = Integer.parseInt(System.getProperty("size", "10240"));
public LoadRunnerSleepClientHandler() {
firstMessage = Unpooled.buffer(SIZE);
for (int i = 0;
i < firstMessage.capacity();
i ++) {
firstMessage.writeByte((byte) i);
}
}@Override
public void channelActive(final ChannelHandlerContext ctx) {
loadRunner = new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
ByteBuf msg = null;
while(true)
{
byte [] body = new byte[SIZE];
msg = Unpooled.wrappedBuffer(body);
ctx.writeAndFlush(msg);
try {
//模拟每1ms发送一条消息
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
new Thread(loadRunner, "LoadRunner-Thread").start();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
{
ReferenceCountUtil.release(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}public class LoadRunnerClient {
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
@SuppressWarnings({"unchecked", "deprecation"})
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
//设置请求的高水位
.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 1024 * 1024)
.handler(new ChannelInitializer>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoadRunnerSleepClientHandler());
}
});
ChannelFuture f = b.connect(HOST, PORT).sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
服务端代码
服务端在消息接收处Debug,模拟服务端处理慢,不读网络消息。由于服务端不读取网络消息,会导致客户端的发送队列积压。
public final class LoadRunnerServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoServerHandler());
}
});
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}}class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 发生异常关闭连接
cause.printStackTrace();
ctx.close();
}
}
服务端在chnnelRead中设置断点,模拟阻塞NioEventLoop线程,因为Netty在发送消息时会把堆内存转化成直接内存,通过对内存监控无法直接看到直接内存的分配和使用情况,运行一段时间之后可以在客户端AbstractChannel.AbstractUnsafe的write处设置断点,查看发送队列ChannelOutboundBuffer堆积情况。
文章图片
利用netstat -ano等命令可以监控到某个端口的TCP接收和发送队列的积压情况,一旦发现自己的发送队列有大量的积压,说明消息的收发存在瓶颈,需要及时解决,防止因Netty发送队列积压导致内存泄漏,在日常监控中,需要将Netty的链路数,网络读写速速等指标纳入监控系统,发现问题之后需要及时告警。
Netty消息发送工作机制和源码分析 业务调用write方法后,经过ChnnelPipline职责链处理。消息被投递到发送缓存区待发送,调用flush之后会执行真正的发送操作,底层通过调用JavaNIO的SocketChannel进行非阻塞write操作,将消息发送到网络上。
文章图片
Netty的消息发送涉及以下考虑,实现比较复杂。
- 线程切换,
- 消息队列,
- 高低水位和半包消息,
为了尽可能提升性能,Netty采用了串行无锁涉及,在I/O线程内部进行串行操作,避免多线程竞争导致性能下降,从表面看,串行化涉及的CPU利用率似乎不高,并发程度不够,但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程涉及相比“一个队列对应多个工作线程”模型性能更优。
文章图片
当用户线程发起write操作时,netty会判断,如果发现不是NioEventLoop(I/O线程),则将发送消息封装成WriteTask任务,放入NioEventLoop的任务队列,由NioEventLoop线程执行,代码如果AbstractChannelHandlerContext类:
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
//这里生成write任务
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
}else {
task = WriteTask.newInstance(next, m, promise);
}
//依靠NioEventLoop的execute去执行(若是外部线程存储,会唤醒正在阻塞的selector,如果是第一次被调用,则会启动一个本地线程做为nioeventloop的载体)
safeExecute(executor, task, promise, m);
}
}
Netty的NioEventLoop线程内部维护了一个QueuetaskQueue,除了处理网络I/O读写操作,同事还负责执行网络读写相关的Task,代码如(SingleThreadEventExecutor类)
//这里只是添加可执行任务
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
//构建当前线程的EventLoop
boolean inEventLoop = inEventLoop();
//添加任务队列
addTask(task);
//如果创建不成功,重新启动
if (!inEventLoop) {
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
NioEventLoop遍历taskQueue,执行消息发送任务,类AbstractWriteTask类
@Override
public final void run() {
try {
// 检查是否为null,因为如果通道已经关闭,它可以被设置为null
if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
ctx.pipeline.decrementPendingOutboundBytes(size);
}
write(ctx, msg, promise);
} finally {
// 设置为null被GC在年轻代回收回收
ctx = null;
msg = null;
promise = null;
handle.recycle(this);
}
}
在处理
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
//ChannelOutboundBuffer是保存待发送的数据
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}int size;
try {
//如果是ByteBuf尝试把其包装成directByteBuf,如果是FileRegion直接发送其他的都不会发送
msg = filterOutboundMessage(msg);
//获得要发送数据的大小
size = pipeline.estimatorHandle().size(msg);
//大小小于0把他置位0
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
//把当前的msg加入outboundBuffer的内部存储链表
outboundBuffer.addMessage(msg, size, promise);
}
经过一些系统处理操作,最终会调用ChannelOutboundBuffer的AddMessage方法,将发送消息加入发送链表队列。通过上边的分析,可以得出结论:
- 多个业务线程并发调用write相关方法是线程安全的,Netty会将发送消息封装成Task,由I/O线程异步执行。
- 由于单个Channel由其对应的NioEventLoop线程执行,如果并行调用某个Channel的write操作超时对应的NioEventLoop线程的执行能力会导致WriteTask积压。
- NioEventLoop线程需要处理网络读写操作,以及注册到NioEventLoop上的各种Task,两者相互影响,如果网络读写任务较重,或者注册的Task太多,都会导致对方延迟执行,引发性能问题。
发送次数限制
当SocketChannel无法一次将所有待发送的ByteBuf/ButeBuffer写入网络时,需要决定是注册WRITE在下一次Selector轮训时继续发送,还是在当前位置循环发送,等到所有消息都发送完成再返回。频繁注册会影响性能,如果TCP的发送缓存区已满,TCP处于KEEP-ALIVE状态,消息无法发送出去,如果不对循环发送次数进行控制,就会长时间处于发送状态,Reactor线程无法计数读取其他消息和排队的task任务,所以netty采取了折中的方式,
如果本次发送的字节数大于0,但是消息尚未发送完,则循环发送,一旦发现write字节数为0,说明TCP缓冲区已满,此时继续发送没有意义,注册SelectKey.OP_WRITE并退出循环,在下一个SelectionKey轮训周期继续发送。
//NioSocketChannel
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
// All written so clear OP_WRITE
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}// Ensure the pending writes are made of ByteBufs only.
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
//获取待发送消息的ByteBuffer数
int nioBufferCnt = in.nioBufferCount();
// Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
writeSpinCount -= doWrite0(in);
break;
case 1: {
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
//直接通过nioBuffers[0]获取待发送消息
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
//完成消息发送
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
//这里判断是否写入TCP缓冲区为0
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
不同消息发送策略
消息发送有三种策略
- 如果待发送消息的ByteBuffer数量等于1,则直接通过nioBuffers[0]获取待发送消息的ByteBuffer,通过JDK的SocketChannel直接完成消息发送,以上代码的case 1;
- 如果待发送消息的ByteBuffer数量大于1,则调用SocketChannel的批量发送接口,将nioBuffers数组写入TCP发送缓冲区;以上代码default
- 如果待发送的消息包含的JDK原生ByteBuffer数为0,则调用父类AbstractNioByteChannel的doWrite0方法,将Netty的Bytebuf发送到TCP缓冲区。以上代码case 0;
如果消息发送成功,Netty会释放已发送消息的内存,发送对象不同,释放策略也不同,
- 如果发送对象是JDK的ByteBuffer,跟进发送的字节数计算需要被释放的发送对象的个数,代码如ChannelOutBoundBuffer
public void removeBytes(long writtenBytes) {
for (;
;
) {
Object msg = current();
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}
final ByteBuf buf = (ByteBuf) msg;
//可读开始
final int readerIndex = buf.readerIndex();
//可读字节数
final int readableBytes = buf.writerIndex() - readerIndex;
//发送的字节数大于可读字节数,当前ByteBuffer已经完全发送出去,
if (readableBytes <= writtenBytes) {
if (writtenBytes != 0) {
//更新ChannelOutboundBuffer的发送进度信息
progress(readableBytes);
//发送减去一条消息的字节数,循环判断后续的消息,直到所有的消息都被删除
writtenBytes -= readableBytes;
}
remove();
} else { // readableBytes > writtenBytes
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
clearNioBuffers();
}
- 发送对象是Netty的ByteBuf,通过判断当前的ByteBuf的isReadable来获取消息发送结果,如果发送完成,则调用ChannelOutbounfBuffer的remove方法删除并释放ByteBuf,代码AbstractNioByteChannel类
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
//判断是netty的ByteBuf
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
//判断当前BuyBuf的方法来获得发送结果,这里返回处理
if (!buf.isReadable()) {
//删除释放
in.remove();
return 0;
}final int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
//
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
}
} else {
// Should not reach here.
throw new Error();
}
return WRITE_STATUS_SNDBUF_FULL;
}
写半包
如果一次无法将待发送的消息全部写入TCP缓冲区,缓存writeSpinCount次仍未发送完,或者在发送过程中出现了TCP写入的字节数为0,则进入“写半包”模式,目的是在消息发送慢的时候不要死循环发送,这回阻塞NioEventLoop线程,注册SelectionKey.OP_WRITE到对应的Selector,对出循环,在下一次Selector轮询过程中解析执行write操作,上边代码NioSocketChannel.write();
//循环执行write操作
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// 循环riteSpinCount 代码省略
--writeSpinCount;
//AbstractNioChannel.setOpWrite 注册SelectionKey.OP_WRITE相关的操作
protected final void setOpWrite() {
final SelectionKey key = selectionKey();
// Check first if the key is still valid as it may be canceled as part of the deregistration
// from the EventLoop
// See https://github.com/netty/netty/issues/2104
if (!key.isValid()) {
return;
}
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
消息发送高水位控制 为了对发送速度和消息积压数进行控制,Netty提供了高低水位机制,当消息队列中积压的待发送消息总字节数达到了高水位时,修改Channel的状态为不可写。
具体代码在ChannelOutboundBuffer.incrementPendingOutboundBytes
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
//这里我们之前的代码设置过
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
//设置为不可写
setUnwritable(invokeLater);
}
}
修改Channel状态后,调用ChannelPipeline发送通知事件,业务可以监听该事件及获取链路可写状态,代码ChannelOutboundBuffer.fireChannelWritabilityChanged
private void fireChannelWritabilityChanged(boolean invokeLater) {
final ChannelPipeline pipeline = channel.pipeline();
if (invokeLater) {
Runnable task = fireChannelWritabilityChangedTask;
if (task == null) {
fireChannelWritabilityChangedTask = task = new Runnable() {
@Override
public void run() {
//同步可写状态
pipeline.fireChannelWritabilityChanged();
}
};
}
channel.eventLoop().execute(task);
} else {
//同步可写状态
pipeline.fireChannelWritabilityChanged();
}
}
消息发送完成后,对低水位进行判断,如果当前积压的待发送字节数达到或者低于低水位,则修改Channel状态为可写,并发通知事件,代码ChannelOutboundBuffer.decrementPendingOutboundBytes
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
//当前积压的待发送字节数达到或者低于低水位,则修改Channel状态为可写
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
利用Netty的高低水位机制,可以防止在发送队列处于高水位时继续发送消息导致积压,甚至发生内存泄漏,在业务中合理利用Netty的高水位机制,可以提升系统的可靠性。
推荐阅读
- 由浅入深理解AOP
- 【译】20个更有效地使用谷歌搜索的技巧
- mybatisplus如何在xml的连表查询中使用queryWrapper
- MybatisPlus|MybatisPlus LambdaQueryWrapper使用int默认值的坑及解决
- MybatisPlus使用queryWrapper如何实现复杂查询
- iOS中的Block
- Linux下面如何查看tomcat已经使用多少线程
- 使用composer自动加载类文件
- android|android studio中ndk的使用
- 使用协程爬取网页,计算网页数据大小