【Netty】四、事件循环EventLoop与EventLoopGroup

一、简介 在netty中,事件循环EventLoop是一个很重要的组件,用于处理已注册Channel的各种IO事件,而EventLoopGroup对应了一个或多个EventLoop,可以看做EvenLoopGroup就是EventLoop的集合。下面是EventLoop和EventLoopGroup相关类图:
【Netty】四、事件循环EventLoop与EventLoopGroup
文章图片

从上面类图可以看到,netty在jdk原生接口ScheduledExecutorService上衍生了EventExecutorGroup接口,其通过next()方法来获取EventExecutor事件执行器,并在ScheduledExecutorService的基础上添加了优雅关闭、是否正在关闭等操作,如下图
【Netty】四、事件循环EventLoop与EventLoopGroup
文章图片

【Netty】四、事件循环EventLoop与EventLoopGroup
文章图片

而EventLoopGroup继承了EventExecutorGroup接口,重写next()方法并添加注册Channel的操作,如下图
【Netty】四、事件循环EventLoop与EventLoopGroup
文章图片

EventLoop接口本身比较简单,继承于EventExecutor及EventLoopGroup接口,如下
【Netty】四、事件循环EventLoop与EventLoopGroup
文章图片

最常使用的 NioEventLoopGroup 和 NioEventLoop,分别继承于抽象类 MultithreadEventLoopGroup 和 SingleThreadEventLoop,而这两个抽象类本身实现不难,其主要是继承了 MultithreadEventExecutorGroup 和 SingleThreadEventExecutor,所以下面来看下 MultithreadEventExecutorGroup 和 SingleThreadEventExecutor 的主要代码逻辑
二、MultithreadEventExecutorGroup和SingleThreadEventExecutor 2.1 MultithreadEventExecutorGroup
MultithreadEventExecutorGroup表示通过多个EventExecutor来处理所提交的任务
2.1.1 重要属性 有两个较为重要的属性children和chooser,children对应EventExecutor数组,而chooser选取器的作用是从children选取EventExecutor来执行任务。如下

// 对应的EventExecutor数组 private final EventExecutor[] children; // 选取器,作用是从children里选取EventExecutor来执行任务 private final EventExecutorChooserFactory.EventExecutorChooser chooser;

2.1.2 初始化 MultithreadEventExecutorGroup的构造函数会对children和chooser进行初始化,大致步骤如下:
  1. 根据传进来的nThreads线程数来初始化children数组 children = new EventExecutor[nThreads]
  2. 通过newChild()方法来实例化children的每个EventExecutor,newChild()为抽象方法,需要子类(如NioEventLoopGroup)具体实现。如果没有成功的实例化children数组,则逐一优雅关闭EventExecutor
  3. 初始化chooser选取器
  4. 给children中的每个EventExecutor添加terminationListener终止监听器,每有一个EventExecutor终止了,就会将terminatedChildren加1,等到terminatedChildren==children总数时,说明所有的EventExecutor已经全部终止
源码如下:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { checkPositive(nThreads, "nThreads"); if (executor == null) { // 如果传进的executor执行器为空,设置为ThreadPerTaskExecutor执行器,该执行器会单独创建一个线程来处理每个任务 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }// 根据传进来的nThreads线程数来实例化children children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try { // newChild作用是生成具体的EventExecutor,其为抽象方法,需要子类(如NioEventLoopGroup)去具体实现 children[i] = newChild(executor, args); success = true; } catch (Exception e) { throw new IllegalStateException("failed to create a child event loop", e); } finally { // 如果没有成功的实例化children数组,则逐一优雅关闭EventExecutor if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); }// 等待终止所有的EventExecutor for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); break; } } } } }// 初始化chooser选取器 chooser = chooserFactory.newChooser(children); // 给children中的每个EventExecutor添加终止监听器 // 每有一个EventExecutor终止了,就会将terminatedChildren加1 // 等到terminatedChildren==children总数时,说明所有的EventExecutor已经全部终止 final FutureListener terminationListener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } }
2.1.3 提交任务 MultithreadEventExecutorGroup提交任务的大致流程如下图:
【Netty】四、事件循环EventLoop与EventLoopGroup
文章图片

提交任务时,MultithreadEventExecutorGroup是直接使用父类AbstractEventExecutorGroup的submit方法来提交,而该submit方法中是通过调用next()方法来选取到某个EventExecutor,再调用EventExecutor的submit()方法来提交的,如下
@Override public Future submit(Runnable task) { return next().submit(task); }

而next()方法则是通过chooser选取器来选取到某个EventExecutor的,如下
@Override public EventExecutor next() { return chooser.next(); }

2.2 SingleThreadEventExecutor
从上面我们可以得知MultithreadEventExecutorGroup提交任务时,实质上是选取到某个EventExecutor,再由该EventExecutor来进行提交
由于我们常用的NioEventLoop的大多数操作其实是由SingleThreadEventExecutor提供了默认实现(当然NioEventLoop也有其具体的一些操作,后续会详解),所以在深入NioEventLoop之前,有必要先了解一下SingleThreadEventExecutor
2.2.1 重要属性 SingleThreadEventExecutor中有一个存放任务的taskQueue任务队列,还有一个与之绑定的thread线程,还有一些优雅关闭相关属性,如下
// 存放任务的队列 private final Queue taskQueue; // 与该SingleThreadEventExecutor绑定的thread private volatile Thread thread; // 执行器,首次启动时通过该执行器来启动线程,再由该线程来消费taskQueue的任务 private final Executor executor; // 该属性很重要,表示addTask添加任务时,是否自动唤醒线程,如果不能自动唤醒,需要主动调用wakeup方法来唤醒 // 如:DefaultEventExecutor的addTaskWakesUp为true,而NioEventLoop为false private final boolean addTaskWakesUp; // 队列的最大容量 private final int maxPendingTasks; // 优雅关闭的静默时间 private volatile long gracefulShutdownQuietPeriod; // 优雅关闭的超时时间 private volatile long gracefulShutdownTimeout; // 优雅关闭的开始时间 private long gracefulShutdownStartTime;

2.2.2 状态管理 SingleThreadEventExecutor总共有5种状态,如下
  • ST_NOT_STARTED (未启动)
  • ST_STARTED (启动)
  • ST_SHUTTING_DOWN (关闭中)
  • ST_SHUTDOWN (已关闭)
  • ST_TERMINATED (已终止)
初始状态为ST_NOT_STARTED未启动,如下
private static final int ST_NOT_STARTED = 1; private static final int ST_STARTED = 2; private static final int ST_SHUTTING_DOWN = 3; private static final int ST_SHUTDOWN = 4; private static final int ST_TERMINATED = 5; private volatile int state = ST_NOT_STARTED;

状态转换:ST_NOT_STARTED --> ST_STARTED --> ST_SHUTTING_DOWN --> ST_SHUTDOWN --> ST_TERMINATED
2.2.3 提交任务 SingleThreadEventExecutor提交任务的流程图如下:
【Netty】四、事件循环EventLoop与EventLoopGroup
文章图片

SingleThreadEventExecutor在初次提交任务时,会将state设置为已启动,启动工作线程,并将该工作线程与thread属性进行绑定,后续再次提交任务时,只会将任务添加到taskQueue任务队列中。源码如下
private void execute(Runnable task, boolean immediate) { // 判断当前线程与该SingleThreadEventExecutor绑定的线程是否是同一个 boolean inEventLoop = inEventLoop(); // 添加任务至taskQueue任务队列 addTask(task); if (!inEventLoop) { // 如果state为未启动,则将state更新为已启动,启动工作线程,并将工作线程与该SingleThreadEventExecutor绑定 startThread(); // 如果state为已关闭,则拒绝添加任务 if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { } if (reject) { reject(); } } }if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } }

private void startThread() { // 如果state为未启动 if (state == ST_NOT_STARTED) { // 将state更新为已启动 if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { boolean success = false; try { // 启动工作线程 doStartThread(); success = true; } finally { if (!success) { STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED); } } } } }

doStartThread部分代码如下:
private void doStartThread() { assert thread == null; executor.execute(new Runnable() { @Override public void run() { // 绑定工作线程 thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); }boolean success = false; updateLastExecutionTime(); try { // run方法,从taskQueue中获取任务来执行,由子类去具体实现 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); }

2.2.4 优雅关闭 上面已经介绍了有3个属性是跟优雅关闭相关的,有gracefulShutdownQuietPeriod静默时间、gracefulShutdownTimeout超时时间、gracefulShutdownStartTime开始时间
gracefulShutdownQuietPeriod:如果当前时间-上一次执行时间 < 静默时间,那么暂时先不关闭,否则进行关闭
gracefulShutdownTimeout:如果当前时间-优雅关闭的开始时间 > 超时时间,那么进行关闭
gracefulShutdownStartTime:优雅关闭的开始时间
接下来看下优雅关闭shutdownGracefully方法,该方法会将state状态设置为关闭中,并直接返回terminationFuture,源码如下:
public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod"); if (timeout < quietPeriod) { throw new IllegalArgumentException( "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))"); } ObjectUtil.checkNotNull(unit, "unit"); // 如果state >= 关闭中,直接返回terminationFuture if (isShuttingDown()) { return terminationFuture(); }boolean inEventLoop = inEventLoop(); boolean wakeup; int oldState; for (; ; ) { // 再次判断,如果state >= 关闭中,直接返回terminationFuture if (isShuttingDown()) { return terminationFuture(); } int newState; wakeup = true; // 是否需要唤醒 oldState = state; if (inEventLoop) { newState = ST_SHUTTING_DOWN; } else { switch (oldState) { case ST_NOT_STARTED: case ST_STARTED:// 如果旧状态为已启动,则设置新状态为关闭中 newState = ST_SHUTTING_DOWN; break; default:// 如果旧状态 >= 已启动,那么将wakeup设置为false,不唤醒工作线程 newState = oldState; wakeup = false; } } // 通过CAS操作来更新状态 if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { break; } } gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); gracefulShutdownTimeout = unit.toNanos(timeout); if (ensureThreadStarted(oldState)) { return terminationFuture; } // 如果需要唤醒,则将WAKEUP_TASK放到队列中,来唤醒工作线程 if (wakeup) { taskQueue.offer(WAKEUP_TASK); if (!addTaskWakesUp) { wakeup(inEventLoop); } }return terminationFuture(); }

state状态更新为关闭中后,工作线程从taskQueue队列中每次拿到任务后,将会对state状态进行判断,如果是关闭中,会进一步判断是否确认关闭,如果确认关闭,则会跳出run方法,工作线程执行结束,最终该SingleThreadEventExecutor的state状态更新为已终止。这里我们可以通过查看SingleThreadEventExecutor的默认实现DefaultEventExecutor的run方法,源码如下:
protected void run() { for (; ; ) { // 从taskQueue中获取任务,如果是WAKEUP_TASK,则拿到的task为null Runnable task = takeTask(); if (task != null) { // 执行任务 runTask(task); // 更新上一次执行时间 updateLastExecutionTime(); } // 确认是否关闭,如果是,跳出死循环 if (confirmShutdown()) { break; } } }

confirmShutdown的源码如下:
protected boolean confirmShutdown() { // 如果state < 关闭中,直接返回false if (!isShuttingDown()) { return false; }// 取消定时任务 cancelScheduledTasks(); if (gracefulShutdownStartTime == 0) { // 设置优雅关闭的开始时间为当前时间 gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); } // 如果taskQueue里还有任务,运行所有任务,否则运行关闭钩子 if (runAllTasks() || runShutdownHooks()) { if (isShutdown()) { return true; } // 如果静默期为0,返回true确认关闭 if (gracefulShutdownQuietPeriod == 0) { return true; } taskQueue.offer(WAKEUP_TASK); return false; }final long nanoTime = ScheduledFutureTask.nanoTime(); // 如果state >= 已关闭,返回true确认关闭 // 否则判断当前时间-优雅关闭的开始时间 是否大于 超时时间,如果大于,返回true确认关闭 if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { return true; } // 如果当前时间 - 上一次任务执行时间 小于等于 静默时间,说明在这一段时间内还有任务执行,则线程休眠100毫秒,返回false暂不关闭 if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) { // 休眠100毫秒后,用于继续唤醒工作线程 taskQueue.offer(WAKEUP_TASK); try { Thread.sleep(100); } catch (InterruptedException e) { } return false; }return true; }

当confirmShutdown返回true跳出run方法后
  1. 工作线程开始收尾工作
  2. 由于此时可能又有一些任务被添加到taskQueue里面,所以需要再次调用confirmShutdown方法
  3. state状态更新为已关闭(这时已经不能再接收新的任务了)
  4. 再次调用confirmShutdown方法(理由同上,如果是这样的话,第一次的confirmShutdown调用是不是没有必要?)。
  5. 最终调用cleanup()钩子方法,并将state状态更新为已终止,设置terminationFuture结果为成功
三、NioEventLoopGroup和NioEventLoop 3.1 NioEventLoopGroup
NioEventLoopGroup可以说是netty中我们最熟悉的类之一,继承于MultithreadEventLoopGroup,而MultithreadEventLoopGroup继承于MultithreadEventExecutorGroup(上面已经分析过该类),如下图
【Netty】四、事件循环EventLoop与EventLoopGroup
文章图片

由于NioEventLoopGroup本身实现比较简单,所以这里我们只看下它的newChild()方法,这个方法之前在介绍MultithreadEventExecutorGroup也有提及,该方法是一个抽象方法,需要子类具体实现,源码如下:
@Override protected EventLoop newChild(Executor executor, Object... args) throws Exception { // SelectorProvider有打开选择器openSelector、打开服务端通道openServerSocketChannel等方法 SelectorProvider selectorProvider = (SelectorProvider) args[0]; // select策略工厂,用于产生SelectStrategy SelectStrategyFactory selectStrategyFactory = (SelectStrategyFactory) args[1]; // 拒绝执行处理器 RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) args[2]; // taskQueue工厂,taskQueue之前已有提及 EventLoopTaskQueueFactory taskQueueFactory = null; // tailTaskQueue工厂 EventLoopTaskQueueFactory tailTaskQueueFactory = null; int argsLength = args.length; if (argsLength > 3) { taskQueueFactory = (EventLoopTaskQueueFactory) args[3]; } if (argsLength > 4) { tailTaskQueueFactory = (EventLoopTaskQueueFactory) args[4]; } // new 一个NioEventLoop实例 return new NioEventLoop(this, executor, selectorProvider, selectStrategyFactory.newSelectStrategy(), rejectedExecutionHandler, taskQueueFactory, tailTaskQueueFactory); }

可以看到,newChild方法的最后就是new一个NioEventLoop实例,所以最后我们需要来看下NioEventLoop中的源码,看看它到底是如何来运作的?
3.2 NioEventLoop
学习过NIO的同学都知道,多个Channel可以注册到一个Selector,这样我们就可以在单线程中通过一个Selector来管理多个Channel,这就是IO多路复用,而NioEventLoop就是IO多路复用的一个具体实现
在最开始的类图我们可以看到 NioEventLoop 和 EpollEventLoop 都继承于SingleThreadEventLoop,由于NioEventLoop更为经常使用,所以这里只介绍NioEventLoop。NioEventLoop继承于SingleThreadEventLoop,而SingleThreadEventLoop继承于SingleThreadEventExecutor,如下图
【Netty】四、事件循环EventLoop与EventLoopGroup
文章图片

SinleThreadEventLoop 在 SingleThreadEventExecutor 的基础上,添加了一个tailTasks任务队列(runAllTasks方法中执行完taskQueue中的任务后,会再执行tailTasks中的任务
3.2.1 构造函数 NioEventLoop的构造函数,接收多个参数,有executor执行器、select策略、拒绝执行处理器、taskQueue工厂、tailTaskQueue工厂,如下
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) { super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory), rejectedExecutionHandler); this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); // select策略 this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy"); // 创建SelectorTuple, final SelectorTuple selectorTuple = openSelector(); // 包装后的Selector,类型为SelectedSelectionKeySetSelector,包含了SelectedSelectionKeySet this.selector = selectorTuple.selector; // 未包装的Selector this.unwrappedSelector = selectorTuple.unwrappedSelector; }

3.2.2 工作流程 在之前SingleThreadEventExecutor的介绍中,我们已经知道,它会在启动工作线程后,调用run方法,而run方法由子类具体实现,所以NioEventLoop的run方法就是其工作流程,大致的工作如下:
  1. 判断taskQueue和tailTaskQueue是否有任务,如果没有任务,则通过调用Selector.select方法来阻塞或超时阻塞获取IO事件
  2. 如果有任务,调用Selector.selectNow方法非阻塞获取IO事件
  3. 判断是否有IO事件准备好,如果有,先处理IO事件
  4. 再处理taskQueue和tailTaskQueue中的任务
  5. 返回第一步,无限循环
流程图如下:
【Netty】四、事件循环EventLoop与EventLoopGroup
文章图片

结合源码来看,源码如下:
protected void run() { int selectCnt = 0; // 死循环 for (; ; ) { try { int strategy; try { // 计算strategy的值 // 如果hasTasks为true,代表taskQueue或tailQueue里有任务,则直接调用Selector.selectNow()来获取当前已准备好的IO事件数量,并直接返回 // 如果hasTask为false,代表taskQueue或tailQueue里没有任务,则返回SelectStrategy.SELECT(值为-1) strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT: // 查看scheduledTaskQueue里的定时任务,如果定时任务不为空,将定时任务的deadlineNanos过期时间赋值给curDeadlineNanos long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; } nextWakeupNanos.set(curDeadlineNanos); try { // 如果没有任务,则进一步判断 // 如果curDeadlineNanos为NONE,则调用Selector.select()进行阻塞,直到有IO事件准备好 // 如果curDeadlineNanos不为NONE,则调用selector.select(timeoutMillis)进行超时阻塞 if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { nextWakeupNanos.lazySet(AWAKE); } default: } } catch (IOException e) { // 这里是selector.select()的一个bug,即在某种情况下,在没有IO事件准备好时,select()也没有进行阻塞,此时需要重建Selector // 后续会进行详细介绍 rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; } // select的次数 selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; // ioRatio表示processSelectedKeys方法(处理IO事件)和runAllTasks()方法所用的事件占比 // 如果ioRatio为50,则时间比为1:1,如果为60,则时间比为3:2 final int ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) {// 如果ioRatio为100 try { if (strategy > 0) { // 处理准备好的IO事件 processSelectedKeys(); } } finally { // 执行taskQueue里的所有任务 ranTasks = runAllTasks(); } } else if (strategy > 0) { final long ioStartTime = System.nanoTime(); try { // 处理准备好的IO事件 processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; // 限时执行,时间到了之后需要先返回,所以可能只能执行taskQueue里的部分任务 ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { // 只执行一个任务 ranTasks = runAllTasks(0); }// 如果至少有一个任务执行成功,runTasks则为true,则重置selectCnt为0 if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } else if (unexpectedSelectorWakeup(selectCnt)) {// 如果未预料到的Selector被唤醒,说明可能是bug出现了,重建Selector并重置selectCnt selectCnt = 0; } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } catch (Error e) { throw e; } catch (Throwable t) { handleLoopException(t); } finally { // 判断是否关闭,代码这里省略 } } }

3.2.3 处理IO事件 上面已经介绍了NioEventLoop的工作流程,那么在判断如果有准备好的IO事件,那么会调用processSelectedKeys来处理这些IO事件,接下来就来看下它的源码
private void processSelectedKeysOptimized() { for (int i = 0; i < selectedKeys.size; ++i) { // 获取对应IO事件的SelectionKey,以及它的附件 final SelectionKey k = selectedKeys.keys[i]; selectedKeys.keys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { // 真正来处理对应的IO事件,将SelectionKey以及对应的NioChannel传入 processSelectedKey(k, (AbstractNioChannel) a); } else { NioTask task = (NioTask) a; processSelectedKey(k, task); }if (needsToSelectAgain) { selectedKeys.reset(i + 1); selectAgain(); i = -1; } } }

这里来看下processSelectedKey是如何真正的来处理每个IO事件的
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 判断SelectionKey是否有效,如果无效,直接关闭channel通道并返回 if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { return; }if (eventLoop == this) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); } return; }try { // 获取当前SelectionKey的已准备好的事件集 int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//如果有OP_CONNECT连接事件 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); // 设置ops,表示该SelectionKey不再关注连接事件unsafe.finishConnect(); // 下一章节会详细介绍,内部就是开始执行pipeline,去执行各个ChannelHandler对应的方法 }if ((readyOps & SelectionKey.OP_WRITE) != 0) {//如果有OP_WRITE写事件 ch.unsafe().forceFlush(); // 下一章节会详细介绍,内部就是开始执行pipeline,去执行各个ChannelHandler对应的方法 }// 如果有OP_READ读事件或者OP_ACCEPT接收事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); // 下一章节会详细介绍,内部就是开始执行pipeline,去执行各个ChannelHandler对应的方法 } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }

3.2.4 空轮询问题 在执行Selector.select()方法时,正常情况下如果没有准备好的Channel时,线程会被阻塞 。
空轮询是因为Selector.select()没有正确工作,在没有准备好的Channel时,就直接被唤醒,而没有进行阻塞。从而导致run方法一直在死循环,CPU达到了100%
那么在什么时候,代表该bug出现了?
空轮询次数selectCnt大于等于SELECTOR_AUTO_REBUILD_THRESHOLD(默认为512)时,表示该bug出现,则进行重建Selector,源码如下:
private boolean unexpectedSelectorWakeup(int selectCnt) { if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } return true; } // 判断如果空轮询次数selectCnt大于SELECTOR_AUTO_REBUILD_THRESHOLD时 if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); // 重建Selector rebuildSelector(); return true; } return false; }

如何重建Selector?
大致步骤:
  1. 重新打开一个Selector
  2. 将旧Selector的所有Channel和对应的附件,都注册到新的Selector上
  3. 关闭旧Selector
private void rebuildSelector0() { final Selector oldSelector = selector; final SelectorTuple newSelectorTuple; if (oldSelector == null) { return; } // 打开一个新的Selector newSelectorTuple = openSelector(); int nChannels = 0; for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; } int interestOps = key.interestOps(); key.cancel(); // 将旧Selector上的channel都注册到新的Selector SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { // 省略部分代码 } } selector = newSelectorTuple.selector; unwrappedSelector = newSelectorTuple.unwrappedSelector; try { // 关闭旧Selector oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } } }

四、总结 【【Netty】四、事件循环EventLoop与EventLoopGroup】EventLoop和EventLoopGroup是netty中最重要的组件之一,也是netty能够构建高性能程序的关键所在,了解其工作机制与原理是非常有必要的。

    推荐阅读