Netty分布式Future与Promise执行回调相关逻辑剖析

目录

  • Future和Promise执行回调
    • 首先我们看一段写在handler中的业务代码
    • 这里关注newPromise()方法, 跟进去
    • 我们继续跟write方法
    • 跟进tryFailure方法
    • 跟到addMessage方法中
    • 最后跟到AbstractUnsafe的flush方法
    • 我们跟到remove()方法中
    • 再跟到trySuccess方法中
    • 我们看用户代码
    • 跟到addListener0方法中
    • 回到addListener0方法中
    • 跟到isDone方法中
    • 跟到notifyListeners()方法中
  • 章节小结

    Future和Promise执行回调 Netty中的Future, 其实类似于jdk的Future, 用于异步获取执行结果
    Promise则相当于一个被观察者, 其中promise对象会一直跟随着channel的读写事件, 并跟踪着事件状态, 然后执行相应的回调
    这种设计思路也就是java设计模式的观察者模式

    首先我们看一段写在handler中的业务代码
    @Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ChannelFuture future = ctx.writeAndFlush("test data"); future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()){System.out.println("写出成功"); }else{System.out.println("写出失败"); }}}); }

    熟悉netty的小伙伴估计对这段代码并不陌生, 首先调用writeAndFlush方法将数据写出, 然后返回的future进行添加Listener, 并且重写回调函数
    这里举一个最简单的示例, 在回调函数中判断future的状态成功与否, 成功的话就打印"写出成功", 否则节打印"写出失败"
    这里如果写在handler中通常是NioEventLoop线程执行的, 在future返回之后才会执行添加listener的操作, 如果在用户线程中writeAndFlush是异步执行的, 在添加监听的时候有可能写出操作没有执行完毕, 等写出操作执行完毕之后才会执行回调
    【Netty分布式Future与Promise执行回调相关逻辑剖析】以上逻辑在代码中如何体现的呢?我们首先跟到writeAndFlush的方法中去
    这里会走到AbstractChannelHandlerContext中的writeAndFlush方法中:
    public ChannelFuture writeAndFlush(Object msg) {return writeAndFlush(msg, newPromise()); }

    这里的逻辑之前剖析过, 想必大家并不陌生

    这里关注newPromise()方法, 跟进去
    public ChannelPromise newPromise() {return new DefaultChannelPromise(channel(), executor()); }

    这里直接创建了DefaultChannelPromise这个对象并传入了当前channel和当前channel绑定NioEventLoop对象
    在DefaultChannelPromise构造方法中, 也会将channel和NioEventLoop对象绑定在自身成员变量中
    回到writeAndFlush方法继续跟
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {if (msg == null) {throw new NullPointerException("msg"); }if (!validatePromise(promise, true)) {ReferenceCountUtil.release(msg); return promise; }write(msg, true, promise); return promise; }

    这里的逻辑也不陌生, 注意这里最后返回了promise, 其实就是我们上一步创建DefaultChannelPromise对象
    DefaultChannelPromise实现了ChannelFuture接口, 所以方法如果返回该对象可以被ChannelFuture类型接收

    我们继续跟write方法
    private void write(Object msg, boolean flush, ChannelPromise promise) {AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) {if (flush) {next.invokeWriteAndFlush(m, promise); } else {next.invokeWrite(m, promise); }} else {AbstractWriteTask task; if (flush) {task = WriteAndFlushTask.newInstance(next, m, promise); }else {task = WriteTask.newInstance(next, m, promise); }safeExecute(executor, task, promise, m); }}

    这里的逻辑我们同样不陌生, 如果nioEventLoop线程, 我们继续调invokeWriteAndFlush方法, 如果不是nioEventLoop线程则将writeAndFlush事件封装成task, 交给eventLoop线程异步
    这里如果是异步执行, 则到这一步之后, 我们的业务代码中, writeAndFlush就会返回并添加监听, 有关添加监听的逻辑稍后分析
    走到这里, 无论同步异步, 都会执行到invokeWriteAndFlush方法:
    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else {writeAndFlush(msg, promise); }}

    这里也是我们熟悉的逻辑, 我们看到在invokeWrite0方法中传入了我们刚才创建的DefaultChannelPromise
    后续逻辑想必大家都比较熟悉, 通过事件传播, 最终会调用head节点的write方法:
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {unsafe.write(msg, promise); }

    这里最终调用unsafe的write方法, 并传入了promise对象
    跟到AbstractUnsafe的write方法中:
    public final void write(Object msg, ChannelPromise promise) {assertEventLoop(); //负责缓冲写进来的byteBufChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) {safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); ReferenceCountUtil.release(msg); return; }int size; try { msg = filterOutboundMessage(msg); size = pipeline.estimatorHandle().size(msg); if (size < 0) {size = 0; }} catch (Throwable t) {safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; }//插入写队列outboundBuffer.addMessage(msg, size, promise); }

    这里的逻辑之前小节也剖析过, 这里我们首先关注两个部分, 首先看在catch中safeSetFailure这步
    因为是catch块, 说明发生了异常, 写到缓冲区不成功, safeSetFailure就是设置写出失败的状态
    我们跟到safeSetFailure方法中:
    protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause); }}

    这里看if判断, 首先我们的promise是DefaultChannelPromise, 所以!(promise instanceof VoidChannelPromise)为true
    重点分析promise.tryFailure(cause), 这里是设置失败状态, 这里会调用DefaultPromise的tryFailure方法

    跟进tryFailure方法
    public boolean tryFailure(Throwable cause) {if (setFailure0(cause)) {notifyListeners(); return true; }return false; }

    再跟到setFailure0(cause)中:
    private boolean setValue0(Object objResult) {if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {checkNotifyWaiters(); return true; }return false; }

    这里在if块中的cas操作, 会将参数objResult的值设置到DefaultPromise的成员变量result中, 表示当前操作为异常状态
    回到tryFailure方法:
    这里关注notifyListeners()这个方法, 这个方法是执行添加监听的回调函数, 当writeAndFlush和addListener是异步执行的时候, 这里有可能添加已经添加, 所以通过这个方法可以调用添加监听后的回调
    如果writeAndFlush和addListener是同步执行的时候, 也就是都在NioEventLoop线程中执行的时候, 那么走到这里addListener还没执行, 所以这里不能回调添加监听的回调函数, 那么回调是什么时候执行的呢?我们在剖析addListener步骤的时候会给大家分析
    具体执行回调我们再讲解添加监听的时候进行剖析
    以上就是记录异常状态的大概逻辑
    回到AbstractUnsafe的write方法:
    我们再关注这一步:
    outboundBuffer.addMessage(msg, size, promise);

    跟到addMessage方法中
    public void addMessage(Object msg, int size, ChannelPromise promise) {Entry entry = Entry.newInstance(msg, size, total(msg), promise); //代码省略}

    我们只需要关注包装Entry的newInstance方法, 该方法传入promise对象
    跟到newInstance中:
    static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {Entry entry = RECYCLER.get(); entry.msg = msg; entry.pendingSize = size; entry.total = total; entry.promise = promise; return entry; }

    这里将promise设置到Entry的成员变量中了, 也就是说, 每个Entry都关联了唯一的一个promise
    我们回到AbstractChannelHandlerContext的invokeWriteAndFlush方法中:
    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {if (invokeHandler()) { invokeWrite0(msg, promise); invokeFlush0(); } else {writeAndFlush(msg, promise); }}

    我们刚才分析了write操作中promise的传递以及状态设置的大概过程, 我们继续看在flush中promise的操作过程
    这里invokeFlush0()并没有传入promise对象, 是因为我们刚才分析过, promise对象会绑定在缓冲区中entry的成员变量中, 可以通过其成员变量拿到promise对象
    invokeFlush0()我们之前也分析过, 通过事件传递, 最终会调用HeadContext的flush方法:
    public void flush(ChannelHandlerContext ctx) throws Exception {unsafe.flush(); }


    最后跟到AbstractUnsafe的flush方法
    public final void flush() {assertEventLoop(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) {return; }outboundBuffer.addFlush(); flush0(); }

    这块逻辑之前已分析过, 继续看flush0方法:
    protected void flush0() {//代码省略try {doWrite(outboundBuffer); } catch (Throwable t) {//代码省略} finally {inFlush0 = false; }}

    篇幅原因我们省略大段代码
    我们继续跟进doWrite方法:
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {int writeSpinCount = -1; boolean setOpWrite = false; for (; ; ) {Object msg = in.current(); if (msg == null) {clearOpWrite(); return; }if (msg instanceof ByteBuf) {//代码省略boolean done = false; //代码省略if (done) {//移除当前对象in.remove(); } else {break; }} else if (msg instanceof FileRegion) {//代码省略} else {throw new Error(); }}incompleteWrite(setOpWrite); }

    这里也省略了大段代码, 我们重点关注in.remove()这里, 之前介绍过, 如果done为true, 说明刷新事件已完成, 则移除当前entry节点

    我们跟到remove()方法中
    public boolean remove() {Entry e = flushedEntry; if (e == null) {clearNioBuffers(); return false; }Object msg = e.msg; ChannelPromise promise = e.promise; int size = e.pendingSize; removeEntry(e); if (!e.cancelled) {ReferenceCountUtil.safeRelease(msg); safeSuccess(promise); decrementPendingOutboundBytes(size, false, true); }e.recycle(); return true; }

    这里我们看这一步:
    ChannelPromise promise = e.promise;

    之前我们剖析promise对象会绑定在entry中, 而这步就是从entry中获取promise对象
    等remove操作完成, 会执行到这一步:
    safeSuccess(promise);

    这一步正好和我们刚才分析的safeSetFailure相反, 这里是设置成功状态
    跟到safeSuccess方法中:
    private static void safeSuccess(ChannelPromise promise) {if (!(promise instanceof VoidChannelPromise)) {PromiseNotificationUtil.trySuccess(promise, null, logger); }}


    再跟到trySuccess方法中
    public static < V> void trySuccess(Promise< ? super V> p, V result, InternalLogger logger) {if (!p.trySuccess(result) & & logger != null) {//代码省略}}

    这里再继续跟if中的trySuccess方法, 最后会走到DefaultPromise的trySuccess方法:
    public boolean trySuccess(V result) {if (setSuccess0(result)) {notifyListeners(); return true; }return false; }

    这里跟到setSuccess0方法中:
    private boolean setSuccess0(V result) {return setValue0(result == null ? SUCCESS : result); }

    这里的逻辑我们刚才剖析过了, 这里参数传入一个信号SUCCESS, 表示设置成功状
    再继续跟setValue方法:
    private boolean setValue0(Object objResult) {if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {checkNotifyWaiters(); return true; }return false; }

    同样, 在if判断中, 通过cas操作将参数传入的SUCCESS对象赋值到DefaultPromise的属性result中, 我们看这个属性:
    private volatile Object result;
    这里是Object类型, 也就是可以赋值成任何类型
    SUCCESS是一个Signal类型的对象, 这里我们可以简单理解成一种状态, SUCCESS表示一种成功的状态
    通过上述cas操作, result的值将赋值成SUCCESS
    我们回到trySuccess方法:
    public boolean trySuccess(V result) {if (setSuccess0(result)) {notifyListeners(); return true; }return false; }

    设置完成功状态之后, 则会通过notifyListeners()执行监听中的回调

    我们看用户代码
    @Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ChannelFuture future = ctx.writeAndFlush("test data"); future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (future.isSuccess()){System.out.println("写出成功"); }else{System.out.println("写出失败"); }}}); }

    在回调中会判断future.isSuccess(), promise设置为成功状态这里会返回true, 从而打印写出成功"
    跟到isSuccess方法中, 这里会调用DefaultPromise的isSuccess方法:
    public boolean isSuccess() {Object result = this.result; return result != null & & result != UNCANCELLABLE & & !(result instanceof CauseHolder); }

    我们看到首先会拿到result对象, 然后判断result不为空, 并且不是UNCANCELLABLE, 并且不属于CauseHolder对象
    我们刚才分析如果promise设置为成功装载, 则result为SUCCESS, 所以这里条件成立, 可以执行 if (future.isSuccess()) 中if块的逻辑
    和设置错误状态的逻辑一样, 这里也有同样的问题, 如果writeAndFlush是和addListener是异步操作, 那么执行到回调的时候, 可能addListener已经添加完成, 所以可以正常的执行回调
    那么如果writeAndFlush是和addListener是同步操作, writeAndFlush在执行回调的时候, addListener并没有执行, 所以无法执行回调方法, 那么回调方法是如何执行的呢, 我们看addListener这个方法:
    addListener传入ChannelFutureListener对象, 并重写了operationComplete方法, 也就是执行回调的方法
    这里会执行到DefaultChannelPromise的addListener方法, 跟进去
    public ChannelPromise addListener(GenericFutureListener< ? extends Future< ? super Void> > listener) {super.addListener(listener); return this; }

    跟到父类的addListener中:
    public Promise< V> addListener(GenericFutureListener< ? extends Future< ? super V> > listener) {checkNotNull(listener, "listener"); synchronized (this) {addListener0(listener); }if (isDone()) {notifyListeners(); }return this; }

    这里通过addListener0方法添加listener, 因为添加listener有可能会在不同的线程中操作, 比如用户线程和NioEventLoop线程, 为了防止并发问题, 这里简单粗暴的加了个synchronized关键字

    跟到addListener0方法中
    private void addListener0(GenericFutureListener< ? extends Future< ? super V> > listener) {if (listeners == null) {listeners = listener; } else if (listeners instanceof DefaultFutureListeners) {((DefaultFutureListeners) listeners).add(listener); } else {listeners = new DefaultFutureListeners((GenericFutureListener< ? extends Future< V> > ) listeners, listener); }}

    如果是第一次添加listener, 则成员变量listeners为null, 这样就把参数传入的GenericFutureListener赋值到成员变量listeners
    如果是第二次添加listener, listeners不为空, 会走到else if判断, 因为第一次添加的listener是GenericFutureListener类型, 并不是DefaultFutureListeners类型, 所以else if判断返回false, 进入到else块中
    else块中, 通过new的方式创建一个DefaultFutureListeners对象并赋值到成员变量listeners中
    DefaultFutureListeners的构造方法中, 第一个参数传入DefaultPromise中的成员变量listeners, 也就是第一次添加的GenericFutureListener对象, 第二个参数为第二次添加的GenericFutureListener对象, 这里通过两个GenericFutureListener对象包装成一个DefaultFutureListeners对象
    我们看listeners的定义:
    private Object listeners;

    这里是个Object类型, 所以可以保存任何类型的对象
    再看DefaultFutureListeners的构造方法:
    DefaultFutureListeners(GenericFutureListener< ? extends Future< ?> > first, GenericFutureListener< ? extends Future< ?> > second) {listeners = new GenericFutureListener[2]; //第0个listeners[0] = first; //第1个listeners[1] = second; size = 2; //代码省略}

    在DefaultFutureListeners类中也定义了一个成员变量listeners, 类型为GenericFutureListener数组
    构造方法中初始化listeners这个数组, 并且数组中第一个值赋值为我们第一次添加的GenericFutureListener, 第二个赋值为我们第二次添加的GenericFutureListener

    回到addListener0方法中
    private void addListener0(GenericFutureListener< ? extends Future< ? super V> > listener) {if (listeners == null) {listeners = listener; } else if (listeners instanceof DefaultFutureListeners) {((DefaultFutureListeners) listeners).add(listener); } else {listeners = new DefaultFutureListeners((GenericFutureListener< ? extends Future< V> > ) listeners, listener); }}

    经过两次添加listener, 属性listeners的值就变成了DefaultFutureListeners类型的对象, 如果第三次添加listener, 则会走到else if块中, DefaultFutureListeners对象通过调用add方法继续添加listener
    跟到add方法中:
    public void add(GenericFutureListener< ? extends Future< ?> > l) {GenericFutureListener< ? extends Future< ?> > [] listeners = this.listeners; final int size = this.size; if (size == listeners.length) {this.listeners = listeners = Arrays.copyOf(listeners, size < < 1); }listeners[size] = l; this.size = size + 1; //代码省略}

    这里的逻辑也比较简单, 就是为当前的数组对象listeners中追加新的GenericFutureListener对象, 如果listeners容量不足则进行扩容操作
    根据以上逻辑, 就完成了listener的添加逻辑
    那么再看我们刚才遗留的问题, 如果writeAndFlush和addListener是同步进行的, writeAndFlush执行回调时还没有addListener还没有执行回调, 那么回调是如何执行的呢?
    回到DefaultPromise的addListener中:
    public Promise< V> addListener(GenericFutureListener< ? extends Future< ? super V> > listener) {checkNotNull(listener, "listener"); synchronized (this) {addListener0(listener); }if (isDone()) {notifyListeners(); }return this; }

    我们分析完了addListener0方法, 再往下看
    这个会有if判断isDone(), isDone方法, 就是程序执行到这一步的时候, 判断刷新事件是否执行完成

    跟到isDone方法中
    public boolean isDone() {return isDone0(result); }

    继续跟isDone0, 这里传入了成员变量result
    private static boolean isDone0(Object result) {return result != null & & result != UNCANCELLABLE; }

    这里判断result不为null并且不为UNCANCELLABLE, 则就表示完成
    因为成功的状态是SUCCESS, 所以flush成功这里会返回true
    回到 addListener中:
    如果执行完成, 就通过notifyListeners()方法执行回调, 这也解释刚才的问题, 在同步操作中, writeAndFlush在执行回调时并没有添加listener, 所以添加listener的时候会判断writeAndFlush的执行状态, 如果状态时完成, 则会这里执行回调
    同样, 在异步操作中, 走到这里writeAndFlush可能还没完成, 所以这里不会执行回调, 由writeAndFlush执行回调
    所以, 无论writeAndFlush和addListener谁先完成, 都可以执行到回调方法

    跟到notifyListeners()方法中
    private void notifyListeners() {EventExecutor executor = executor(); if (executor.inEventLoop()) {final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); if (stackDepth < MAX_LISTENER_STACK_DEPTH) {threadLocals.setFutureListenerStackDepth(stackDepth + 1); try {notifyListenersNow(); } finally {threadLocals.setFutureListenerStackDepth(stackDepth); }return; }}safeExecute(executor, new Runnable() {@Overridepublic void run() {notifyListenersNow(); }}); }

    这里首先判断是否是eventLoop线程, 如果是eventLoop线程则执行if块中的逻辑, 如果不是eventLoop线程, 则把执行回调的逻辑封装成task丢到EventLoop的任务队列中异步执行
    我们重点关注notifyListenersNow()方法, 跟进去:
    private void notifyListenersNow() {Object listeners; synchronized (this) {if (notifyingListeners || this.listeners == null) {return; }notifyingListeners = true; listeners = this.listeners; this.listeners = null; }for (; ; ) {if (listeners instanceof DefaultFutureListeners) {notifyListeners0((DefaultFutureListeners) listeners); } else {notifyListener0(this, (GenericFutureListener< ? extends Future< V> > ) listeners); }//代码省略}}

    在无限for循环中, 首先首先判断listeners是不是DefaultFutureListeners类型, 根据我们之前的逻辑, 如果只添加了一个listener, 则listeners是GenericFutureListener类型
    通常在添加的时候只会添加一个listener, 所以我们跟到else块中的notifyListener0方法:
    private static void notifyListener0(Future future, GenericFutureListener l) {try {l.operationComplete(future); } catch (Throwable t) {logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); }}

    我们看到, 这里执行了GenericFutureListener的中我们重写的回调函数operationComplete
    以上就是执行回调的相关逻辑

    章节小结 这一章讲解了有关write和flush的相关逻辑, 并分析了有关添加监听和异步写数据的相关步骤
    经过学习, 同学们应该掌握如下知识:
    write操作是如何将ByteBuf添加到发送缓冲区的
    flush操作是如何将ByteBuf写出到chanel中的
    抽象编码器MessageToByteEncoder中如何定义了编码器的骨架逻辑
    writeAndFlush和addListener在同步和异步操作中是如何执行回调的
    更多关于Netty分布式Future和Promise执行回调的资料请关注脚本之家其它相关文章!

      推荐阅读