dubbo 异步调用

本篇主要介绍一下 Dubbo 的异步编程,包括客户端如何发起异步调用、服务端如何异步执行,以及其实现原理。
客户端异步调用
先看下整体流程:
dubbo 异步调用
文章图片

再来看下怎么调用,有两种方式,定义 CompletableFuture 返回类型的接口以及使用 RpcContext。
第一种:定义一个 CompletableFuture 返回类型的接口及实现

public interface CityService { CompletableFuture getCityId(); }

@Service public class CityServiceImpl implements CityService { @Override public CompletableFuture getCityId() { CompletableFuture completableFuture = new CompletableFuture<>(); completableFuture.complete("ni hao"); return completableFuture; } }

客户端引入该接口并发起调用
@Reference CityService cityService; ... CompletableFuture future = cityService.getCityId(); future.whenComplete((o, t) -> { System.out.println(o); });

第二种:使用async配置,可以用在 Reference 或者 Method 级别
还是定义一个服务接口
public interface CityService { String getCityName(); }

@Service public class CityServiceImpl implements CityService { @Override public String getCityName() { return "hangzhou"; } }

客户端引入该接口并发起调用
@Reference(async = true) CityService cityService; ... //此处调用结果会返回null(2.7.1版本) cityService.getCityName(); FutureAdapter futureAdapter = (FutureAdapter)RpcContext.getContext().getFuture(); futureAdapter.getResultFuture().whenComplete((o, t) -> { System.out.println(o.getValue()); });
对于第二种方式调用,需要注意的是单线程下每次调完方法之后,要立刻获取对应的FutureAdapter,下面的原理部分会说明原因。如果你要确保消息发送成功的,则需要设置sent为 true。
对于第一种方式我们可以看到并没有定义async的值,这是因为对于返回类型为 CompletableFuture 的函数,dubbo里面判断为需要异步了。
服务端异步执行
服务端异步执行实际上只是为了提高吞吐,其实就是我们自己开新线程去处理业务,以便释放dubbo线程去处理其他请求。
dubbo 异步调用
文章图片

服务端的异步执行也有两种方式可以实现,定义 CompletableFuture 返回类型的接口以及使用 AsyncContext。
第一种:定义一个 CompletableFuture 返回类型的接口及实现,其实和上面的客户端异步的接口定义一样,只不过服务端实现不一样。
服务端实现
@Service public class CityServiceImpl implements CityService { @Override public CompletableFuture getCityId() { return CompletableFuture.supplyAsync(() -> "ni hao"); } }

上面用了 CompletableFuture 自带的线程池做异步执行,当然你也可以自定义,比如
@Service public class CityServiceImpl implements CityService { @Override public CompletableFuture getCityId() { CompletableFuture completableFuture = new CompletableFuture<>(); new Thread(() -> { completableFuture.complete("ni hao"); }).start(); return completableFuture; } }

这种方式对于接口的定义有一定要求,如果你接口还是想返回自定义的类型,则需要使用AsyncContext
第二种:使用AsyncContext
@Service public class CityServiceImpl implements CityService { @Override public String getCityId() { AsyncContext asyncContext = RpcContext.startAsync(); new Thread(() -> { //转储调用线程的RpcContext信息到当前线程,不用则可以注释掉 //asyncContext.signalContextSwitch(); asyncContext.write("zhejiang"); }).start(); return ""; } }

其实AsyncContext内部也是用了 CompletableFuture,Dubbo做了特殊处理来兼容异步操作,后面原理部分会讲到。
原理
基于上面的使用,这里对Dubbo的异步编程说明一下是如何实现的。也从客户端异步调用和服务端异步执行两个方面来分别介绍。
客户端异步调用
  • 定义 CompletableFuture 返回类型方式
    Dubbo客户端发起调用的时候,实际上是对于接口做了一层代理,最终调用的逻辑在org.apache.dubbo.rpc.proxy.InvokerInvocationHandler#invoke
    dubbo 异步调用
    文章图片

    从上图可以看到在调用的时候会通过函数 createInvocation 创建一个RpcInvocation
    dubbo 异步调用
    文章图片

    在创建的时候会判断接口的返回类型是否是 CompletableFuture,如果是的话就是设置异步调用标记
    dubbo 异步调用
    文章图片

    最终在org.apache.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke会进行判断是否需要异步调用,需要的话会返回 CompletableFuture,然后调用端等待监听回调
    dubbo 异步调用
    文章图片
  • 【dubbo 异步调用】设置async值为true
    我们在 Reference 或者 Method 配置中将async设置为 true 之后,Dubbo 在启动的时候会将配置注入到对应的 ReferenceBean 中,在客户端发起调用的时候会进行传递,具体设置逻辑在
    org.apache.dubbo.rpc.protocol.AbstractInvoker#invoke
    dubbo 异步调用
    文章图片

    如果 Method 级别没有配置则取 Reference 级别,后面的逻辑和第一种方式一样,不再赘述。在使用的时候我们有说到 "每次调完方法之后,要立刻获取对应的FutureAdapter",这里说明一下原因
    dubbo 异步调用
    文章图片

    可以看到在发请求之后Dubbo会设置对应的FutureAdapter到 RpcContext 中,而 RpcContext.getContext()是 ThreadLocal 级别的,也就是如果一个线程调用了两次,第二次就会把第一次的异步结果给覆盖了,所以说我们每次调用之后都要先把Future取回来然后再发起下一次调用。
    dubbo 异步调用
    文章图片

    可以看到是ThreadLocal级别的存储
    dubbo 异步调用
    文章图片

    服务端异步执行说完客户端异步调用的原理之后,这里再讲一下服务端是如何做异步执行的,还是以上面介绍的两种方式分别说明。
  • 定义一个 CompletableFuture 返回类型的接口
    服务端哪里接收请求,如何处理请求不在这里介绍,后面会有文章分析,这里直接贴关键代码,看看服务端如何实现异步执行的,看这里
    org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleRequest
    dubbo 异步调用
    文章图片

    从这段逻辑我们可以看出服务端在处理请求之后会返回一个 CompletableFuture,然后等待异步执行完毕。我们再看一下图中的 handler.reply 函数
    dubbo 异步调用
    文章图片

    可以看到对于 AsyncRpcResult 的返回类型会做异步等待,其他的则直接返回值来结束 CompletableFuture,继续跟进这个 invoker.invoke 函数,实际就是org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke
    dubbo 异步调用
    文章图片

    可以看到这里有对于返回类型的判断,对于 CompletableFuture 类型的返回,对应 Invocation 中的future_returntype 值则为 true,最终返回一个对 future 包装的 AsyncRpcResult 结果类,进入到 AsyncRpcResult 类看下
    dubbo 异步调用
    文章图片

    外部拿到的是新建的 resultFuture,valueFuture 则是服务端业务逻辑异步接口返回的 CompletableFuture,这里可以看到如果我们业务逻辑异步执行完毕,则会触发外部拿到 resultFuture的设值并结束,以此来结束外部等待
  • 使用AsyncContext
    再来说一下第二种方式的原理,这种方式据dubbo官方介绍是参考的Servlet 3.0 的异步接口AsyncContext,我们来看下怎么实现的,同样看一下第一种方式实现的
    org.apache.dubbo.rpc.proxy.AbstractProxyInvoker#invoke
    dubbo 异步调用
    文章图片

    这里判断了 rpcContext.isAsyncStarted(),如果启动了则会返回异步的结果,我们在上面使用中介绍过,在业务代码中首先要调用AsyncContext asyncContext = RpcContext.startAsync()来拿到 AsyncContext,先看下这个 startAsync 逻辑
    dubbo 异步调用
    文章图片

    可以看到这里会获取到当前线程的 RpcContext,然后新建一个 AsyncContext(如果没有的话),然后启动 AsyncContext,看下启动逻辑
    dubbo 异步调用
    文章图片

    其实就是把状态设置为已启动并且新建一个 CompletableFuture,而这个 Future 就是外部拿到等待业务逻辑异步结果的,在业务逻辑中执行完后得到结果后,我们需要调用asyncContext.write(xxx)对结果进行设置,看下write函数
    dubbo 异步调用
    文章图片

    可以看到用完之后就会把Context关掉并且结束 Future 的等待。所以本质上这种方式是利用了ThreadLocal的特性来存储Context进行传递 Future,再利用 Future 来做线程间的等待唤醒。
    总结
    本篇详细介绍了Dubbo客户端异步调用以及服务端异步执行的使用和原理,可以看到Dubbo在很多细节上确实实现得很巧妙,这里是基于2.7.1版本的源码解析,2.6.x版本是不支持 CompletableFuture类型的异步的,对于之后的版本,大概逻辑是差不多的,看起来也就水到渠成了。

    推荐阅读