入手 继上一篇不成熟的源码分析经历之后,为了搞清楚Consumer
是如何与Provider
通信的,于是又一言不合翻看起了源码。好,进入正题,依旧从RegistryDirectory
这个核心类入手:
// 这里的入参urls是所有可用的provider的url
private Map> toInvokers(List urls) {
Map> newUrlInvokerMap = new HashMap>();
if(urls == null || urls.size() == 0){
return newUrlInvokerMap;
}
Set keys = new HashSet();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
//如果reference端配置了protocol,则只选择匹配的protocol
if (queryProtocols != null && queryProtocols.length() >0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
if (! ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()
+ ", supported protocol: "+ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
URL url = mergeUrl(providerUrl);
String key = url.toFullString();
// URL参数是排序的
if (keys.contains(key)) { // 重复URL
continue;
}
keys.add(key);
// 缓存key为没有合并消费端参数的URL,不管消费端如何合并参数,如果服务端URL发生变化,则重新refer
Map> localUrlInvokerMap = this.urlInvokerMap;
// local reference
Invoker invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // 缓存中没有,重新refer
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = ! url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
invoker = new InvokerDelegete(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t);
}
if (invoker != null) { // 将新的引用放入缓存
newUrlInvokerMap.put(key, invoker);
}
}else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
上面一段代码虽然很长,但是其实做的事情比较简单:就是把
Provider
的URL
封装成对应的Invoker
,我们只需要关注下面这一行:invoker = new InvokerDelegete(protocol.refer(serviceType, url), url, providerUrl);
这里的
protocol
的真正的实现是DubboProtocol
。不过其上还被两层包装类给包装了(这里涉及到dubbo
的扩展点自动包装),分别是ProtocolListenerWrapper
和ProtocolFilterWrapper
。其中ProtocolListenerWrapper
添加了监听器的功能,而ProtocolFilterWrapper
添加了过滤器功能。调用链Filter的实现
ProtocolListenerWrapper
比较简单,这边就不展开看源码了,ProtocolFilterWrapper
还是有必要看一下的,主要就看buildInvokerChain
方法:private static Invoker buildInvokerChain(final Invoker invoker, String key, String group) {
Invoker last = invoker;
List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1;
i >= 0;
i --) {
final Filter filter = filters.get(i);
final Invoker next = last;
last = new Invoker() {public Class getInterface() {
return invoker.getInterface();
}public URL getUrl() {
return invoker.getUrl();
}public boolean isAvailable() {
return invoker.isAvailable();
}public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}public void destroy() {
invoker.destroy();
}@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
这里会按照
Filter
列表的顺序生成一个FilterChain
,实际上是利用了Invoker
来实现链式调用。这里每个Filter
都会生成一个Invoker
,然后该Invoker
的invoke
方法会调用自身Filter
中的invoke
方法。而最后一个Filter
中的invoke
方法的Invoker
参数则是真正我们最后调用远程服务的Invoker
,也就是DubboInvoker
。DubboProtocol实现 通过两层包装类之后,就要调用到真正的
Protocol
实现类——DubboProtocol
了,我们看看其中的refer
方法干了些什么:public Invoker refer(Class serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker invoker = new DubboInvoker(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
可以看到返回的
invoker
是一个DubboInvoker
,并且在创建这个DubboInvoker
时,我们注意到第三个参数是一个ExchangeClient[]
类型的参数,从名字上很容易猜想出其中可能涉及了两端的通讯:private ExchangeClient[] getClients(URL url){
//是否共享连接
boolean service_share_connect = false;
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
//如果connections不配置,则共享连接,否则每服务每连接
if (connections == 0){
service_share_connect = true;
connections = 1;
}ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0;
i < clients.length;
i++) {
if (service_share_connect){
clients[i] = getSharedClient(url);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
默认情况下都是共享连接的,也就是
Consumer
和Provider
之间不管有多少个Service
,都只共享一条连接:private ExchangeClient getSharedClient(URL url){
String key = url.getAddress();
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if ( client != null ){
if ( !client.isClosed()){
client.incrementAndGetCount();
return client;
} else {
referenceClientMap.remove(key);
}
}
ExchangeClient exchagneclient = initClient(url);
client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
referenceClientMap.put(key, client);
ghostClientMap.remove(key);
return client;
}
下面来看看客户端连接的初始化过程:
private ExchangeClient initClient(URL url) {// client type setting.
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
boolean compatible = (version != null && version.startsWith("1.0."));
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// BIO存在严重性能问题,暂时不允许使用
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}ExchangeClient client ;
try {
//设置连接应该是lazy的
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){
client = new LazyConnectExchangeClient(url ,requestHandler);
} else {
client = Exchangers.connect(url ,requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url
+ "): " + e.getMessage(), e);
}
return client;
}
默认不是lazy的,也就是说,在初始化时
Consumer
就直接与Provider
建立了一条连接。请求调用细节 而建立连接之后,后续就是利用这条连接来进行通讯了,让我们来跟踪一下客户端最终是如何通过这条连接来发送调用请求的。这里需要提一句,关于
Dubbo
中的Invoker
,在客户端这一块,我个人认为应该要分成两类。一类的作用是将整个集群伪装成一个Invoker
,这类Invoker
的典型特征是都继承于AbstractClusterInvoker
。而另外一类则是真正可调用的,也就是类似DubboInvoker
这类,下面我们就来看看DubboInvoker
中的核心方法:protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
// 这里还有一个参数isSent,表明是否等待消息发出
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter
这里可以很清晰地看到,远程调用分三种情况:
1. 不需要返回值的调用(所谓oneWay)
2. 异步(async)
3. 同步
对于第一种情况,客户端只管发请求就完了,不考虑返回结果。
对于第二种情况,客户端除了发请求,还需要将结果塞到一个ThreadLocal变量中,以便于客户端get返回值
对于第三种情况,客户端除了发请求,还会同步等待返回结果
看了源代码之后,我们再来看看官方文档上对于同步/异步调用的描述:
文章图片
【dubbo|从Consumer分析Dubbo调用链】是不是很清晰?
推荐阅读
- Dubbo使用Hessian2序列化时针对Byte类型出现java.lang.ClassCastException
- Android源码|okhttp源码分析(一)——基本流程(超详细)
- Android|App启动(一)Application的创建
- Android|Flutter的绘制流程简述
- Android基础|后台服务中弹出激活设备管理器界面失败原因定位
- Android|Android 高手进阶,自定义圆形进度条
- 调试示例源码|Android 将MAP格式数据写入XML 将XMP文件读MAP数据格式中
- Android基础|Android AsyncTask 源码详细解析,掌握工作原理和细节