Java架构|dubbo - 负载均衡调用过程

【Java架构|dubbo - 负载均衡调用过程】开篇
这篇文章的主要目的是讲清楚ClusterInvoker存在多个invoker对象进行负载均衡的调用过程,也就描述从调用到负载均衡选择的调用链路。
【加群】:857565362
selector 调用时序图
Java架构|dubbo - 负载均衡调用过程
文章图片

说明:

  • RegistryProtocol的doRefer()方法内部cluster.join()负责创建ClusterInvoker对象,所有的cluster的invoker的选择逻辑都在这个函数实现。
  • FailoverClusterInvoker继承自AbstractClusterInvoker,AbstractClusterInvoker抽象出了通用的所有类型ClusterInvoker需要使用的方法,核心的如select、doSelect、invoke等通用方法,将具体不同实现的doInvoke方法抽象给具体实现类实现。
  • AbstractClusterInvoker的invoke()方法作为调用的入口,内部功能主要是根据loadBalance策略选择invoker进行执行。
  • AbstractClusterInvoker的select() -> doSelect()方法内部根据不同loadBalance策略实现invoker的调用。
  • 通过选择具体的invoker对象后调用invoke()方法实现远程调用。
  • 【加群】:857565362
    dubbo cluster 调用过程源码
    FailoverCluster对象创建入口
    说明:
  • 核心在于cluster.join()方法会调用到如FailoverCluster类,建议通过debug打断点方式跟踪调用栈。
public class RegistryProtocol implements Protocol {private Invoker doRefer(Cluster cluster, Registry registry, Class type, URL url) { // 构建RegistryDirectory,可以把它理解为注册资源,其中包含了消费者/服务/路由等相关信息,其同时也是回调监听器 RegistryDirectory directory = new RegistryDirectory(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map, String> parameters = new HashMap, String>(directory.getUrl().getParameters()); // 构建subscribeUrl信息,主要拼接consumer:xxx的url地址 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters); if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { // 向注册中心注册服务消费者 registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); }// 从注册中心订阅服务提供者(即引用的服务) directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); // 从invoker当中选择其中一个返回 Invoker invoker = cluster.join(directory); // 注册消费者 ProviderConsumerRegTable.registerConsuemr(invoker, url, subscribeUrl, directory); return invoker; } }

FailoverClusterInvoker 负载均衡器举例
  • 按照以下流程进行进行invoker的选择:AbstractClusterInvoker.invoke() -> FailoverClusterInvoker.doInvoke() -> AbstractClusterInvoker. select() -> AbstractClusterInvoker.doselect() -> AbstractLoadBalance.select()。
  • invoke()和select()调用链的关键路径在于invoke由父类触发后在实现类FailoverClusterInvoker执行,然后调用的负载均衡因为是通用的所以在父类AbstractClusterInvoker中实现。
  • directory.list(invocation)负责选择从ReigstryDeirectory获取服务引用的invokers列表进行负载均衡选择。
  • invoker选择的过程中,在AbstractClusterInvoker.invoke()阶段会根据规则生成LoadBalance对象进行invoker的选择,在调用过程中生成LoadBalance的对象。
  • LoadBalance的选择是按照URL中配置的信息负载均衡器进行选择,如果没有配置就走默认的负载均衡器。
  • AbstractLoadBalance作为负载均衡器基类,提供了select()的执行流程,具体的doSelect动作由各个实现类去实现。
public class FailoverCluster implements Cluster {public final static String NAME = "failover"; public Invoker join(Directory directory) throws RpcException { return new FailoverClusterInvoker(directory); } }public class FailoverClusterInvoker extends AbstractClusterInvoker {private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); public FailoverClusterInvoker(Directory directory) { super(directory); }@SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException { List> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List> invoked = new ArrayList>(copyinvokers.size()); // invoked invokers. Set> providers = new HashSet>(len); for (int i = 0; i < len; i++) { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); // check again checkInvokers(copyinvokers, invocation); } Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); return result; } catch (RpcException e) {} catch (Throwable e) {} finally { providers.add(invoker.getUrl().getAddress()); } } } }public abstract class AbstractClusterInvoker implements Invoker {public Result invoke(final Invocation invocation) throws RpcException {checkWhetherDestroyed(); LoadBalance loadbalance; List> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }protected List> list(Invocation invocation) throws RpcException { List> invokers = directory.list(invocation); return invokers; }protected Invoker select(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException { if (invokers == null || invokers.size() == 0) return null; String methodName = invocation == null ? "" : invocation.getMethodName(); boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY); { //ignore overloaded method if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null; } //ignore cucurrent problem if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } } Invoker invoker = doselect(loadbalance, invocation, invokers, selected); if (sticky) { stickyInvoker = invoker; } return invoker; }private Invoker doselect(LoadBalance loadbalance, Invocation invocation, List> invokers, List> selected) throws RpcException { if (invokers == null || invokers.size() == 0) return null; if (invokers.size() == 1) return invokers.get(0); // If we only have two invokers, use round-robin instead. if (invokers.size() == 2 && selected != null && selected.size() > 0) { return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0); } Invoker invoker = loadbalance.select(invokers, getUrl(), invocation); //If the `invoker` is in the`selected` or invoker is unavailable && availablecheck is true, reselect. if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try { Invoker rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if (rinvoker != null) { invoker = rinvoker; } else { //Check the index of current selected invoker, if it's not the last one, choose the one at index+1. int index = invokers.indexOf(invoker); try { //Avoid collision invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invoker; } catch (Exception e) { logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e); } } } catch (Throwable t) { logger.error("clustor relselect fail reason is :" + t.getMessage() + " if can not slove ,you can set cluster.availablecheck=false in url", t); } } return invoker; } }

AbstractLoadBalance执行过程
  • AbstractLoadBalance的select()方法作为负载均衡选择器的入口位置,提供通用的getWeight()方法获取权重。
  • AbstractLoadBalance的doSelect()由具体的实现类实现。
public abstract class AbstractLoadBalance implements LoadBalance {static int calculateWarmupWeight(int uptime, int warmup, int weight) { int ww = (int) ((float) uptime / ((float) warmup / (float) weight)); return ww < 1 ? 1 : (ww > weight ? weight : ww); }public Invoker select(List> invokers, URL url, Invocation invocation) { if (invokers == null || invokers.size() == 0) return null; if (invokers.size() == 1) return invokers.get(0); return doSelect(invokers, url, invocation); }protected abstract Invoker doSelect(List> invokers, URL url, Invocation invocation); protected int getWeight(Invoker invoker, Invocation invocation) { int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); if (weight > 0) { long timestamp = invoker.getUrl().getParameter(Constants.REMOTE_TIMESTAMP_KEY, 0L); if (timestamp > 0L) { int uptime = (int) (System.currentTimeMillis() - timestamp); int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP); if (uptime > 0 && uptime < warmup) { weight = calculateWarmupWeight(uptime, warmup, weight); } } } return weight; } }

我这儿整理了比较全面的JAVA相关的面试资料,
需要领取面试资料的同学,请加群:473984645
Java架构|dubbo - 负载均衡调用过程
文章图片

    推荐阅读