源码分析|Spring Cloud Ribbon 负载均衡策略

1、接口 IRule

public interface IRule{ //选择server public Server choose(Object key); //设置LoadBalancer public void setLoadBalancer(ILoadBalancer lb); //获取LoadBalancer public ILoadBalancer getLoadBalancer(); }

2、接口的抽象实现类 AbstractLoadBalancerRule
/**
 * Base class for rules: stores the load balancer reference and implements the
 * accessor pair declared by {@link IRule}. Server selection is left to subclasses.
 */
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {

    /** Load balancer handed in through {@link #setLoadBalancer(ILoadBalancer)}. */
    private ILoadBalancer loadBalancer;

    /**
     * @return the load balancer currently held, or {@code null} if none has been set
     */
    @Override
    public ILoadBalancer getLoadBalancer() {
        return loadBalancer;
    }

    /**
     * @param lb the load balancer to hold
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        loadBalancer = lb;
    }
}

上一篇中提到,IRule 是在 BaseLoadBalancer 中完成绑定的;而在这里,抽象类 AbstractLoadBalancerRule 保存了 ILoadBalancer 的引用,并实现了接口中的 setLoadBalancer / getLoadBalancer 方法。
3、轮询 RoundRobinRule
public class RoundRobinRule extends AbstractLoadBalancerRule { //下一个计数 private AtomicInteger nextServerCyclicCounter; private static final boolean AVAILABLE_ONLY_SERVERS = true; private static final boolean ALL_SERVERS = false; private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class); public RoundRobinRule() { nextServerCyclicCounter = new AtomicInteger(0); }public RoundRobinRule(ILoadBalancer lb) { this(); setLoadBalancer(lb); }public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { log.warn("no load balancer"); return null; }Server server = null; //尝试次数 int count = 0; while (server == null && count++ < 10) { //可用的servers List reachableServers = lb.getReachableServers(); //所有的servers List allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } //获取server,递增计数并取模 int nextServerIndex = incrementAndGetModulo(serverCount); server = allServers.get(nextServerIndex); if (server == null) { /* Transient. */ Thread.yield(); continue; }if (server.isAlive() && (server.isReadyToServe())) { return (server); }// Next. server = null; } //尝试10次还没有获取到server,则返回空 if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; } //递增计数并取模 private int incrementAndGetModulo(int modulo) { for (; ; ) { int current = nextServerCyclicCounter.get(); int next = (current + 1) % modulo; if (nextServerCyclicCounter.compareAndSet(current, next)) return next; } } @Override public Server choose(Object key) { return choose(getLoadBalancer(), key); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { } }

4、权重轮询WeightedResponseTimeRule 将server的应答时间的平均值或者百分比作为权重
public class WeightedResponseTimeRule extends RoundRobinRule {public static final IClientConfigKey WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY = new IClientConfigKey() { @Override public String key() { return "ServerWeightTaskTimerInterval"; }@Override public String toString() { return key(); }@Override public Class type() { return Integer.class; } }; //默认时间周期 30秒 public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000; //权重计算定时时间 默认 30秒 private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL; private static final Logger logger = LoggerFactory.getLogger(WeightedResponseTimeRule.class); //服务的权重值,后面的会叠加前面的 private volatile List accumulatedWeights = new ArrayList(); private final Random random = new Random(); protected Timer serverWeightTimer = null; protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false); String name = "unknown"; public WeightedResponseTimeRule() { super(); }public WeightedResponseTimeRule(ILoadBalancer lb) { super(lb); }@Override public void setLoadBalancer(ILoadBalancer lb) { super.setLoadBalancer(lb); if (lb instanceof BaseLoadBalancer) { name = ((BaseLoadBalancer) lb).getName(); } initialize(lb); }void initialize(ILoadBalancer lb) { if (serverWeightTimer != null) { serverWeightTimer.cancel(); } serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-" + name, true); serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval); // do a initial run ServerWeight sw = new ServerWeight(); sw.maintainWeights(); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { logger .info("Stopping NFLoadBalancer-serverWeightTimer-" + name); serverWeightTimer.cancel(); } })); }public void shutdown() { if (serverWeightTimer != null) { logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name); serverWeightTimer.cancel(); } }List getAccumulatedWeights() { return Collections.unmodifiableList(accumulatedWeights); } //选择server 
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "https://www.it610.com/article/RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE") @Override public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } Server server = null; while (server == null) { //获取权重值 List currentWeights = accumulatedWeights; if (Thread.interrupted()) { return null; } //获取所有server List allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; }int serverIndex = 0; //权重值集合的最后一个,代表权重值得和 double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); // 没有server或者权重值没有初始化,退化到 轮询 rule if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) { server =super.choose(getLoadBalancer(), key); if(server == null) { return server; } } else { // 创建一个随机值,0~maxTotalWeight double randomWeight = random.nextDouble() * maxTotalWeight; // pick the server index based on the randomIndex int n = 0; //选择随机数小于等于权重值的server for (Double d : currentWeights) { if (d >= randomWeight) { serverIndex = n; break; } else { n++; } }server = allList.get(serverIndex); }if (server == null) { /* Transient. */ Thread.yield(); continue; }if (server.isAlive()) { return (server); }// Next. 
server = null; } return server; } //权重值更新定时器 class DynamicServerWeightTask extends TimerTask { public void run() { ServerWeight serverWeight = new ServerWeight(); try { serverWeight.maintainWeights(); } catch (Exception e) { logger.error("Error running DynamicServerWeightTask for {}", name, e); } } } //权重值更新逻辑 class ServerWeight {public void maintainWeights() { //获取负载均衡器 ILoadBalancer lb = getLoadBalancer(); if (lb == null) { return; }if (!serverWeightAssignmentInProgress.compareAndSet(false,true)){ return; }try { logger.info("Weight adjusting job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb; //负载均衡器的状态 LoadBalancerStats stats = nlb.getLoadBalancerStats(); if (stats == null) { // no statistics, nothing to do return; } //计算所有server的平均相应时间的综合 double totalResponseTime = 0; for (Server server : nlb.getAllServers()) { // 获取服务状态 ServerStats ss = stats.getSingleServerStat(server); totalResponseTime += ss.getResponseTimeAvg(); } // 计算每个server的权重 //weightSoFar + totalResponseTime - server的平均响应时间 Double weightSoFar = 0.0; List finalWeights = new ArrayList(); for (Server server : nlb.getAllServers()) { ServerStats ss = stats.getSingleServerStat(server); double weight = totalResponseTime - ss.getResponseTimeAvg(); weightSoFar += weight; finalWeights.add(weightSoFar); } setWeights(finalWeights); } catch (Exception e) { logger.error("Error calculating server weights", e); } finally { serverWeightAssignmentInProgress.set(false); }} }void setWeights(List weights) { this.accumulatedWeights = weights; }@Override public void initWithNiwsConfig(IClientConfig clientConfig) { super.initWithNiwsConfig(clientConfig); serverWeightTaskTimerInterval = clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, DEFAULT_TIMER_INTERVAL); }}

5、ClientConfigEnabledRoundRobinRule
/**
 * Rule that delegates every selection to an internal {@link RoundRobinRule}.
 * Subclasses override {@code choose} and can fall back to this behavior.
 */
public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {

    /** Delegate that performs the actual round-robin selection. */
    RoundRobinRule roundRobinRule = new RoundRobinRule();

    /**
     * Delegates to the internal round-robin rule.
     *
     * @throws IllegalArgumentException if the delegate is {@code null}
     */
    @Override
    public Server choose(Object key) {
        if (roundRobinRule == null) {
            throw new IllegalArgumentException(
                    "This class has not been initialized with the RoundRobinRule class");
        }
        return roundRobinRule.choose(key);
    }

    /** Stores the load balancer on this rule and on the delegate. */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        roundRobinRule.setLoadBalancer(lb);
    }

    /**
     * Replaces the delegate with a fresh {@link RoundRobinRule}; the configuration
     * argument is not read, and the new delegate is not given a load balancer here.
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        roundRobinRule = new RoundRobinRule();
    }
}

这个策略什么也没有做,直接依赖了一个RoundRobinRule,但是它是这些策略的基础类,提供了一个兜底能力。
直接或间接继承它的策略有:BestAvailableRule、PredicateBasedRule、ZoneAvoidanceRule、AvailabilityFilteringRule。
6、BestAvailableRule 最小连接数
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {private LoadBalancerStats loadBalancerStats; @Override public Server choose(Object key) { if (loadBalancerStats == null) { return super.choose(key); } List serverList = getLoadBalancer().getAllServers(); int minimalConcurrentConnections = Integer.MAX_VALUE; long currentTime = System.currentTimeMillis(); Server chosen = null; //遍历所有server for (Server server: serverList) { //获取server的状态 ServerStats serverStats = loadBalancerStats.getSingleServerStat(server); //如果没有熔断 if (!serverStats.isCircuitBreakerTripped(currentTime)) { //当前server连接数 int concurrentConnections = serverStats.getActiveRequestsCount(currentTime); //判断当前可用server的连接数是否小于最小连接数 //将最小连接数赋值当前连接数,循环后得到最小连接的server if (concurrentConnections < minimalConcurrentConnections) { minimalConcurrentConnections = concurrentConnections; chosen = server; } } } //如果没有选择出server,则直接调用父类的方法选择server if (chosen == null) { return super.choose(key); } else { return chosen; } }@Override public void setLoadBalancer(ILoadBalancer lb) { super.setLoadBalancer(lb); if (lb instanceof AbstractLoadBalancer) { loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats(); } } }

7、PredicateBasedRule 断言基础策略
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule { //获取断言 public abstract AbstractServerPredicate getPredicate(); //选择server,在过滤后进行轮询 @Override public Server choose(Object key) { ILoadBalancer lb = getLoadBalancer(); Optional server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get(); } else { return null; } } }

8、AvailabilityFilteringRule 可用性过滤
public class AvailabilityFilteringRule extends PredicateBasedRule {private AbstractServerPredicate predicate; public AvailabilityFilteringRule() { super(); predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null)) .addFallbackPredicate(AbstractServerPredicate.alwaysTrue()) .build(); }@Override public void initWithNiwsConfig(IClientConfig clientConfig) { predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig)) .addFallbackPredicate(AbstractServerPredicate.alwaysTrue()) .build(); }//获得可用服务的数量 @Monitor(name="AvailableServersCount", type = DataSourceType.GAUGE) public int getAvailableServersCount() { ILoadBalancer lb = getLoadBalancer(); List servers = lb.getAllServers(); if (servers == null) { return 0; } return Collections2.filter(servers, predicate.getServerOnlyPredicate()).size(); } //用轮询策略选择server,再判断是否可用,最大尝试十次 @Override public Server choose(Object key) { int count = 0; Server server = roundRobinRule.choose(key); while (count++ <= 10) { //predicate.apply 是否不用跳过 if (predicate.apply(new PredicateKey(server))) { return server; } server = roundRobinRule.choose(key); } return super.choose(key); }@Override public AbstractServerPredicate getPredicate() { return predicate; } }

9、ZoneAvoidanceRule
public class ZoneAvoidanceRule extends PredicateBasedRule {private static final Random random = new Random(); private CompositePredicate compositePredicate; public ZoneAvoidanceRule() { super(); ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this); AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this); compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate); }private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) { return CompositePredicate.withPredicates(p1, p2) .addFallbackPredicate(p2) .addFallbackPredicate(AbstractServerPredicate.alwaysTrue()) .build(); }@Override public void initWithNiwsConfig(IClientConfig clientConfig) { ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig); AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig); compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate); }static Map createSnapshot(LoadBalancerStats lbStats) { Map map = new HashMap(); for (String zone : lbStats.getAvailableZones()) { ZoneSnapshot snapshot = lbStats.getZoneSnapshot(zone); map.put(zone, snapshot); } return map; }static String randomChooseZone(Map snapshot, Set chooseFrom) { if (chooseFrom == null || chooseFrom.size() == 0) { return null; } String selectedZone = chooseFrom.iterator().next(); if (chooseFrom.size() == 1) { return selectedZone; } int totalServerCount = 0; for (String zone : chooseFrom) { totalServerCount += snapshot.get(zone).getInstanceCount(); } int index = random.nextInt(totalServerCount) + 1; int sum = 0; for (String zone : chooseFrom) { sum += snapshot.get(zone).getInstanceCount(); if (index <= sum) { selectedZone = zone; break; } } return selectedZone; } //获取可用区域 public static Set getAvailableZones( Map snapshot, double triggeringLoad, double triggeringBlackoutPercentage) { if (snapshot.isEmpty()) { return null; } //获取可用区域名称 Set 
availableZones = new HashSet(snapshot.keySet()); if (availableZones.size() == 1) { return availableZones; } Set worstZones = new HashSet(); double maxLoadPerServer = 0; boolean limitedZoneAvailability = false; //遍历区域快照 for (Map.Entry zoneEntry : snapshot.entrySet()) { //区域名称 String zone = zoneEntry.getKey(); //区域快照 ZoneSnapshot zoneSnapshot = zoneEntry.getValue(); //区域实例 int instanceCount = zoneSnapshot.getInstanceCount(); //该区域没有服务,移除 if (instanceCount == 0) { availableZones.remove(zone); limitedZoneAvailability = true; } else { double loadPerServer = zoneSnapshot.getLoadPerServer(); //该区域服务的负载小于0,或者服务的故障率大于等于阈值,移除 if (((double) zoneSnapshot.getCircuitTrippedCount()) / instanceCount >= triggeringBlackoutPercentage || loadPerServer < 0) { availableZones.remove(zone); limitedZoneAvailability = true; } else { //选出平均负载最差的区域 if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) { worstZones.add(zone); } else if (loadPerServer > maxLoadPerServer) { maxLoadPerServer = loadPerServer; worstZones.clear(); worstZones.add(zone); } } } } //最大负载小于阈值并且没有被限制的区域 if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) { // zone override is not needed here return availableZones; } //从最差区域中轮询一个,然后从可用区域中移除 String zoneToAvoid = randomChooseZone(snapshot, worstZones); if (zoneToAvoid != null) { availableZones.remove(zoneToAvoid); } return availableZones; } //获取可用的区域 public static Set getAvailableZones(LoadBalancerStats lbStats, double triggeringLoad, double triggeringBlackoutPercentage) { if (lbStats == null) { return null; } //生成区域快照 Map snapshot = createSnapshot(lbStats); return getAvailableZones(snapshot, triggeringLoad, triggeringBlackoutPercentage); }@Override public AbstractServerPredicate getPredicate() { return compositePredicate; } }

    推荐阅读