

ChannelFuture f = b.connect(host, port).sync();

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { //利用反射创建channel类,并且初始化它 final ChannelFuture regFuture = initAndRegister(); ... //真正的链接服务端 return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); } }

final ChannelFuture initAndRegister() { Channel channel = null; channel = channelFactory.newChannel(); //利用反射创建对象 init(channel); //初始化,添加逻辑处理器,设置channel的Option与属性Attribute ... ChannelFuture regFuture = config().group().register(channel); ... return regFuture; }

### @Override public ChannelFuture register(Channel channel) { return next().register(channel); }

### @Override public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }

### @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... AbstractChannel.this.eventLoop = eventLoop; ... eventLoop.execute(new Runnable() {//有具体的线程池进行处理,参数传递过来的 @Override public void run() { register0(promise); } }); ... } }

### io.netty.util.concurrent.SingleThreadEventExecutor#execute @Override public void execute(Runnable task) { ... startThread(); //开启线程 addTask(task); //处理请求 ... }### io.netty.util.concurrent.SingleThreadEventExecutor#startThread private void startThread() { ... doStartThread(); ... }private void doStartThread() { assert thread == null; executor.execute(new Runnable() {//重点关注这个executor @Override public void run() { thread = Thread.currentThread(); if (interrupted) { thread.interrupt(); }boolean success = false; updateLastExecutionTime(); try {; //SingleThreadEventExecutor.this是NioEventLoop的事例 success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (; ; ) { int oldState = state; if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } }// Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates."); }try { // Run all remaining tasks and shutdown hooks. for (; ; ) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn( "An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); }terminationFuture.setSuccess(null); } } } } }); }

### io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...) if (executor == null) { executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); } protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(getClass()); }### io.netty.util.concurrent.DefaultThreadFactory @Override public Thread newThread(Runnable r) { Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet()); try { if (t.isDaemon() != daemon) { t.setDaemon(daemon); }if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { // Doesn't matter even if failed to set. } return t; } private static final class DefaultRunnableDecorator implements Runnable {private final Runnable r; DefaultRunnableDecorator(Runnable r) { this.r = r; }@Override public void run() { try {; } finally { FastThreadLocal.removeAll(); } } }

### io.netty.util.concurrent.ThreadPerTaskExecutor public final class ThreadPerTaskExecutor implements Executor { private final ThreadFactory threadFactory; public ThreadPerTaskExecutor(ThreadFactory threadFactory) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.threadFactory = threadFactory; }@Override public void execute(Runnable command) { threadFactory.newThread(command).start(); } }

@Override protected void run() { for (; ; ) { ... processSelectedKeys(); //处理Nio中的SelectedKeys ... } } ### private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; }try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//如何是链接的请求,调用unsafe的finishConnect // remove OP_CONNECT as otherwise will always return without blocking // See int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); }// Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {; //读取数据 } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }

@Override public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { byteBuf = allocHandle.allocate(allocator); allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; }allocHandle.incMessagesRead(1); readPending = false; pipeline.fireChannelRead(byteBuf); //触发pipeline的生命周期方法,接收消息,处理消息 byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called or in channelRead(...) method // * The user called or in channelReadComplete(...) method // // See if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }

doConnect(remoteAddress, localAddress)### @Override protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { doBind0(localAddress); }boolean success = false; try { boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } }

【netty源码分析-注册及连接】socket 链接远程服务器,因为是异步链接,所以connected为false,那么就注册了OP_CONNECT事件,这样,当连接事件做好之后,在线程组中会有无限循环,查询准备好的事件,连接事件好了,就会进行处理,同时触发声明周期的方法,进行流程的流转。
