Netty学习|Netty学习 - Bootstrap引导

引导 在Netty中,有两种引导,一是Bootstrap,用于引导客户端或者无连接服务器;另一种便是ServerBootstrap,用于引导面向连接的服务器。Bootstrap整个类层次如下图所示,本文将依次分析AbstractBootstrap、Bootstrap和ServerBootstrap。

Netty学习|Netty学习 - Bootstrap引导
文章图片
Bootstrap类层次.png AbstractBootstrap类 AbstractBootstrap类的javadoc说明如下:

AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel. It support method-chaining to provide an easy way to configure the AbstractBootstrap.
When not used in a ServerBootstrap context, the bind() methods are useful for connectionless transports such as datagram (UDP).
上述文字表明:AbstractBootstrap类支持方法的链式调用,当不使用ServerBootstrap时,bind方法可以用于无连接的协议如UDP等,这与日常用法相一致。
成员变量和构造函数
volatile EventLoopGroup group; @SuppressWarnings("deprecation") private volatile ChannelFactory channelFactory; private volatile SocketAddress localAddress; private final Map, Object> options = new LinkedHashMap, Object>(); private final Map, Object> attrs = new LinkedHashMap, Object>(); private volatile ChannelHandler handler; AbstractBootstrap() { // Disallow extending from a different package. }AbstractBootstrap(AbstractBootstrap bootstrap) { group = bootstrap.group; channelFactory = bootstrap.channelFactory; handler = bootstrap.handler; localAddress = bootstrap.localAddress; synchronized (bootstrap.options) { options.putAll(bootstrap.options); } synchronized (bootstrap.attrs) { attrs.putAll(bootstrap.attrs); } }

以上代码有以下几点需要注意:
  • 构造函数中直接访问了另一个实例的私有变量,这种是被允许的,因为访问控制是控制其他类是否能够访问本类的变量或者方法,具体可参阅Controlling Access to Members of a Class;
  • 为什么要有两个synchronized代码块呢?因为putAll方法的javadoc表明如果源map在被put到目的map的过程中被修改,这个过程是未定义的,具体可参阅官方文档。
成员方法 AbstractBootstrap类是一种构建者模式(Builder)
  • group方法设置了成员变量group;
  • channel/channelFactory方法设置了成员变量channelFactory,用来创建Channel。区别在于channelFactory方法直接指定了工厂,channel则是利用类参数创建了ReflectiveChannelFactory实例然后接着调用了channelFactory方法。channelFactory方法的javadoc提到:
    This method is usually only used if channel(Class) is not working for you because of some more complex needs. If your Channel implementation has a no-args constructor, its highly recommend to just use channel(Class) to simplify your code
  • localAddress方法设置了成员变量localAddress,这个方法有几种重载的形式;
  • option方法和attr方法相似,都可以使用null移除键;
  • validate方法验证group和channelFactory均不为null,这个方法会在bind方法中被调用。注意子类可以重写该方法,但必须调用基类的方法,后续会看到Bootstrap和ServerBootstrap都重写了该方法,加入了自己额外的验证。《Netty实战》8.2.2节有以下描述,这正是父类和子类validate方法的作用;
    在引导的过程中,在调用bind()或者connect()方法之前,必须调用以下方法来设置所需的组件:
    group();
    channel()或者channelFactory();
    handler().
    如果不这样做,则将会导致IllegalStateException。对handler()方法的调用尤其重要,因为它需要配置好ChannelPipeline。
  • bind方法比较复杂,下面详细分析一下。
bind方法 bind方法会在内部调用doBind方法,首先会调用initAndRegister方法初始化并注册通道,接下来按注册是否结束分情况讨论,都是交由doBind0方法处理。
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; }if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }

initAndRegister方法
final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); }ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }abstract void init(Channel channel) throws Exception;

initAndRegister方法是一个模板方法
  1. 利用channelFactory新建通道,以前述的ReflectiveChannelFactory为例,其newChannel方法会根据传入的Channel类型调用对应的无参构造函数返回新建的通道;
    public ReflectiveChannelFactory(Class clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } this.clazz = clazz; }@Override public T newChannel() { try { return clazz.getConstructor().newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } }

  2. 调用抽象的init方法初始化新建的通道,子类需要重写该方法;
  3. 将新建的通道注册到与该引导类关联的EventLoopGroup上。
doBind0方法
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered.Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }

doBind0方法使得在Channel绑定的EventLoop上执行具体的通道绑定操作。注意从doBind0被调用的位置可以看到其一定是在注册操作完成之后被调用:
  • doBind方法中if (regFuture.isDone()) 代码块内,这时注册已经完成,不用去管是否成功,因为doBind0内部会判断;
  • doBind方法中的else代码块与上面类似。
Bootstrap类 Bootstrap类继承了AbstractBootstrap类,新增加了remoteAddress和resolver成员变量,与之对应有remoteAddress和resolver成员方法。
成员方法
  • validate方法:前文提到Bootstrap会重写AbstractBootstrap类的该方法,Bootstrap类除了调用基类的方法,还验证了handler不为null;
  • 【Netty学习|Netty学习 - Bootstrap引导】init方法:为通道添加了配置的处理器,设置了通道的选项和属性;
    void init(Channel channel) throws Exception { ChannelPipeline p = channel.pipeline(); p.addLast(config.handler()); final Map, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); }final Map, Object> attrs = attrs0(); synchronized (attrs) { for (Entry, Object> e: attrs.entrySet()) { channel.attr((AttributeKey) e.getKey()).set(e.getValue()); } } }
  • connect方法:connect方法会在内部调用doResolveAndConnect方法完成解析远程域名和连接的工作,整体流程与bind方法很相似,也是先初始化并注册通道,接下来按注册是否结束分情况讨论,委托给了doResolveAndConnect0方法。
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.isDone()) { if (!regFuture.isSuccess()) { return regFuture; } return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise()); } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { // Directly obtain the cause and do a null check so we only need one volatile read in case of a // failure. Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doResolveAndConnect0(channel, remoteAddress, localAddress, promise); } } }); return promise; } }

    private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { try { final EventLoop eventLoop = channel.eventLoop(); final AddressResolver resolver = this.resolver.getResolver(eventLoop); if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) { // Resolver has no idea about what to do with the specified remote address or it's resolved already. doConnect(remoteAddress, localAddress, promise); return promise; }final Future resolveFuture = resolver.resolve(remoteAddress); if (resolveFuture.isDone()) { final Throwable resolveFailureCause = resolveFuture.cause(); if (resolveFailureCause != null) { // Failed to resolve immediately channel.close(); promise.setFailure(resolveFailureCause); } else { // Succeeded to resolve immediately; cached? (or did a blocking lookup) doConnect(resolveFuture.getNow(), localAddress, promise); } return promise; }// Wait until the name resolution is finished. resolveFuture.addListener(new FutureListener() { @Override public void operationComplete(Future future) throws Exception { if (future.cause() != null) { channel.close(); promise.setFailure(future.cause()); } else { doConnect(future.getNow(), localAddress, promise); } } }); } catch (Throwable cause) { promise.tryFailure(cause); } return promise; }private static void doConnect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {// This method is invoked before channelRegistered() is triggered.Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. final Channel channel = connectPromise.channel(); channel.eventLoop().execute(new Runnable() { @Override public void run() { if (localAddress == null) { channel.connect(remoteAddress, connectPromise); } else { channel.connect(remoteAddress, localAddress, connectPromise); } connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } }); }

  • ServerBootstrap类 ServerBootstrap类继承了AbstractBootstrap类,新增加了childHandler、childGroup、childOptions和childAttrs成员变量,并增加了与之对应的成员方法。
    成员方法