【Java架构|dubbo - 负载均衡调用过程】开篇
这篇文章的主要目的是讲清楚ClusterInvoker存在多个invoker对象进行负载均衡的调用过程,也就描述从调用到负载均衡选择的调用链路。
【加群】:857565362
selector 调用时序图
文章图片
说明:
- RegistryProtocol的doRefer()方法内部cluster.join()负责创建ClusterInvoker对象,所有的cluster的invoker的选择逻辑都在这个函数实现。
- FailoverClusterInvoker继承自AbstractClusterInvoker,AbstractClusterInvoker抽象出了通用的所有类型ClusterInvoker需要使用的方法,核心的如select、doSelect、invoke等通用方法,将具体不同实现的doInvoke方法抽象给具体实现类实现。
- AbstractClusterInvoker的invoke()方法作为调用的入口,内部功能主要是根据loadBalance策略选择invoker进行执行。
- AbstractClusterInvoker的select() -> doSelect()方法内部根据不同loadBalance策略实现invoker的调用。
- 通过选择具体的invoker对象后调用invoke()方法实现远程调用。
- 【加群】:857565362
dubbo cluster 调用过程源码
FailoverCluster对象创建入口
说明: - 核心在于cluster.join()方法会调用到如FailoverCluster类,建议通过debug打断点方式跟踪调用栈。
public class RegistryProtocol implements Protocol {private Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) {
// 构建RegistryDirectory,可以把它理解为注册资源,其中包含了消费者/服务/路由等相关信息,其同时也是回调监听器
RegistryDirectory directory = new RegistryDirectory(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map, String> parameters = new HashMap, String>(directory.getUrl().getParameters());
// 构建subscribeUrl信息,主要拼接consumer:xxx的url地址
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
// 向注册中心注册服务消费者
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}// 从注册中心订阅服务提供者(即引用的服务)
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 从invoker当中选择其中一个返回
Invoker invoker = cluster.join(directory);
// 注册消费者
ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory);
return invoker;
}
}
FailoverClusterInvoker 负载均衡器举例
- 按照以下流程进行进行invoker的选择:AbstractClusterInvoker.invoke() -> FailoverClusterInvoker.doInvoke() -> AbstractClusterInvoker. select() -> AbstractClusterInvoker.doselect() -> AbstractLoadBalance.select()。
- invoke()和select()调用链的关键路径在于invoke由父类触发后在实现类FailoverClusterInvoker执行,然后调用的负载均衡因为是通用的所以在父类AbstractClusterInvoker中实现。
- directory.list(invocation)负责选择从ReigstryDeirectory获取服务引用的invokers列表进行负载均衡选择。
- invoker选择的过程中,在AbstractClusterInvoker.invoke()阶段会根据规则生成LoadBalance对象进行invoker的选择,在调用过程中生成LoadBalance的对象。
- LoadBalance的选择是按照URL中配置的信息负载均衡器进行选择,如果没有配置就走默认的负载均衡器。
- AbstractLoadBalance作为负载均衡器基类,提供了select()的执行流程,具体的doSelect动作由各个实现类去实现。
public class FailoverCluster implements Cluster {public final static String NAME = "failover";
public Invoker join(Directory directory) throws RpcException {
return new FailoverClusterInvoker(directory);
}
}public class FailoverClusterInvoker extends AbstractClusterInvoker {private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
public FailoverClusterInvoker(Directory directory) {
super(directory);
}@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException {
List> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null;
// last exception.
List> invoked = new ArrayList>(copyinvokers.size());
// invoked invokers.
Set> providers = new HashSet>(len);
for (int i = 0;
i < len;
i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
// check again
checkInvokers(copyinvokers, invocation);
}
Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
return result;
} catch (RpcException e) {} catch (Throwable e) {} finally {
providers.add(invoker.getUrl().getAddress());
}
}
}
}public abstract class AbstractClusterInvoker implements Invoker {public Result invoke(final Invocation invocation) throws RpcException {checkWhetherDestroyed();
LoadBalance loadbalance;
List> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}protected List> list(Invocation invocation) throws RpcException {
List> invokers = directory.list(invocation);
return invokers;
}protected Invoker select(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
String methodName = invocation == null ? "" : invocation.getMethodName();
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY);
{
//ignore overloaded method
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
//ignore cucurrent problem
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
}
Invoker invoker = doselect(loadbalance, invocation, invokers, selected);
if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}private Invoker doselect(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)
return invokers.get(0);
// If we only have two invokers, use round-robin instead.
if (invokers.size() == 2 && selected != null && selected.size() > 0) {
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
}
Invoker invoker = loadbalance.select(invokers, getUrl(), invocation);
//If the `invoker` is in the`selected` or invoker is unavailable && availablecheck is true, reselect.
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
Invoker rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rinvoker != null) {
invoker = rinvoker;
} else {
//Check the index of current selected invoker, if it's not the last one, choose the one at index+1.
int index = invokers.indexOf(invoker);
try {
//Avoid collision
invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker;
} catch (Exception e) {
logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e);
}
}
} catch (Throwable t) {
logger.error("clustor relselect fail reason is :" + t.getMessage() + " if can not slove ,you can set cluster.availablecheck=false in url", t);
}
}
return invoker;
}
}
AbstractLoadBalance执行过程
- AbstractLoadBalance的select()方法作为负载均衡选择器的入口位置,提供通用的getWeight()方法获取权重。
- AbstractLoadBalance的doSelect()由具体的实现类实现。
public abstract class AbstractLoadBalance implements LoadBalance {static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}public Invoker select(List> invokers, URL url, Invocation invocation) {
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)
return invokers.get(0);
return doSelect(invokers, url, invocation);
}protected abstract Invoker doSelect(List> invokers, URL url, Invocation invocation);
protected int getWeight(Invoker> invoker, Invocation invocation) {
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
int uptime = (int) (System.currentTimeMillis() - timestamp);
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
}
我这儿整理了比较全面的JAVA相关的面试资料,
需要领取面试资料的同学,请加群:473984645
文章图片
推荐阅读
- Java|Java基础——数组
- 人工智能|干货!人体姿态估计与运动预测
- java简介|Java是什么(Java能用来干什么?)
- Java|规范的打印日志
- Linux|109 个实用 shell 脚本
- 程序员|【高级Java架构师系统学习】毕业一年萌新的Java大厂面经,最新整理
- Spring注解驱动第十讲--@Autowired使用
- SqlServer|sql server的UPDLOCK、HOLDLOCK试验
- jvm|【JVM】JVM08(java内存模型解析[JMM])
- 技术|为参加2021年蓝桥杯Java软件开发大学B组细心整理常见基础知识、搜索和常用算法解析例题(持续更新...)