A Full-Process Analysis of Netty's Design and Practice

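1. Background
The previous article, "Understanding NIO Multiplexing in Depth", showed that the kernel detects socket events through event notification plus interrupts, and that user space can serve all socket requests from a single thread, finding ready sockets in O(1) time. That I/O scheme looks ideal, but as the number of connections grows, and especially the number of active ones (say 100,000 or more), a single thread may struggle. Netty lets you assign a thread pool to handle socket events, which spreads the load that a single thread would otherwise carry. The framework also encapsulates the common logic, which greatly simplifies business development. This article analyzes how Netty's thread pools call one another, that is, the reactor model, to get straight to Netty's core design idea. It also refactors the NIO server code into a minimal Netty server demo, and closes with some common Netty questions.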
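2. Basic Netty concepts
Netty boilerplate code:
Create two thread pools, one for ServerSocket events and one for Socket events, and specify that methods of the custom class ServerHandler run when an event occurs on the ServerSocket or a Socket: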

[Image in the original article: Netty bootstrap code]

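Netty business code:
ServerHandler defines the callbacks. When the server receives data from a client, channelRead is called to process it; when a socket or serverSocket is registered with a selector, channelRegistered is called: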
[Image in the original article: ServerHandler code]

The Netty architecture behind the code above is shown in the following diagram:
[Figure in the original article: Netty architecture diagram]

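The architecture diagram shows that NioEventLoopGroup and the pipeline are the two most important concepts. The sections below follow Netty's workflow to explain the ideas behind both.
3. Netty workflow
3.1 Creating the bossGroup and workerGroup objects
As shown below, the NioEventLoopGroup behind bossGroup creates 1 NioEventLoop, and workerGroup creates 10. Each NioEventLoop contains its own Selector (multiplexer) and its own thread. The bossGroup selector is used to register the serverSocketChannel, and the workerGroup selectors are used to register socketChannels. Each thread processes the events that occur on the sockets registered with its selector.
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup(10);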

NioEventLoopGroup is initialized from subclass to superclass in this order: NioEventLoopGroup -> MultithreadEventLoopGroup -> MultithreadEventExecutorGroup.
3.1.1 Creating the SelectorProvider
SelectorProvider is the factory class for multiplexing Selectors; it creates the concrete Selector implementation. NioEventLoopGroup obtains a SelectorProvider during initialization:
public NioEventLoopGroup(int nThreads, Executor executor) {
    this(nThreads, executor, SelectorProvider.provider());
}

SelectorProvider calls the create method of sun.nio.ch.DefaultSelectorProvider, which ships in rt.jar (JDK 8 and earlier), to create the SelectorProvider implementation:
public abstract class SelectorProvider {
    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
        }
    }
}

The DefaultSelectorProvider in rt.jar differs between the JDK builds for different operating systems. On macOS, for example, create returns a KQueueSelectorProvider:
public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {}

    public static SelectorProvider create() {
        return new KQueueSelectorProvider();
    }
}

In the Linux rt.jar, create returns an EPollSelectorProvider:
public class DefaultSelectorProvider {
    private DefaultSelectorProvider() {}

    public static SelectorProvider create() {
        String var0 = (String) AccessController.doPrivileged(new GetPropertyAction("os.name"));
        if (var0.equals("SunOS")) {
            return createProvider("sun.nio.ch.DevPollSelectorProvider");
        } else {
            return (SelectorProvider) (var0.equals("Linux")
                    ? createProvider("sun.nio.ch.EPollSelectorProvider")
                    : new PollSelectorProvider());
        }
    }
}

EPollSelectorProvider creates an EPollSelectorImpl through its openSelector method:
public class EPollSelectorProvider extends SelectorProviderImpl {
    public EPollSelectorProvider() {}

    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }
}

At the lowest level, EPollSelectorImpl wraps the epoll system calls, such as epoll_create and epoll_ctl, to implement multiplexing.
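To see which implementation a given machine actually selects, the provider lookup described above can be exercised with the JDK alone. The following is a minimal sketch; the class name SelectorProviderDemo and the helper openAndDescribe are made up for illustration, and the printed class names depend on the operating system and JDK version.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

class SelectorProviderDemo {
    /** Opens a selector via the platform default provider and names both implementation classes. */
    static String openAndDescribe() {
        // provider() returns the system-wide default provider
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = provider.openSelector()) {
            // The concrete classes vary by platform (epoll-based on Linux, kqueue-based on macOS)
            return provider.getClass().getName() + " -> " + selector.getClass().getName();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(openAndDescribe());
    }
}
```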
3.1.2 Creating the thread pool
With the SelectorProvider in place, the thread executor (Executor) can be created. How each thread is created is defined by DefaultThreadFactory, and the Executor obtains its threads from that factory:
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        // Create the thread executor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        // omitted
    }

    // Create the thread factory
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass());
    }
}

The thread factory is initialized as follows:
public class DefaultThreadFactory implements ThreadFactory {
    public DefaultThreadFactory(Class<?> poolType) {
        this(poolType, false, Thread.NORM_PRIORITY);
    }

    public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
        this(toPoolName(poolType), daemon, priority);
    }

    public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
        ObjectUtil.checkNotNull(poolName, "poolName");
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException(
                    "priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
        }
        // Use a common prefix for all thread names
        prefix = poolName + '-' + poolId.incrementAndGet() + '-';
        this.daemon = daemon;
        this.priority = priority;
        this.threadGroup = threadGroup;
    }

    // newThread creates a thread directly
    public Thread newThread(Runnable r) {
        Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
        try {
            if (t.isDaemon() != daemon) {
                t.setDaemon(daemon);
            }
            if (t.getPriority() != priority) {
                t.setPriority(priority);
            }
        } catch (Exception ignored) {
            // Doesn't matter even if failed to set.
        }
        return t;
    }
}

The constructor defines the thread-name prefix:
[Image in the original article: thread-name prefix]

Threads created afterwards use that prefix in their names:
[Image in the original article: thread names using the prefix]

Each call to ThreadPerTaskExecutor's execute creates and starts a new thread through the thread factory:
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}
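The interplay between the naming factory and the thread-per-task executor can be reproduced with plain JDK classes. This is a simplified sketch, not Netty's code: NamedThreadFactory, SimpleThreadPerTaskExecutor and ThreadPerTaskDemo are hypothetical names, and daemon flags, priorities and FastThreadLocal wrapping are left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class NamedThreadFactory implements ThreadFactory {
    private static final AtomicInteger POOL_ID = new AtomicInteger();
    private final AtomicInteger nextId = new AtomicInteger();
    private final String prefix;

    NamedThreadFactory(String poolName) {
        // Same naming scheme as above: <poolName>-<poolId>-<threadId>
        prefix = poolName + '-' + POOL_ID.incrementAndGet() + '-';
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + nextId.incrementAndGet());
    }
}

class SimpleThreadPerTaskExecutor implements Executor {
    private final ThreadFactory factory;

    SimpleThreadPerTaskExecutor(ThreadFactory factory) {
        this.factory = factory;
    }

    @Override
    public void execute(Runnable command) {
        // No pooling: every task gets a brand-new thread
        factory.newThread(command).start();
    }
}

class ThreadPerTaskDemo {
    /** Runs one task on the executor and returns the name of the thread that ran it. */
    static String runOnce(Executor executor) {
        String[] name = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name[0] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name[0];
    }

    public static void main(String[] args) {
        Executor executor = new SimpleThreadPerTaskExecutor(new NamedThreadFactory("demo"));
        System.out.println(runOnce(executor));
        System.out.println(runOnce(executor));
    }
}
```

Because a NioEventLoop calls execute only once to start its own thread, "one thread per task" in practice means one dedicated thread per event loop.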

3.1.3 Wrapping the thread pool and the Selector
With the two key dependencies, SelectorProvider and Executor, created, the NioEventLoops can be constructed:
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        // Create the thread executor
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        children = new EventExecutor[nThreads];
        // Create the NioEventLoops: 1 for bossGroup, 10 for workerGroup
        for (int i = 0; i < nThreads; i++) {
            boolean success = false;
            try {
                // Create a NioEventLoop
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // omitted
            }
        }
        chooser = chooserFactory.newChooser(children);
        // omitted
    }

    // The method that creates a NioEventLoop is implemented by NioEventLoopGroup
    protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;
}

NioEventLoopGroup implements newChild and creates the NioEventLoop object:
public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(),
                (RejectedExecutionHandler) args[2], queueFactory);
    }
}

Inside NioEventLoop, the openSelector() method creates the selector, which on Linux is an EPollSelectorImpl object.
public final class NioEventLoop extends SingleThreadEventLoop {
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }

    private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            // Create the EPollSelectorImpl object
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
        // omitted
        return new SelectorTuple(unwrappedSelector);
    }
}

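3.2 NioEventLoopGroup summary
A NioEventLoopGroup contains multiple NioEventLoops. Each NioEventLoop holds its own Selector and its own thread. The bossGroup selector is used to register the serverSocketChannel, and the workerGroup selectors are used to register socketChannels. At this stage the thread is represented only by the ThreadPerTaskExecutor; calling ThreadPerTaskExecutor#execute creates the thread dedicated to that NioEventLoop.
3.3 Creating the ServerBootstrap launcher object
ServerBootstrap is the launcher class. The NioEventLoopGroups and other parameters are passed to it, and it is responsible for starting the Netty server.
3.3.1 Specifying the ServerSocketChannel implementation class
Specify NioServerSocketChannel as Netty's ServerSocketChannel implementation:
serverBootstrap.channel(NioServerSocketChannel.class);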

The NioServerSocketChannel constructor creates the ServerSocketChannel object through the EPollSelectorProvider:
public class NioServerSocketChannel extends AbstractNioMessageChannel
        implements io.netty.channel.socket.ServerSocketChannel {
    // DEFAULT_SELECTOR_PROVIDER is the EPollSelectorProvider object
    private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }

    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            // openServerSocketChannel() is inherited from SelectorProviderImpl,
            // the parent of EPollSelectorProvider, and creates the ServerSocketChannel
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException("Failed to open a server socket.", e);
        }
    }

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
}

Through the constructor of its ancestor AbstractNioChannel, NioServerSocketChannel sets the ServerSocketChannel to non-blocking mode:
public abstract class AbstractNioChannel extends AbstractChannel {
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                logger.warn("Failed to close a partially initialized socket.", e2);
            }
            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
}

AbstractChannel, another ancestor of NioServerSocketChannel, creates the Unsafe and the Pipeline for the ServerSocketChannel. Both are covered in detail later:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

    protected abstract AbstractUnsafe newUnsafe();

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
}

3.3.2 Configuring handlers
A handler defines what should run when an event occurs on a socket.
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
        .handler(new ChannelInitializer<ServerSocketChannel>() {
            @Override
            protected void initChannel(ServerSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ServerHandler());
            }
        })
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ServerHandler());
            }
        });

AbstractBootstrap, the parent of ServerBootstrap, stores the handler for the ServerSocketChannel:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    public B handler(ChannelHandler handler) {
        this.handler = ObjectUtil.checkNotNull(handler, "handler");
        return self();
    }
}

ServerBootstrap stores the childHandler for the SocketChannels:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    public ServerBootstrap childHandler(ChannelHandler childHandler) {
        this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
        return this;
    }
}

3.4 Starting the Netty server
The Netty server is started through ServerBootstrap#bind:
ChannelFuture future = serverBootstrap.bind(8080).sync();

3.4.1 Creating the ServerSocketChannel
bind calls the doBind method of AbstractBootstrap, the parent of ServerBootstrap, and AbstractBootstrap#initAndRegister begins creating the ServerSocketChannel:
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        // omitted
    }

    // Create the ServerSocketChannel
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            // omitted
        }
        // omitted
    }
}

In AbstractBootstrap#initAndRegister above, channelFactory#newChannel invokes the NioServerSocketChannel constructor, which in turn creates the ServerSocketChannel and sets it to non-blocking mode.
3.4.2 Initializing the pipeline
After the NioServerSocketChannel is created, the init method places the handler defined in the main program into the NioServerSocketChannel's pipeline:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    void init(Channel channel) {
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
        }
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs =
                childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
}

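3.4.3 Inserting elements into DefaultChannelPipeline
The pipeline is based on the chain-of-responsibility design pattern, which builds a chain of handler objects for a request. It decouples issuing a request from processing it: the handlers on the chain process the request, and the client only has to send the request to the chain without caring how it is processed or passed along.
When a user sends a request, the server calls the inbound handlers one after another; when it responds, the server calls the outbound handlers one after another, as shown below: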
[Figure in the original article: inbound and outbound handler call order]

When the ServerSocketChannel is created, a DefaultChannelPipeline is created along with it; that pipeline belongs exclusively to this ServerSocketChannel.
As the code below shows, DefaultChannelPipeline is a doubly linked list. Each addLast call wraps the handler in a DefaultChannelHandlerContext and appends it to the end of the list, just before the tail node:
public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            // Run callHandlerAdded0
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
}

How the handlers in the pipeline's chain are invoked is covered later.
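The linked-list mechanics of addLast0 can be tried out in isolation. The sketch below is a deliberately reduced model and not Netty's implementation: MiniPipeline and Ctx are hypothetical names, each node carries only a name, and every node is treated as taking part in both directions, whereas Netty distinguishes inbound from outbound handlers.

```java
import java.util.ArrayList;
import java.util.List;

class MiniPipeline {
    /** One node of the doubly linked list; stands in for a handler context. */
    static final class Ctx {
        final String name;
        Ctx prev;
        Ctx next;

        Ctx(String name) {
            this.name = name;
        }
    }

    // Sentinel nodes, so insertion never has to special-case an empty list
    private final Ctx head = new Ctx("head");
    private final Ctx tail = new Ctx("tail");

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    /** Same pointer updates as addLast0: splice the new node in just before tail. */
    MiniPipeline addLast(String name) {
        Ctx newCtx = new Ctx(name);
        Ctx prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
        return this;
    }

    /** Inbound events travel from head towards tail. */
    List<String> inboundOrder() {
        List<String> names = new ArrayList<>();
        for (Ctx c = head.next; c != tail; c = c.next) {
            names.add(c.name);
        }
        return names;
    }

    /** Outbound events travel from tail towards head. */
    List<String> outboundOrder() {
        List<String> names = new ArrayList<>();
        for (Ctx c = tail.prev; c != head; c = c.prev) {
            names.add(c.name);
        }
        return names;
    }

    public static void main(String[] args) {
        MiniPipeline p = new MiniPipeline().addLast("decoder").addLast("business");
        System.out.println("inbound:  " + p.inboundOrder());
        System.out.println("outbound: " + p.outboundOrder());
    }
}
```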
3.4.4 Adding handlers to the ServerSocketChannel's pipeline
In section 3.4.2, the init method adds one handler to the ServerSocketChannel's pipeline through addLast:
void init(Channel channel) {
    // omitted
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

This handler is defined by Netty itself. It overrides ChannelInitializer's initChannel method, which is called when the ServerSocketChannel is initialized and adds the user-defined ServerHandler to the pipeline. It then adds Netty's own ServerBootstrapAcceptor to the pipeline. ServerBootstrapAcceptor is responsible for registering the SocketChannels accepted by the ServerSocketChannel with a Selector:
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
    private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);
            setAttributes(child, childAttrs);

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
    }
}

3.4.5 Preparing to register the ServerSocketChannel with the selector
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // Create the ServerSocketChannel
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        // Register the ServerSocketChannel with the selector
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
}

In the code above, config().group() returns bossGroup. Registration goes through the register method of MultithreadEventLoopGroup, the parent of NioEventLoopGroup:
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    public ChannelFuture register(Channel channel) {
        // Register the ServerSocketChannel
        return next().register(channel);
    }
}

The next method selects one NioEventLoop from the NioEventLoopGroup; each NioEventLoop contains a Selector and a thread. MultithreadEventExecutorGroup creates a chooser object through the DefaultEventExecutorChooserFactory, and the chooser decides which NioEventLoop handles a given task. Note that bossGroup has only one NioEventLoop, so that one is always chosen. Only for workerGroup does the chooser rotate round-robin among the 10 NioEventLoops.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    public EventExecutor next() {
        return chooser.next();
    }
}

As shown below, when the number of threads in the NioEventLoopGroup is a power of two, PowerOfTwoEventExecutorChooser is used; otherwise GenericEventExecutorChooser is used. PowerOfTwoEventExecutorChooser computes the next NioEventLoop with a bitwise AND, while GenericEventExecutorChooser uses a modulo operation. A bitwise AND is cheaper than a modulo, so PowerOfTwoEventExecutorChooser is the more efficient of the two:
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            // Bitwise AND
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicLong idx = new AtomicLong();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            // Modulo arithmetic
            return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}
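The two index calculations can be compared directly with a small stand-alone sketch. RoundRobinChoosers and its method names are hypothetical, and both variants use an AtomicInteger counter here to keep them comparable (the generic chooser above uses an AtomicLong). The sketch also shows that both stay within bounds when the counter overflows.

```java
import java.util.concurrent.atomic.AtomicInteger;

class RoundRobinChoosers {
    /** True for 1, 2, 4, 8, ...: a power of two has exactly one bit set. */
    static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    /** Valid only when length is a power of two: masks the counter down to [0, length). */
    static int nextPowerOfTwoIndex(AtomicInteger idx, int length) {
        return idx.getAndIncrement() & (length - 1);
    }

    /** Works for any positive length; abs() handles a counter that has wrapped negative. */
    static int nextGenericIndex(AtomicInteger idx, int length) {
        return Math.abs(idx.getAndIncrement() % length);
    }

    public static void main(String[] args) {
        AtomicInteger a = new AtomicInteger();
        AtomicInteger b = new AtomicInteger();
        for (int i = 0; i < 6; i++) {
            System.out.println("powerOfTwo(4): " + nextPowerOfTwoIndex(a, 4)
                    + "  generic(10): " + nextGenericIndex(b, 10));
        }
    }
}
```

With the workerGroup of 10 threads used in this article, Netty would take the generic path; choosing 8 or 16 threads would select the power-of-two chooser.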

Once the NioEventLoop has been chosen, the register method of its parent class SingleThreadEventLoop is called:
public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}

Here promise.channel() is the NioServerSocketChannel. Calling the unsafe method of its ancestor AbstractChannel returns the unsafe member:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    private final Unsafe unsafe;

    public Unsafe unsafe() {
        return unsafe;
    }
}

The Unsafe member is created by the newUnsafe method when the NioServerSocketChannel is initialized:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

    // Abstract method, implemented by subclasses
    protected abstract AbstractUnsafe newUnsafe();
}

newUnsafe is implemented by the subclass AbstractNioMessageChannel:
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }

    // NioMessageUnsafe overrides read(); later read operations on the ServerSocketChannel run this method
    private final class NioMessageUnsafe extends AbstractNioUnsafe {
        private final List<Object> readBuf = new ArrayList<Object>();

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }

                int size = readBuf.size();
                for (int i = 0; i < size; i++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    closed = closeOnReadError(exception);

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
}

NioMessageUnsafe extends AbstractNioUnsafe, which in turn extends AbstractUnsafe, an inner class of AbstractChannel, so it is AbstractUnsafe's register method that runs. register makes sure the EventLoop thread is running, starting it on the spot if it is not, and runs register0 on that thread. register0 performs the registration and the handler-added callbacks, which are explained step by step below:
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    protected abstract class AbstractUnsafe implements Unsafe {
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            // If the current thread is already the eventLoop thread, run register0 directly
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    // Otherwise submit register0 as a task; this starts the eventLoop thread if it is not running yet
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // Perform the registration
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                // Run the pending handlerAdded callbacks
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
}

3.4.6 Starting the NioEventLoop thread
The NioEventLoop thread is started lazily: it is created only when there is a task to run. To guarantee that the ServerSocketChannel is registered with the Selector on the event loop thread, register checks whether the caller is already running on the NioEventLoop thread:
if (eventLoop.inEventLoop()) {
    register0(promise);
} else {
    try {
        // Submit register0 as a task; this starts the eventLoop thread if it is not running yet
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    } catch (Throwable t) {
        logger.warn(
                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                AbstractChannel.this, t);
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

SingleThreadEventExecutor, an ancestor of NioEventLoop, keeps a thread field. It is null until the event loop thread starts, and the argument passed in at this point is the main thread, so inEventLoop initially returns false:
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    private volatile Thread thread;

    // The argument is Thread.currentThread()
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
}

The else branch therefore runs. It wraps register0 in a Runnable task and passes it to NioEventLoop#execute:
eventLoop.execute(new Runnable() {
    @Override
    public void run() {
        register0(promise);
    }
});

NioEventLoop's execute is implemented by its ancestor SingleThreadEventExecutor. It first adds the registration task to the task queue and then starts the thread:
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    private void execute(Runnable task, boolean immediate) {
        boolean inEventLoop = inEventLoop();
        // First add the registration task to the task queue
        addTask(task);
        if (!inEventLoop) {
            // Start the thread
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before it's completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }
}

The child thread is started through the Executor member, which is the ThreadPerTaskExecutor supplied when the NioEventLoop was created. As analyzed above, every ThreadPerTaskExecutor#execute call runs threadFactory.newThread(command).start(); to start a new thread. The body of that thread is as follows:
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    private final Executor executor;

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // We are now on the child thread; store it in the thread field
                // so that later inEventLoop() calls can compare against it
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // Run SingleThreadEventExecutor#run
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // omitted
                }
            }
        });
    }
}
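The lazy-start pattern (queue the task first, start the thread only on the first submission from outside, and record the thread so inEventLoop can recognize it) can be modeled in a few lines. This is a simplified sketch under stated assumptions, not Netty's implementation: LazyEventLoop and submitAndCheck are hypothetical names, there is no selector and no shutdown handling, and the loop thread is a daemon so the demo JVM can exit.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class LazyEventLoop {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread; // null until the loop thread has started

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    boolean isStarted() {
        return started.get();
    }

    void execute(Runnable task) {
        // Enqueue first, then make sure a thread exists to drain the queue
        tasks.add(task);
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "lazy-event-loop");
            t.setDaemon(true);
            t.start();
        }
    }

    private void run() {
        // Record the loop thread before running any task
        thread = Thread.currentThread();
        try {
            for (;;) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits a task and reports whether that task saw itself running on the loop thread. */
    boolean submitAndCheck() {
        boolean[] result = new boolean[1];
        CountDownLatch done = new CountDownLatch(1);
        execute(() -> {
            result[0] = inEventLoop();
            done.countDown();
        });
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        LazyEventLoop loop = new LazyEventLoop();
        System.out.println("started before first task: " + loop.isStarted());
        System.out.println("task ran on loop thread:   " + loop.submitAndCheck());
        System.out.println("started after first task:  " + loop.isStarted());
    }
}
```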

The run method is implemented by NioEventLoop and handles whatever events occur. It is an infinite loop built around two important methods: processSelectedKeys handles the I/O events of the selected keys, and runAllTasks runs the queued tasks:
public final class NioEventLoop extends SingleThreadEventLoop {
    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            // If tasks are pending, do not block in select; otherwise block waiting for socket events
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here it's because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                // ioRatio is the percentage of loop time meant for I/O (processSelectedKeys) versus tasks
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                // If ioRatio is 100, run all queued tasks without a time limit
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    // If ioRatio is below 100, tasks get a time budget of ioTime * (100 - ioRatio) / ioRatio,
                    // which equals the I/O time when ioRatio is 50
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    // No keys were selected: run only the minimum number of tasks
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}
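The task time budget in the middle branch is just the expression ioTime * (100 - ioRatio) / ioRatio. A few sample values make the effect of ioRatio concrete. IoRatioBudget and taskBudgetNanos are hypothetical names used only for this sketch; the case ioRatio == 100 is excluded because the loop handles it in a separate branch.

```java
class IoRatioBudget {
    /** Nanoseconds granted to tasks after ioTimeNanos was spent on I/O, for 0 < ioRatio < 100. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio < 100)");
        }
        // Same expression as in the event loop: the task share relative to the I/O share
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000; // pretend the I/O phase took 1000 ns
        System.out.println("ioRatio=50 -> " + taskBudgetNanos(ioTime, 50) + " ns for tasks");
        System.out.println("ioRatio=80 -> " + taskBudgetNanos(ioTime, 80) + " ns for tasks");
        System.out.println("ioRatio=20 -> " + taskBudgetNanos(ioTime, 20) + " ns for tasks");
    }
}
```

A higher ioRatio therefore shrinks the time tasks may use after each I/O phase, and a lower one enlarges it.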

Because the ServerSocketChannel has not yet been registered with the Selector, the Selector cannot have any events at this point, so the loop goes straight to running the register0 task.
3.4.7 Registering the ServerSocketChannel with the selector
As seen above, the first thing the NioEventLoop thread's infinite loop runs is the register0 method:
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        // Register
        doRegister();
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

In the method above, registration happens in doRegister, and it is easy to see that this registers the ServerSocketChannel with the Selector:
public abstract class AbstractNioChannel extends AbstractChannel {
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
}
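Note that doRegister passes 0 as the interest set and the channel itself as the attachment, so the key is created without listening for any event yet. The plain JDK sketch below shows that such a registration is legal and that the interest set can be switched to OP_ACCEPT afterwards. RegisterDemo and the string attachment are made up for illustration, and the assumption that Netty enables the stored readInterestOp at a later step is not shown in the code above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class RegisterDemo {
    /** Returns {interestOps right after register, interestOps after enabling OP_ACCEPT}. */
    static int[] registerThenEnableAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            // A channel must be non-blocking before it can be registered with a selector
            server.configureBlocking(false);

            // Register with an empty interest set and an attachment, as doRegister does
            Object attachment = "channel-wrapper";
            SelectionKey key = server.register(selector, 0, attachment);
            int before = key.interestOps();
            if (key.attachment() != attachment) {
                throw new IllegalStateException("attachment was not stored on the key");
            }

            // Later, start listening for new connections by widening the interest set
            key.interestOps(before | SelectionKey.OP_ACCEPT);
            int after = key.interestOps();
            return new int[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenEnableAccept();
        System.out.println("interestOps after register: " + ops[0]);
        System.out.println("interestOps after enabling OP_ACCEPT: " + ops[1]);
    }
}
```

The attachment is what lets an event loop map a selected key back to the wrapper object that owns the underlying JDK channel.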

3.4.8 Running initChannel on the ServerSocketChannel's pipeline
After doRegister, register0 calls pipeline.invokeHandlerAddedIfNeeded(), which eventually reaches DefaultChannelPipeline.callHandlerAddedForAllHandlers() and executes the handlers queued in the pendingHandlerCallbackHead linked list:
public class DefaultChannelPipeline implements ChannelPipeline {
    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;
            // This Channel itself was registered.
            registered = true;
            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GCed.
            this.pendingHandlerCallbackHead = null;
        }
        // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
        // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
        // the EventLoop.
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }
}

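These handlers were queued by earlier code. When the NioServerSocketChannel was initialized, the init method was called. It adds a Netty-provided ChannelInitializer whose job is to add the user-defined handler and the ServerBootstrapAcceptor handler to the pipeline: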
p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }
        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

Inside addLast, callHandlerCallbackLater then adds this ChannelInitializer handler to pendingHandlerCallbackHead:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            // Queue the ChannelInitializer handler in pendingHandlerCallbackHead
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

callHandlerCallbackLater appends the ChannelInitializer to the tail of the linked list:
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;
    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) {
        pendingHandlerCallbackHead = task;
    } else {
        // Find the tail of the linked-list.
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}
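The deferral pattern used by addLast, callHandlerCallbackLater and callHandlerAddedForAllHandlers can be sketched without Netty. The following is a simplified, single-threaded illustration; PendingCallbackDemo, PendingTask, addHandler and markRegistered are hypothetical names, and the locking that the real pipeline performs is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

class PendingCallbackDemo {
    /** One node of the singly linked list of deferred callbacks. */
    static final class PendingTask {
        final Runnable callback;
        PendingTask next;

        PendingTask(Runnable callback) {
            this.callback = callback;
        }
    }

    private PendingTask head;
    private boolean registered;

    /** Runs the callback at once if already registered; otherwise appends it to the tail. */
    void addHandler(Runnable handlerAdded) {
        if (registered) {
            handlerAdded.run();
            return;
        }
        PendingTask task = new PendingTask(handlerAdded);
        if (head == null) {
            head = task;
            return;
        }
        PendingTask pending = head;
        while (pending.next != null) { // find the tail of the list
            pending = pending.next;
        }
        pending.next = task;
    }

    /** Marks the channel as registered and drains the deferred callbacks in insertion order. */
    void markRegistered() {
        registered = true;
        PendingTask task = head;
        head = null; // drop the list so it can be collected
        while (task != null) {
            task.callback.run();
            task = task.next;
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        PendingCallbackDemo pipeline = new PendingCallbackDemo();
        pipeline.addHandler(() -> log.add("A")); // deferred
        pipeline.addHandler(() -> log.add("B")); // deferred
        log.add("before-register");
        pipeline.markRegistered();               // runs A, then B
        pipeline.addHandler(() -> log.add("C")); // runs immediately
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Appending at the tail is what preserves the order in which handlers were added, so their handlerAdded callbacks fire in that same order once registration completes.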

Finally, handlerAdded invokes the handler's initChannel method, as follows:
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.
            if (initChannel(ctx)) {
                // We are done with init the Channel, removing the initializer now.
                removeState(ctx);
            }
        }
    }
}

The initChannel method in turn adds handlers to the pipeline:
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
    // This handler is the one configured by the user in the main method
    pipeline.addLast(handler);
}
// Wrap a task for the channel's event loop thread; the task adds a ServerBootstrapAcceptor to the pipeline
ch.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        pipeline.addLast(new ServerBootstrapAcceptor(
                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    }
});

Before this ChannelInitializer's method has finished, another user-defined handler is added, namely the ChannelInitializer configured in main:
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
        .handler(new ChannelInitializer<ServerSocketChannel>() {
            @Override
            protected void initChannel(ServerSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new ServerHandler());
            }
        })

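The next handlerAdded call runs this initChannel method, which adds ServerHandler.
So the call to pipeline.invokeHandlerAddedIfNeeded() runs, in order, the ChannelInitializer defined by Netty and the ChannelInitializer defined in the main thread. Each ChannelInitializer adds the handlers that actually process data, and those handlers end up in the pipeline.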
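The nesting of the two initializers can be illustrated with a small stand-alone model. This is only a sketch: MiniPipeline, Handler, Initializer and named are hypothetical types, the pipeline is a plain list, and the ServerBootstrapAcceptor is added directly rather than through an event loop task as in Netty.

```java
import java.util.ArrayList;
import java.util.List;

class InitializerDemo {
    interface Handler {
        String name();

        default void handlerAdded(MiniPipeline pipeline) {
        }
    }

    static final class MiniPipeline {
        final List<Handler> handlers = new ArrayList<>();

        void addLast(Handler handler) {
            handlers.add(handler);
            handler.handlerAdded(this); // the channel is assumed to be registered already
        }

        void remove(Handler handler) {
            handlers.remove(handler);
        }
    }

    /** Adds the real handlers in initChannel, then removes itself from the pipeline. */
    abstract static class Initializer implements Handler {
        abstract void initChannel(MiniPipeline pipeline);

        @Override
        public void handlerAdded(MiniPipeline pipeline) {
            initChannel(pipeline);
            pipeline.remove(this);
        }

        @Override
        public String name() {
            return "initializer";
        }
    }

    static Handler named(String name) {
        return () -> name;
    }

    static List<String> demo() {
        // Stand-in for the ChannelInitializer the user passes to handler(...)
        Initializer userInitializer = new Initializer() {
            @Override
            void initChannel(MiniPipeline pipeline) {
                pipeline.addLast(named("ServerHandler"));
            }
        };
        // Stand-in for the ChannelInitializer that the framework's init method adds
        Initializer frameworkInitializer = new Initializer() {
            @Override
            void initChannel(MiniPipeline pipeline) {
                pipeline.addLast(userInitializer);
                pipeline.addLast(named("ServerBootstrapAcceptor"));
            }
        };
        MiniPipeline pipeline = new MiniPipeline();
        pipeline.addLast(frameworkInitializer);
        List<String> names = new ArrayList<>();
        for (Handler handler : pipeline.handlers) {
            names.add(handler.name());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

After both initializers have run and removed themselves, only the handlers that process events remain in the model pipeline.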
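3.4.9 Executing channelRegistered on the handlers in the pipeline
After initChannel has been called, register0 finally executes pipeline.fireChannelRegistered(), which runs the channelRegistered method of the handlers. fireChannelRegistered eventually reaches AbstractChannelHandlerContext#invokeChannelRegistered, which starts executing the channelRegistered implementations of the handlers in the pipeline: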
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    private void invokeChannelRegistered() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRegistered(this);
            } catch (Throwable t) {
                invokeExceptionCaught(t);
            }
        } else {
            fireChannelRegistered();
        }
    }
}

The pipeline contains the user-defined ServerHandler class, which overrides channelRegistered, so this method is executed:
public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
        super.channelRegistered(ctx);
    }
}

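When execution reaches super.channelRegistered(ctx), ChannelInboundHandlerAdapter calls ctx.fireChannelRegistered(), which looks for the next handler to run. As shown below, this goes on to call invokeChannelRegistered on that handler's context: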
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
        return this;
    }
}

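Here findContextInbound is the key method. It walks forward from the current context and returns the next one that should handle the event, skipping contexts that do not: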
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
        return ctx;
    }
}

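In summary, ending every channelRegistered method with super.channelRegistered(ctx) ensures that channelRegistered runs on every handler in the chain of responsibility. Other methods, such as channelActive and channelRead, can be propagated through the chain in the same way by adding the corresponding call.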
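The propagation rule described above can be shown with a stripped-down chain. This is a sketch under assumptions, not Netty's implementation: Context, RegisteredHandler and the mask bit values are hypothetical, and the executor checks done by skipContext are omitted. It shows both the skipping of contexts that do not handle the event and the fact that the chain stops at a handler that does not propagate.

```java
import java.util.ArrayList;
import java.util.List;

class InboundChainDemo {
    static final int MASK_CHANNEL_REGISTERED = 1; // hypothetical bit values
    static final int MASK_CHANNEL_READ = 2;

    interface RegisteredHandler {
        void channelRegistered(Context ctx);
    }

    static final class Context {
        final int mask; // which events this context's handler cares about
        final RegisteredHandler handler;
        Context next;

        Context(int mask, RegisteredHandler handler) {
            this.mask = mask;
            this.handler = handler;
        }

        /** Finds the next context interested in the event and invokes its handler. */
        void fireChannelRegistered() {
            Context ctx = this;
            do {
                ctx = ctx.next;
            } while (ctx != null && (ctx.mask & MASK_CHANNEL_REGISTERED) == 0);
            if (ctx != null) {
                ctx.handler.channelRegistered(ctx);
            }
        }
    }

    static Context link(Context... contexts) {
        for (int i = 0; i + 1 < contexts.length; i++) {
            contexts[i].next = contexts[i + 1];
        }
        return contexts[0];
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Context head = new Context(0, ctx -> { });
        // A handles the event and propagates it, like calling super.channelRegistered(ctx)
        Context a = new Context(MASK_CHANNEL_REGISTERED, ctx -> {
            log.add("A");
            ctx.fireChannelRegistered();
        });
        // B is only interested in reads, so it is skipped for this event
        Context b = new Context(MASK_CHANNEL_READ, ctx -> log.add("B"));
        // C handles the event but does not propagate, so D is never reached
        Context c = new Context(MASK_CHANNEL_REGISTERED, ctx -> log.add("C"));
        Context d = new Context(MASK_CHANNEL_REGISTERED, ctx -> log.add("D"));
        link(head, a, b, c, d).fireChannelRegistered();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model, forgetting the propagating call in one handler silently cuts off every handler after it, which is why the article stresses ending each method with the super call.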
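3.4.10 Binding the port
Once the ServerSocketChannel has been registered with the Selector, and both the user-defined handler and the all-important ServerBootstrapAcceptor handler have been added to the pipeline, the ServerSocketChannel can be bound to a port and start serving: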
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            // bind the port
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        }
        // ... the branch for a registration that has not completed yet is omitted in this excerpt
    }
}
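The ordering constraint in doBind, bind only after registration has completed, can be sketched with the JDK's CompletableFuture standing in for Netty's ChannelFuture and ChannelPromise. This substitution is an assumption made for illustration; BindAfterRegisterDemo and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class BindAfterRegisterDemo {
    /** Runs the bind action as soon as registration has succeeded, never before. */
    static CompletableFuture<Void> doBind(CompletableFuture<Void> regFuture, Runnable bind) {
        if (regFuture.isCompletedExceptionally()) {
            return regFuture; // registration failed, nothing to bind
        }
        if (regFuture.isDone()) {
            bind.run(); // registration already finished: bind right away
            return CompletableFuture.completedFuture(null);
        }
        // Registration still pending: defer the bind until it completes
        return regFuture.thenRun(bind);
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> regFuture = new CompletableFuture<>();
        CompletableFuture<Void> bindFuture = doBind(regFuture, () -> log.add("bound"));
        log.add("registered");
        regFuture.complete(null); // completing registration triggers the deferred bind
        if (!bindFuture.isDone()) {
            throw new IllegalStateException("bind should have completed");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The demo exercises the deferred path, since the registration future is still pending when doBind is called.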

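3.4.11 The ServerSocketChannel accepts client requests
After the register task and the bind have completed, the NioEventLoop thread keeps running its endless loop. Whenever the task queue is empty, it blocks in select. When select returns because socket events are ready (strategy > 0), processSelectedKeys is called to handle them; if it returns without ready events, for example after a timeout, only pending tasks are run: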
protected void run() {
    int selectCnt = 0;
    for (;;) {
        try {
            int strategy;
            try {
                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                switch (strategy) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.BUSY_WAIT:
                    // fall-through to SELECT since the busy-wait is not supported with NIO
                case SelectStrategy.SELECT:
                    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                    if (curDeadlineNanos == -1L) {
                        curDeadlineNanos = NONE; // nothing on the calendar
                    }
                    nextWakeupNanos.set(curDeadlineNanos);
                    try {
                        if (!hasTasks()) {
                            strategy = select(curDeadlineNanos);
                        }
                    } finally {
                        // This update is just to help block unnecessary selector wakeups
                        // so use of lazySet is ok (no race condition)
                        nextWakeupNanos.lazySet(AWAKE);
                    }
                    // fall through
                default:
                }
            }
            // ... (omitted)
            selectCnt++;
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            boolean ranTasks;
            if (ioRatio == 100) {
                try {
                    if (strategy > 0) {
                        // handle socket events
                        processSelectedKeys();
                    }
                } finally {
                    // Ensure we always run tasks.
                    ranTasks = runAllTasks();
                }
            } else if (strategy > 0) {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            } else {
                ranTasks = runAllTasks(0); // This will run the minimum number of tasks
            }
            // ... (rest of the loop omitted)
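The expression ioTime * (100 - ioRatio) / ioRatio in the loop above turns the time just spent on I/O into a time budget for queued tasks. A minimal sketch of that arithmetic follows; IoRatioDemo and taskBudgetNanos are hypothetical names, and the range check reflects the assumption that the ioRatio == 100 case is handled by its own branch, as in the excerpt.

```java
class IoRatioDemo {
    /**
     * Time budget for running tasks, given the time spent on I/O and the
     * percentage of loop time that I/O is supposed to take.
     */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // 100 means "no budget, run all tasks", which the loop handles separately
            throw new IllegalArgumentException("ioRatio must be in 1..99 for this formula");
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000_000L; // 1 ms spent in processSelectedKeys
        for (int ratio : new int[] {20, 50, 80}) {
            System.out.println("ioRatio=" + ratio + " -> task budget " + taskBudgetNanos(ioTime, ratio) + " ns");
        }
    }
}
```

With 1 ms of I/O time, an ioRatio of 50 gives tasks the same 1 ms, 80 gives them 0.25 ms, and 20 gives them 4 ms, so a higher ioRatio favors socket handling over queued tasks.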

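The select() method in the NioEventLoop class blocks waiting for socket events through the select() method of EPollSelectorImpl (the Selector implementation on Linux). The blocking call is given a timeout when a scheduled task is pending, which is the curDeadlineNanos value computed in the loop above: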
public final class NioEventLoop extends SingleThreadEventLoop private S
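Since the excerpt above is cut off, the blocking behavior it describes can be illustrated with the JDK Selector directly. This is a sketch, not Netty code: SelectTimeoutDemo and its methods are hypothetical names. It shows the two ways a blocked select can return without any ready socket, a timeout and a wakeup, the latter being the JDK mechanism available for interrupting a blocked select when new work arrives.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

class SelectTimeoutDemo {
    /** Blocks for at most timeoutMillis; returns the number of ready keys (0 on timeout). */
    static int selectWithTimeout(long timeoutMillis) {
        if (timeoutMillis <= 0) {
            // select(0) would block indefinitely, so the sketch insists on a positive timeout
            throw new IllegalArgumentException("timeoutMillis must be positive");
        }
        try (Selector selector = Selector.open()) {
            return selector.select(timeoutMillis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** A wakeup issued before select() makes the next select() return immediately. */
    static int selectAfterWakeup() {
        try (Selector selector = Selector.open()) {
            selector.wakeup();
            return selector.select(); // does not block because of the pending wakeup
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ready keys after timeout: " + selectWithTimeout(50));
        System.out.println("ready keys after wakeup: " + selectAfterWakeup());
    }
}
```

Both calls return 0 ready keys here because no channel is registered, which corresponds to the strategy <= 0 case in the event loop.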
