dubbo|dubbo 服务调用过程

目录
dubbo拓展机制 SPI
dubbo 自适应拓展机制
dubbo 服务导出
dubbo 服务引用
dubbo 服务字典
dubbo 服务路由
dubbo 集群
dubbo 负载均衡
dubbo 服务调用过程
大致流程图:

dubbo|dubbo 服务调用过程
文章图片
image.png 1.服务调用方式 从dubbo源码提供的demo作为入口分析服务调用过程,反编译demo的代理类我不是使用的arthas,我使用的是jdk自带的工具HSDB。最终代理类如下:

public class proxy0 implements DC, EchoService, DemoService { public static Method[] methods; private InvocationHandler handler; public proxy0(InvocationHandler var1) { this.handler = var1; }public proxy0() { }public String sayHello(String var1) { Object[] var2 = new Object[]{var1}; Object var3 = this.handler.invoke(this, methods[0], var2); return (String)var3; }public Object $echo(Object var1) { Object[] var2 = new Object[]{var1}; Object var3 = this.handler.invoke(this, methods[1], var2); return (Object)var3; } }

其中InvocationHandler的实现为InvokerInvocationHandler,所以我们接着看它的invoker方法
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class[] parameterTypes = method.getParameterTypes(); // 拦截定义在 Object 类中的方法(未被子类重写),比如 wait/notify if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } // 如果 toString、hashCode 和 equals 等方法被子类重写了,这里也直接调用 if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); }// 将 method 和 args 封装到 RpcInvocation 中,并执行后续的调用 return invoker.invoke(new RpcInvocation(method, args)).recreate(); }

InvokerInvocationHandler 中的 invoker 成员变量类型为 MockClusterInvoker,MockClusterInvoker 内部封装了服务降级逻辑。下面简单看一下:
public Result invoke(Invocation invocation) throws RpcException { Result result = null; // 获取 mock 配置值 String value = https://www.it610.com/article/directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 ||"false".equalsIgnoreCase(value)) { //no mock // 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法, // 比如 FailoverClusterInvoker result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } //force:direct mock // force:xxx 直接执行 mock 逻辑,不发起远程调用 result = doMockInvoke(invocation, null); } else { //fail-mock // fail:xxx 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常 try { result = this.invoker.invoke(invocation); //fix:#4585 if(result.getException() != null && result.getException() instanceof RpcException){ RpcException rpcException= (RpcException)result.getException(); if(rpcException.isBiz()){ throwrpcException; }else { // 调用失败,执行 mock 逻辑 result = doMockInvoke(invocation, rpcException); } }} catch (RpcException e) { if (e.isBiz()) { throw e; }if (logger.isWarnEnabled()) { logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); } result = doMockInvoke(invocation, e); } } return result; }

其中doMockInvoke是在发生异常或者是mock 的配置为 force的时候调用,我们接下来看看doMockInvoke
private Result doMockInvoke(Invocation invocation, RpcException e) { Result result = null; Invoker minvoker; // 通过directory列举invokers List> mockInvokers = selectMockInvoker(invocation); // 如果没有列举的invoker,新建一个MockInvoker if (CollectionUtils.isEmpty(mockInvokers)) { minvoker = (Invoker) new MockInvoker(directory.getUrl(), directory.getInterface()); } else { // 获取第一个invoker minvoker = mockInvokers.get(0); } try { result = minvoker.invoke(invocation); } catch (RpcException me) { if (me.isBiz()) { result = AsyncRpcResult.newDefaultAsyncResult(me.getCause(), invocation); } else { throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause()); } } catch (Throwable me) { throw new RpcException(getMockExceptionMessage(e, me), me.getCause()); } return result; }

接着进入FailoverClusterInvoker进行集群调用,集群调用中会使用负载均衡从invokers中选出一个调用。这个过程已经在前面分析过了,接下来会经过ListenerInvokerWrapper以及各种ListenableFilter,接着调用的是AsyncToSyncInvoker,从名字我们知道是处理异步转同步的,也就是该次调用是异步还是同步的。接着就是调用DubboInvoker,可以看到它的invoke方法在父类AbstractInvoker中,所以先看AbstractInvoker的invoke方法
public Result invoke(Invocation inv) throws RpcException { // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed if (destroyed.get()) { logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer"); } RpcInvocation invocation = (RpcInvocation) inv; // 设置 Invoker invocation.setInvoker(this); if (CollectionUtils.isNotEmptyMap(attachment)) { // 设置 attachment invocation.addAttachmentsIfAbsent(attachment); } Map contextAttachments = RpcContext.getContext().getAttachments(); if (CollectionUtils.isNotEmptyMap(contextAttachments)) { /** * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here, * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information). */ // 添加 contextAttachments 到 RpcInvocation#attachment 变量中 invocation.addAttachments(contextAttachments); }// 设置异步信息到 RpcInvocation#attachment 中 invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation)); // 如果异步就设置invoke_id RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception Throwable te = e.getTargetException(); if (te == null) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { if (te instanceof RpcException) { ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation); } } catch (RpcException e) { if (e.isBiz()) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { throw e; } } catch (Throwable e) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } }

可以看到其中大部分代码用于添加信息到 RpcInvocation#attachment 变量中,添加完毕后,调用 doInvoke 执行后续的调用。doInvoke 是一个抽象方法,需要由子类实现,下面到 DubboInvoker 中看一下
protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); // 设置 path 和 version 到 attachment 中 inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); ExchangeClient currentClient; // 从 clients 数组中获取 ExchangeClient if (clients.length == 1) { currentClient = clients[0]; } else { // 轮询获取链接 currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 获取 dubbo:method 中return的设置,默认为true // true,则返回future,或回调onreturn等方法,如果设置为false,则请求发送成功后直接返回Null boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 获取方法超时配置 int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); // 发送请求 currentClient.send(inv, isSent); // 直接返回 appResponse 空对象 return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); CompletableFuture responseFuture = currentClient.request(inv, timeout); // 异步设置结果到 asyncRpcResult 中, // 如果方式为同步,将在后面的 AsyncToSyncInvoker 中调用 get 方法阻塞知道获取结果 asyncRpcResult.subscribeTo(responseFuture); // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter // 兼容dubbo 2.6.x 版本,设置 future 到上下文中 FutureContext.getContext().setCompatibleFuture(responseFuture); return asyncRpcResult; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
2.服务消费方发送请求 接下来我们继续分析请求的发送过程,ExchangeClient类以及它的一些子类。先来看下ExchangeClient的类图。

dubbo|dubbo 服务调用过程
文章图片
image.png 2.1发送请求 上面的代码中currentClient为ReferenceCountExchangeClient,我们现在分析下ReferenceCountExchangeClient这个类。
final class ReferenceCountExchangeClient implements ExchangeClient {private final URL url; // 引用计数 private final AtomicInteger referenceCount = new AtomicInteger(0); // 装饰对象 private ExchangeClient client; public ReferenceCountExchangeClient(ExchangeClient client) { this.client = client; // 增加计数 referenceCount.incrementAndGet(); this.url = client.getUrl(); } ....... }

ReferenceCountExchangeClient 内部定义了一个引用计数变量 referenceCount,每当该对象被引用一次 referenceCount 都会进行自增。每当 close 方法被调用时,referenceCount 进行自减。ReferenceCountExchangeClient 内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象的相关方法。
ReferenceCountExchangeClient包装的对象是HeaderExchangeClient,HeaderExchangeClient在HeaderExchanger中使用connect方法生成。
public class HeaderExchangeClient implements ExchangeClient {private final Client client; private final ExchangeChannel channel; // 执行器 private static final HashedWheelTimer IDLE_CHECK_TIMER = new HashedWheelTimer( new NamedThreadFactory("dubbo-client-idleCheck", true), 1, TimeUnit.SECONDS, TICKS_PER_WHEEL); // 心跳任务 private HeartbeatTimerTask heartBeatTimerTask; // 连接任务 private ReconnectTimerTask reconnectTimerTask; public HeaderExchangeClient(Client client, boolean startTimer) { Assert.notNull(client, "Client can't be null"); this.client = client; // 创建 HeaderExchangeChannel 对象 this.channel = new HeaderExchangeChannel(client); if (startTimer) { URL url = client.getUrl(); // 开始重新连接任务,当连接不可用或者是还在超时时间内 startReconnectTask(url); // 开始心跳任务 startHeartBeatTask(url); } } }

HeaderExchangeClient 中很多方法只有一行代码,即调用 HeaderExchangeChannel 对象的同签名方法。那 HeaderExchangeClient 有什么用处呢?答案是封装了一些关于心跳检测的逻辑以及重新建立连接。接下来继续看HeaderExchangeChannel
final class HeaderExchangeChannel implements ExchangeChannel { // 无返回值 @Override public void send(Object message, boolean sent) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!"); } if (message instanceof Request || message instanceof Response || message instanceof String) { channel.send(message, sent); } else { Request request = new Request(); request.setVersion(Version.getProtocolVersion()); // 设置双向通信标志 request.setTwoWay(false); // 这里的 message 变量类型为 RpcInvocation request.setData(message); channel.send(request, sent); } } // 有返回值 @Override public CompletableFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion(Version.getProtocolVersion()); // 设置双向通信标志 req.setTwoWay(true); // 这里的 request 变量类型为 RpcInvocation req.setData(request); // 创建 DefaultFuture 对象 DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout); try { // 调用 NettyClient 的 send 方法发送请求 channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; } }
可以看到send方法无返回值,request有一个future的返回。
接下来我们继续看channel.send方法,其中channel默认为NettyClient,NettyClient没有send方法,而是它的父类AbstractPeer实现了send方法。然后会继续调用NettyClient的父类AbstractClient的sent方法。
public abstract class AbstractPeer implements Endpoint, ChannelHandler { @Override public void send(Object message) throws RemotingException { send(message, url.getParameter(Constants.SENT_KEY, false)); } }public abstract class AbstractClient extends AbstractEndpoint implements Client { @Override public void send(Object message, boolean sent) throws RemotingException { if (needReconnect && !isConnected()) { connect(); } // 获取 Channel,getChannel 是一个抽象方法,具体由子类实现 Channel channel = getChannel(); //TODO Can the value returned by getChannel() be null? need improvement. if (channel == null || !channel.isConnected()) { throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); } channel.send(message, sent); } }

继续看NettyClient的getChannel方法
public class NettyClient extends AbstractClient {// 这里的 Channel 全限定名称为 org.jboss.netty.channel.Channel private volatile Channel channel; @Override protected com.alibaba.dubbo.remoting.Channel getChannel() { Channel c = channel; if (c == null || !c.isConnected()) return null; // 获取一个 NettyChannel 类型对象 return NettyChannel.getOrAddChannel(c, getUrl(), this); } }final class NettyChannel extends AbstractChannel {private static final ConcurrentMap channelMap = new ConcurrentHashMap(); private final org.jboss.netty.channel.Channel channel; /** 私有构造方法 */ private NettyChannel(org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) { super(url, handler); if (channel == null) { throw new IllegalArgumentException("netty channel == null; "); } this.channel = channel; }static NettyChannel getOrAddChannel(org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) { if (ch == null) { return null; }// 尝试从集合中获取 NettyChannel 实例 NettyChannel ret = channelMap.get(ch); if (ret == null) { // 如果 ret = null,则创建一个新的 NettyChannel 实例 NettyChannel nc = new NettyChannel(ch, url, handler); if (ch.isConnected()) { // 将 键值对存入 channelMap 集合中 ret = channelMap.putIfAbsent(ch, nc); } if (ret == null) { ret = nc; } } return ret; } }

获取到NettyChannel的实例后,就开始调用NettyChannel的sent方法:
public void send(Object message, boolean sent) throws RemotingException { // whether the channel is closed super.send(message, sent); boolean success = true; int timeout = 0; try { // 发送消息(包含请求和响应消息) ChannelFuture future = channel.writeAndFlush(message); // sent 的值源于 中 sent 的配置值,有两种配置值: //1. true: 等待消息发出,消息发送失败将抛出异常 //2. false: 不等待消息发出,将消息放入 IO 队列,即刻返回 // 默认情况下 sent = false; if (sent) { // wait timeout ms timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); // 等待消息发出,若在规定时间没能发出,success 会被置为 false success = future.await(timeout); } Throwable cause = future.cause(); // 若 success 为 false,这里抛出异常 if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } }

接下来看下sayHello方法的整个调用链
proxy0#sayHello(String)
-> InvokerInvocationHandler#invoke(Object, Method, Object[])
-> MockClusterInvoker#invoke(Invocation) //本地调用和错误时mock本地实现
-> AbstractClusterInvoker#invoke(Invocation) //列举可用服务以及初始化负载均衡对象
-> FailoverClusterInvoker#doInvoke(Invocation, List>, LoadBalance) //实现了Failover模式的集群容错
-> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用
-> ListenerInvokerWrapper#invoke(Invocation)
-> AsyncToSyncInvoker#invoker // 如果需要异步转同步
-> AbstractInvoker#invoke(Invocation) // 设置attachment和同步或异步
-> DubboInvoker#doInvoke(Invocation) // 设置客户端以及发送请求
-> ReferenceCountExchangeClient#request(Object, int) // 记录该client使用次数
-> HeaderExchangeClient#request(Object, int) // 执行心跳任务和连接超时重连任务
-> HeaderExchangeChannel#request(Object, int) // 设置请求参数
-> AbstractPeer#send(Object) // 检查chaneel是否关闭
-> AbstractClient#send(Object, boolean) // 配置netty,建立连接,获取chaneel重新连接
-> NettyChannel#send(Object, boolean) // 发送请求和捕获异常,向上抛出
-> NioClientSocketChannel#write(Object)
对于服务请求的编码以及接收请求的解码过程暂不分析
2.2 服务提供方接收请求 2.2.1 线程派发模型
dubbo|dubbo 服务调用过程
文章图片
图片来自官网
默认使用的是all,接下来看看AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {public AllChannelHandler(ChannelHandler handler, URL url) { super(handler, url); }/** 处理连接事件 */ @Override public void connected(Channel channel) throws RemotingException { // 获取线程池 ExecutorService executor = getExecutorService(); try { // 将连接事件派发到线程池中处理 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } }/** 处理断开事件 */ @Override public void disconnected(Channel channel) throws RemotingException { ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } }/** 处理请求和响应消息,这里的 message 变量类型可能是 Request,也可能是 Response */ @Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getExecutorService(); try { // 将请求和响应消息派发到线程池中处理 executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; // 如果通信方式为双向通信,此时将 Server side ... threadpool is exhausted // 错误信息封装到 Response 中,并返回给服务消费方。 if(request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); // 返回包含错误信息的 Response 对象 channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } }/** 处理异常信息 */ @Override public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService executor = getExecutorService(); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } } }

继续看下构造函数中调用的父构造方法
public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; // 默认为FixedThreadPool的实现 executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) { componentKey = CONSUMER_SIDE; } // 配置到satastore中,方便client或者是server关闭时同时关闭线程池 DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); dataStore.put(componentKey, Integer.toString(url.getPort()), executor); }

这里默认使用了一个最大线程数为200的线程池,具体线程池用于处理什么样的事件取决于使用怎样的线程派发模型。
2.2.2 调用服务
针对netty3和netty4接收channel数据的类和方法有所不同,以下以netty4为例,展示调用链,从NettyServerHandler进入是因为,在服务导出过程中向netty的bootstrap中注册了NettyServerHandler,根据不同的策略以下调用链中的具体的类可能有所不同,这里所列举的都是默认的处理类
【dubbo|dubbo 服务调用过程】NettyServerHandler#channelRead(ChannelHandlerContext, MessageEvent)
-> AbstractPeer#received
-> MultiMessageHandler#received // 多消息处理
-> HeartbeatHandler#received // 处理心跳请求,如果不是继续后续调用
-> AllChannelHandler#received // 将此次处理放入线程池执行
-> ExecutorService#execute(Runnable) // 由线程池执行后续的调用逻辑
-> DecodeHandler#received // 解码
-> HeaderExchangeHandler#received // 处理request或者response或者telnet
-> Dubboprotocol#reply // 获取服务导出过程中保存的对应的invoker
-> filter#invoker#invoke
-> invokerWrapper#invoke
-> AbstractProxyInvoker#invoke // 调用具体的方法,封装调用结果
-> 服务导出过程中生成的对应的代理类
这里线程池的execute执行的也就是ChannelEventRunnable,我们从ChannelEventRunnable开始分析
public class ChannelEventRunnable implements Runnable {private final ChannelHandler handler; private final Channel channel; private final ChannelState state; private final Throwable exception; private final Object message; @Override public void run() { // 检测通道状态,对于请求或响应消息,此时 state = RECEIVED if (state == ChannelState.RECEIVED) { try { // 将 channel 和 message 传给 ChannelHandler 对象,进行后续的调用 handler.received(channel, message); } catch (Exception e) { logger.warn("... operation error, channel is ... message is ..."); } } // 其他消息类型通过 switch 进行处理 else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("... operation error, channel is ..."); } break; case DISCONNECTED: // ... case SENT: // ... case CAUGHT: // ... default: logger.warn("unknown state: " + state + ", message is " + message); } }} }

ChannelEventRunnable 仅是一个中转站,它的 run 方法中并不包含具体的调用逻辑,仅用于将参数传给其他 ChannelHandler 对象进行处理,该对象类型为 DecodeHandler。
public class DecodeHandler extends AbstractChannelHandlerDelegate {private static final Logger log = LoggerFactory.getLogger(DecodeHandler.class); public DecodeHandler(ChannelHandler handler) { super(handler); }@Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { // 对 Decodeable 接口实现类对象进行解码 decode(message); }if (message instanceof Request) { // 对 Request 的 data 字段进行解码 decode(((Request) message).getData()); }if (message instanceof Response) { // 对 Request 的 result 字段进行解码 decode(((Response) message).getResult()); }// 执行后续逻辑 handler.received(channel, message); }private void decode(Object message) { // Decodeable 接口目前有两个实现类, // 分别为 DecodeableRpcInvocation 和 DecodeableRpcResult if (message instanceof Decodeable) { try { // 执行解码逻辑 ((Decodeable) message).decode(); if (log.isDebugEnabled()) { log.debug("Decode decodeable message " + message.getClass().getName()); } } catch (Throwable e) { if (log.isWarnEnabled()) { log.warn("Call Decodeable.decode failed: " + e.getMessage(), e); } } // ~ end of catch } // ~ end of if } // ~ end of method decode}

DecodeHandler 存在的意义就是保证请求或响应对象可在线程池中被解码。解码完毕后,完全解码后的 Request 对象会继续向后传递,下一站是 HeaderExchangeHandler。
public class HeaderExchangeHandler implements ChannelHandlerDelegate { void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); // 检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应 if (req.isBroken()) { Object data = https://www.it610.com/article/req.getData(); String msg; if (data == null) { msg = null; } else if (data instanceof Throwable) { msg = StringUtils.toString((Throwable) data); } else { msg = data.toString(); } res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); // 将调用结果返回给服务消费端 channel.send(res); return; } // find handler by message class. // 获取 data 字段值,也就是 RpcInvocation 对象 Object msg = req.getData(); try { // 继续向下调用 CompletionStage future = handler.reply(channel, msg); future.whenComplete((appResult, t) -> { try { if (t == null) { // 设置 OK 状态码 res.setStatus(Response.OK); // 设置调用结果 res.setResult(appResult); } else { // 若调用过程出现异常,则设置 SERVICE_ERROR,表示服务端异常 res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(t)); } // 将调用结果返回给服务消费端 channel.send(res); } catch (RemotingException e) { logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e); } finally { // HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } } public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { // 处理请求对象 if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { // 处理事件 handlerEvent(channel, request); } else { // 双向通信 if (request.isTwoWay()) { // 向后调用服务,并得到调用结果 handleRequest(exchangeChannel, request); } // 如果是单向通信,仅向后调用指定服务即可,无需返回调用结果 else { handler.received(exchangeChannel, request.getData()); } } } // 处理响应对象,服务消费方会执行此处逻辑,后面分析 else if (message instanceof Response) { handleResponse(channel, (Response) message); } // telnet 相关,忽略 else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } }
对于双向通信,HeaderExchangeHandler 首先向后进行调用,得到调用结果。然后将调用结果封装到 Response 对象中,最后再将该对象返回给服务消费方。如果请求不合法,或者调用失败,则将错误信息封装到 Response 对象中,并返回给服务消费方。
其中handler.reply调用的是dubboProtocol的匿名类对象逻辑
public class DubboProtocol extends AbstractProtocol { private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException {if (!(message instanceof Invocation)) { throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); }Invocation inv = (Invocation) message; // 获取 Invoker 实例 Invoker invoker = getInvoker(channel, inv); // need to consider backward-compatibility if it's a callback // 参数回调 相关 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); // 通过 Invoker 调用具体的服务 Result result = invoker.invoke(inv); // 返回入参 return result.completionFuture().thenApply(Function.identity()); } }Invoker getInvoker(Channel channel, Invocation inv) throws RemotingException { boolean isCallBackServiceInvoke = false; boolean isStubServiceInvoke = false; int port = channel.getLocalAddress().getPort(); String path = inv.getAttachments().get(PATH_KEY); // if it's callback service on client side // 本地存根 isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(STUB_EVENT_KEY)); if (isStubServiceInvoke) { port = channel.getRemoteAddress().getPort(); }//callback // 是否是客户端并且不是 本地存根 isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; if (isCallBackServiceInvoke) { path += "." + inv.getAttachments().get(CALLBACK_SERVICE_KEY); inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString()); }// 计算 service key,格式为 groupName/serviceName:serviceVersion:port。比如: //dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880 String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY)); // 从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象, // 服务导出过程中会将映射关系存储到 exporterMap 集合中 DubboExporter exporter = (DubboExporter) exporterMap.get(serviceKey); // 获取 Invoker 对象,并返回 if (exporter == null) { throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); }return exporter.getInvoker(); }}
invoke方法逻辑在AbstractProxyInvoker中
public Result invoke(Invocation invocation) throws RpcException { try { // 调用 doInvoke 执行后续的调用,并将调用结果封装到 asyncRpcResult Object value = https://www.it610.com/article/doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments()); CompletableFuture future = wrapWithFuture(value, invocation); AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation); future.whenComplete((obj, t) -> { AppResponse result = new AppResponse(); if (t != null) { if (t instanceof CompletionException) { result.setException(t.getCause()); } else { result.setException(t); } } else { result.setValue(obj); } asyncRpcResult.complete(result); }); return asyncRpcResult; } catch (InvocationTargetException e) { if (RpcContext.getContext().isAsyncStarted() && !RpcContext.getContext().stopAsync()) { logger.error("Provider async started, but got an exception from the original method, cannot write the exception back to consumer because an async result may have returned the new thread.", e); } return AsyncRpcResult.newDefaultAsyncResult(null, e.getTargetException(), invocation); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } }
doInvoke 是一个抽象方法,这个需要由具体的 Invoker 实例实现。Invoker 实例是在运行时通过 JavassistProxyFactory 创建的,创建逻辑如下
public Invoker getInvoker(T proxy, Class type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' // 为目标类创建 Wrapper final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 创建匿名 Invoker 类对象,并实现 doInvoke 方法 return new AbstractProxyInvoker(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class[] parameterTypes, Object[] arguments) throws Throwable { // 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }

Wrapper 是一个抽象类,其中 invokeMethod 是一个抽象方法。Dubbo 会在运行时通过 Javassist 框架为 Wrapper 生成实现类,并实现 invokeMethod 方法,该方法最终会根据调用信息调用具体的服务。以 DemoServiceImpl 为例,Javassist 为其生成的代理类如下。
public class Wrapper1 extends Wrapper implements ClassGenerator.DC { public static String[] pns; public static Map pts; public static String[] mns; public static String[] dmns; public static Class[] mts0; public String[] getPropertyNames() { return pns; }public boolean hasProperty(String string) { return pts.containsKey(string); }public Class getPropertyType(String string) { return (Class)pts.get(string); }public String[] getMethodNames() { return mns; }public String[] getDeclaredMethodNames() { return dmns; }public void setPropertyValue(Object object, String string, Object object2) { try { DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString()); }public Object getPropertyValue(Object object, String string) { try { DemoServiceImpl demoServiceImpl = (DemoServiceImpl)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(string).append("\" field or setter method in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString()); }public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException { DemoServiceImpl demoServiceImpl; try { demoServiceImpl = (DemoServiceImpl)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } try { if ("sayHello".equals(string) && arrclass.length == 1) { return demoServiceImpl.sayHello((String)arrobject[0]); } } catch (Throwable throwable) { throw new InvocationTargetException(throwable); } throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString()); } }

2.2.3 服务提供方返回结果
执行完HeaderExchangeHandler#handleRequest中的handler.reply后,会调用channel.send将调用结果返回给服务消费者
HeaderExchangeHandler#handleRequest
-> HeaderExchangeChannel#send
-> NettyChannel#send
-> NioSocketChannel#writeAndFlush
2.2.4 服务消费方接收服务提供方返回
在服务引入过程中,client会在netty bootstrap中添加类型为NettyClientHandler的handler,当消费方收到服务提供方的返回后,就触发NettyClientHandler中的channelRead方法。
NettyClientHandler#channelRead
-> AbstractPeer#received
-> MultiMessageHandler#received
-> HeartbeatHandler#received
-> AllChannelHandler#received
-> ExecutorService#execute(Runnable)
-> DecodeHandler#received
-> HeaderExchangeHandler#received
感觉和服务提供者收到请求差不多。但是,从这里开始执行的是HeaderExchangeHandler#handleResponse。
static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }public static void received(Channel channel, Response response) { received(channel, response, false); }public static void received(Channel channel, Response response, boolean timeout) { try { // 在 HeaderExchangeChannel#request,中创建实例时添加到 FUTURES 中 DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { Timeout t = future.timeoutCheckTask; // 没有超时则取消超时任务 if (!timeout) { // decrease Time t.cancel(); } future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } }private void doReceived(Response res) { if (res == null) { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { this.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } }

接下来会调用FutureAdapter
public FutureAdapter(CompletableFuture future) { this.appResponseFuture = future; future.whenComplete((appResponse, t) -> { if (t != null) { if (t instanceof CompletionException) { t = t.getCause(); } this.completeExceptionally(t); } else { if (appResponse.hasException()) { this.completeExceptionally(appResponse.getException()); } else { #1 this.complete((V) appResponse.getValue()); } } }); }

从#1代码可以看出来,已经取出来了服务提供方返回的结果。
这里的this.complete就是层层调用complete,将结果不断的向上传递。
系列文章大致的介绍了一下dubbo的一些源码。但是依然有许多的内容没有介绍到,如:编码解码,返回结果的各种适配器,dubbo与spring的相关结合等等。文章中如有错误欢迎指出,一起讨论,互相成就。

    推荐阅读