1、接口 IRule
/**
 * Strategy contract for choosing a server on behalf of a load balancer.
 */
public interface IRule {

    /**
     * Selects a server.
     *
     * @param key opaque selection key supplied by the caller
     * @return the selected server
     */
    Server choose(Object key);

    /**
     * Binds the load balancer this rule works against.
     *
     * @param lb the load balancer to bind
     */
    void setLoadBalancer(ILoadBalancer lb);

    /**
     * Returns the load balancer bound to this rule.
     *
     * @return the bound load balancer
     */
    ILoadBalancer getLoadBalancer();
}
2、接口的抽象实现类 AbstractLoadBalancerRule
/**
 * Skeleton {@link IRule} that only stores the load balancer reference;
 * server selection is left to subclasses.
 */
public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {

    /** Load balancer handed in through {@link #setLoadBalancer}; null until then. */
    private ILoadBalancer loadBalancer;

    /** Returns the load balancer most recently passed to {@link #setLoadBalancer}. */
    @Override
    public ILoadBalancer getLoadBalancer() {
        return this.loadBalancer;
    }

    /** Stores the given load balancer, replacing any previous one. */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        this.loadBalancer = lb;
    }
}
上一篇中,IRule 是在 BaseLoadBalancer 中与负载均衡器绑定的;而在这里,抽象类通过 IRule 接口的 setLoadBalancer 方法保存 ILoadBalancer 的引用。
3、轮询 RoundRobinRule
/**
 * Rule that walks the load balancer's full server list cyclically and
 * returns the first server that is alive and ready to serve.
 */
public class RoundRobinRule extends AbstractLoadBalancerRule {

    // Last index handed out; advanced atomically and wrapped by the list size.
    private AtomicInteger nextServerCyclicCounter;
    private static final boolean AVAILABLE_ONLY_SERVERS = true;
    private static final boolean ALL_SERVERS = false;

    private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);

    /** Creates a rule with its counter at zero and no load balancer bound. */
    public RoundRobinRule() {
        nextServerCyclicCounter = new AtomicInteger(0);
    }

    /**
     * Creates a rule bound to the given load balancer.
     *
     * @param lb the load balancer to select servers from
     */
    public RoundRobinRule(ILoadBalancer lb) {
        this();
        setLoadBalancer(lb);
    }

    /**
     * Picks the next server in cyclic order, making up to 10 attempts.
     *
     * @param lb  the load balancer supplying the server lists
     * @param key selection key; not used by this rule
     * @return an alive, ready-to-serve server, or {@code null} when {@code lb}
     *         is null, either server list is empty, or all attempts failed
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        // Number of attempts made so far.
        int count = 0;
        while (server == null && count++ < 10) {
            // Servers currently considered reachable.
            List<Server> reachableServers = lb.getReachableServers();
            // Every known server, reachable or not.
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            // The index is taken over the full list, not the reachable one,
            // so the liveness check below is what filters out dead servers.
            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        // Every successful pick returns from inside the loop, so reaching this
        // point means the attempts were exhausted.
        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }

    /**
     * Atomically advances the counter to {@code (current + 1) % modulo} and
     * returns the new value, retrying the compare-and-set until it succeeds.
     *
     * @param modulo size of the list being indexed; must be non-zero
     * @return the new counter value, in {@code [0, modulo)}
     */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    /** Chooses from the load balancer bound to this rule. */
    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    /** No configuration is read by this rule. */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
4、权重轮询WeightedResponseTimeRule 将server的应答时间的平均值或者百分比作为权重
/**
 * Rule that picks servers at random, weighted by average response time.
 *
 * <p>A timer periodically recomputes one cumulative weight per server, where
 * each server's own weight is the sum of all average response times minus its
 * own average. Until usable weights exist, selection falls back to the
 * round-robin behaviour of the superclass.
 */
public class WeightedResponseTimeRule extends RoundRobinRule {

    /** Config key for the weight recalculation period, in milliseconds. */
    public static final IClientConfigKey<Integer> WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY =
            new IClientConfigKey<Integer>() {
                @Override
                public String key() {
                    return "ServerWeightTaskTimerInterval";
                }

                @Override
                public String toString() {
                    return key();
                }

                @Override
                public Class<Integer> type() {
                    return Integer.class;
                }
            };

    /** Default recalculation period: 30 seconds. */
    public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;

    // Period of the weight recalculation timer; defaults to 30 seconds.
    private int serverWeightTaskTimerInterval = DEFAULT_TIMER_INTERVAL;

    private static final Logger logger = LoggerFactory.getLogger(WeightedResponseTimeRule.class);

    // Cumulative weights: entry i is the sum of the weights of servers 0..i, so
    // the last entry is the total. The list is replaced wholesale, never
    // mutated in place.
    private volatile List<Double> accumulatedWeights = new ArrayList<Double>();

    private final Random random = new Random();

    protected Timer serverWeightTimer = null;

    // Guards against two weight recalculations running at the same time.
    protected AtomicBoolean serverWeightAssignmentInProgress = new AtomicBoolean(false);

    String name = "unknown";

    /** Creates a rule with no load balancer bound; no timer is started. */
    public WeightedResponseTimeRule() {
        super();
    }

    /**
     * Creates a rule bound to the given load balancer.
     *
     * @param lb the load balancer to select servers from
     */
    public WeightedResponseTimeRule(ILoadBalancer lb) {
        super(lb);
    }

    /**
     * Binds the load balancer, takes its name when it is a
     * {@code BaseLoadBalancer}, and (re)starts the weight timer.
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof BaseLoadBalancer) {
            name = ((BaseLoadBalancer) lb).getName();
        }
        initialize(lb);
    }

    /**
     * Cancels any existing timer, starts a new daemon timer that recomputes
     * weights every {@code serverWeightTaskTimerInterval} milliseconds, runs
     * one computation on the calling thread, and registers a JVM shutdown hook
     * that cancels the timer. A new hook is registered on every call.
     *
     * @param lb not read here; weights are computed from {@link #getLoadBalancer()}
     */
    void initialize(ILoadBalancer lb) {
        if (serverWeightTimer != null) {
            serverWeightTimer.cancel();
        }
        serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"
                + name, true);
        serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,
                serverWeightTaskTimerInterval);
        // do a initial run
        ServerWeight sw = new ServerWeight();
        sw.maintainWeights();

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                logger
                        .info("Stopping NFLoadBalancer-serverWeightTimer-"
                                + name);
                serverWeightTimer.cancel();
            }
        }));
    }

    /** Cancels the weight timer if one has been started. */
    public void shutdown() {
        if (serverWeightTimer != null) {
            logger.info("Stopping NFLoadBalancer-serverWeightTimer-" + name);
            serverWeightTimer.cancel();
        }
    }

    /** Returns a read-only view of the current cumulative weights. */
    List<Double> getAccumulatedWeights() {
        return Collections.unmodifiableList(accumulatedWeights);
    }

    /**
     * Chooses a server by weighted random selection.
     *
     * <p>Falls back to the superclass round-robin choice when the total weight
     * is below 0.001 or the weight list and server list differ in size. The
     * loop repeats until an alive server is picked; it does not cap the number
     * of attempts.
     *
     * @param lb  the load balancer supplying the server list
     * @param key selection key, passed through to the round-robin fallback
     * @return an alive server, or {@code null} when {@code lb} is null, the
     *         thread was interrupted, there are no servers, or the fallback
     *         found none
     */
    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
    @Override
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            // Take one snapshot of the weights so the whole iteration sees a
            // consistent list even if the timer swaps it meanwhile.
            List<Double> currentWeights = accumulatedWeights;
            if (Thread.interrupted()) {
                return null;
            }
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();

            if (serverCount == 0) {
                return null;
            }

            int serverIndex = 0;

            // The last cumulative entry is the sum of all weights.
            double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);
            // Weights not initialised, or out of step with the server list:
            // fall back to round robin. The fallback uses the bound load
            // balancer rather than the lb argument.
            if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
                server = super.choose(getLoadBalancer(), key);
                if (server == null) {
                    return server;
                }
            } else {
                // Random value in [0, maxTotalWeight).
                double randomWeight = random.nextDouble() * maxTotalWeight;
                // Pick the first server whose cumulative weight reaches it.
                int n = 0;
                for (Double d : currentWeights) {
                    if (d >= randomWeight) {
                        serverIndex = n;
                        break;
                    } else {
                        n++;
                    }
                }

                server = allList.get(serverIndex);
            }

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Next.
            server = null;
        }
        return server;
    }

    /** Timer task that recomputes the weights and logs any failure. */
    class DynamicServerWeightTask extends TimerTask {
        public void run() {
            ServerWeight serverWeight = new ServerWeight();
            try {
                serverWeight.maintainWeights();
            } catch (Exception e) {
                logger.error("Error running DynamicServerWeightTask for {}", name, e);
            }
        }
    }

    /** Holds the weight recalculation logic. */
    class ServerWeight {

        /**
         * Recomputes the cumulative weights from the load balancer's
         * statistics. Returns without effect when no load balancer is bound,
         * another recalculation is in progress, or no statistics exist.
         * Exceptions are logged rather than propagated.
         */
        public void maintainWeights() {
            ILoadBalancer lb = getLoadBalancer();
            if (lb == null) {
                return;
            }

            if (!serverWeightAssignmentInProgress.compareAndSet(false, true)) {
                return;
            }

            try {
                logger.info("Weight adjusting job started");
                AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                LoadBalancerStats stats = nlb.getLoadBalancerStats();
                if (stats == null) {
                    // no statistics, nothing to do
                    return;
                }
                // Sum of the average response times of all servers.
                double totalResponseTime = 0;
                for (Server server : nlb.getAllServers()) {
                    ServerStats ss = stats.getSingleServerStat(server);
                    totalResponseTime += ss.getResponseTimeAvg();
                }
                // Each server's weight is the total minus its own average, so
                // faster servers get larger weights. Entries are stored as a
                // running sum.
                double weightSoFar = 0.0;
                List<Double> finalWeights = new ArrayList<Double>();
                for (Server server : nlb.getAllServers()) {
                    ServerStats ss = stats.getSingleServerStat(server);
                    double weight = totalResponseTime - ss.getResponseTimeAvg();
                    weightSoFar += weight;
                    finalWeights.add(weightSoFar);
                }
                setWeights(finalWeights);
            } catch (Exception e) {
                logger.error("Error calculating server weights", e);
            } finally {
                serverWeightAssignmentInProgress.set(false);
            }
        }
    }

    /** Replaces the cumulative weight list. */
    void setWeights(List<Double> weights) {
        this.accumulatedWeights = weights;
    }

    /**
     * Reads the recalculation period from the client configuration, using
     * {@link #DEFAULT_TIMER_INTERVAL} as the default passed to the lookup.
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        super.initWithNiwsConfig(clientConfig);
        serverWeightTaskTimerInterval = clientConfig.get(WEIGHT_TASK_TIMER_INTERVAL_CONFIG_KEY, DEFAULT_TIMER_INTERVAL);
    }
}
5、ClientConfigEnabledRoundRobinRule
/**
 * Rule that holds a {@link RoundRobinRule} and delegates selection to it.
 * The rules that extend this class fall back on it through
 * {@code super.choose(key)}.
 */
public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule {

    /** Delegate that performs the actual round-robin selection. */
    RoundRobinRule roundRobinRule = new RoundRobinRule();

    /** Replaces the delegate with a fresh, unbound {@link RoundRobinRule}. */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        this.roundRobinRule = new RoundRobinRule();
    }

    /** Binds the load balancer on both this rule and the delegate. */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        this.roundRobinRule.setLoadBalancer(lb);
    }

    /**
     * Delegates the choice to the round-robin rule.
     *
     * @throws IllegalArgumentException when the delegate is null
     */
    @Override
    public Server choose(Object key) {
        if (roundRobinRule == null) {
            throw new IllegalArgumentException(
                    "This class has not been initialized with the RoundRobinRule class");
        }
        return roundRobinRule.choose(key);
    }
}
这个策略本身没有额外的选择逻辑,直接委托给内部持有的 RoundRobinRule;但它是下面几个策略的基础类,为它们提供了一个兜底能力。
继承它的策略有:BestAvailableRule、PredicateBasedRule,以及 PredicateBasedRule 的子类 ZoneAvoidanceRule 和 AvailabilityFilteringRule。
6、BestAvailableRule 最小连接数
/**
 * Rule that picks the server with the fewest active requests among those
 * whose circuit breaker is not tripped.
 */
public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {

    // Taken from the load balancer in setLoadBalancer; stays null when the
    // load balancer is not an AbstractLoadBalancer.
    private LoadBalancerStats loadBalancerStats;

    /**
     * Chooses the least-loaded server. On a tie the earliest server in the
     * list wins, because only a strictly smaller count replaces the candidate.
     *
     * @param key selection key, passed through to the superclass fallback
     * @return the chosen server, or the superclass choice when no statistics
     *         are available or every server's circuit breaker is tripped
     */
    @Override
    public Server choose(Object key) {
        if (loadBalancerStats == null) {
            return super.choose(key);
        }
        List<Server> serverList = getLoadBalancer().getAllServers();
        int minimalConcurrentConnections = Integer.MAX_VALUE;
        long currentTime = System.currentTimeMillis();
        Server chosen = null;
        for (Server server : serverList) {
            ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
            // Skip servers whose circuit breaker is currently tripped.
            if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                // Keep the server with the smallest count seen so far.
                if (concurrentConnections < minimalConcurrentConnections) {
                    minimalConcurrentConnections = concurrentConnections;
                    chosen = server;
                }
            }
        }
        // Nothing eligible: fall back to the superclass selection.
        if (chosen == null) {
            return super.choose(key);
        } else {
            return chosen;
        }
    }

    /**
     * Binds the load balancer and, when it is an {@code AbstractLoadBalancer},
     * keeps a reference to its statistics.
     */
    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
        super.setLoadBalancer(lb);
        if (lb instanceof AbstractLoadBalancer) {
            loadBalancerStats = ((AbstractLoadBalancer) lb).getLoadBalancerStats();
        }
    }
}
7、PredicateBasedRule 断言基础策略
/**
 * Base class for rules that filter the server list through a predicate and
 * then pick round-robin among the servers that remain.
 */
public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {

    /**
     * Supplies the predicate used to filter servers.
     *
     * @return the predicate applied by {@link #choose(Object)}
     */
    public abstract AbstractServerPredicate getPredicate();

    /**
     * Filters all servers of the bound load balancer with the predicate and
     * chooses round-robin among the result.
     *
     * @param key selection key, passed to the predicate
     * @return the chosen server, or {@code null} when the predicate yields none
     */
    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }
    }
}
8、AvailabilityFilteringRule 可用性过滤
/**
 * Rule that takes round-robin candidates and accepts the first one that
 * passes an availability predicate.
 */
public class AvailabilityFilteringRule extends PredicateBasedRule {

    // Availability predicate with an always-true fallback predicate.
    private AbstractServerPredicate predicate;

    /** Builds the predicate without a client configuration. */
    public AvailabilityFilteringRule() {
        super();
        predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }

    /** Rebuilds the predicate using the given client configuration. */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, clientConfig))
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }

    /**
     * Counts the servers that pass the server-only form of the predicate.
     *
     * @return the number of matching servers, or 0 when the server list is null
     */
    @Monitor(name = "AvailableServersCount", type = DataSourceType.GAUGE)
    public int getAvailableServersCount() {
        ILoadBalancer lb = getLoadBalancer();
        List<Server> servers = lb.getAllServers();
        if (servers == null) {
            return 0;
        }
        return Collections2.filter(servers, predicate.getServerOnlyPredicate()).size();
    }

    /**
     * Draws candidates from the round-robin delegate and returns the first
     * one accepted by the predicate.
     *
     * <p>The loop condition {@code count++ <= 10} allows up to 11 predicate
     * checks. When none is accepted, the result of {@code super.choose(key)}
     * is returned.
     *
     * @param key selection key, passed to the round-robin delegate
     * @return the first accepted candidate, or the superclass choice
     */
    @Override
    public Server choose(Object key) {
        int count = 0;
        Server server = roundRobinRule.choose(key);
        while (count++ <= 10) {
            // A true result means the candidate does not need to be skipped.
            if (predicate.apply(new PredicateKey(server))) {
                return server;
            }
            server = roundRobinRule.choose(key);
        }
        return super.choose(key);
    }

    /** Returns the composite availability predicate. */
    @Override
    public AbstractServerPredicate getPredicate() {
        return predicate;
    }
}
9、ZoneAvoidanceRule
/**
 * Predicate-based rule whose composite predicate combines zone avoidance
 * with server availability, plus static helpers that decide which zones
 * remain usable.
 */
public class ZoneAvoidanceRule extends PredicateBasedRule {

    private static final Random random = new Random();

    // Zone + availability predicates, falling back first to availability
    // alone and then to always-true.
    private CompositePredicate compositePredicate;

    /** Builds the composite predicate without a client configuration. */
    public ZoneAvoidanceRule() {
        super();
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }

    /**
     * Combines both predicates, with {@code p2} alone and then always-true as
     * successive fallback predicates.
     */
    private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
        return CompositePredicate.withPredicates(p1, p2)
                .addFallbackPredicate(p2)
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }

    /** Rebuilds the composite predicate using the given client configuration. */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
    }

    /**
     * Builds a zone-name to snapshot map for every zone the statistics report
     * as available.
     */
    static Map<String, ZoneSnapshot> createSnapshot(LoadBalancerStats lbStats) {
        Map<String, ZoneSnapshot> map = new HashMap<String, ZoneSnapshot>();
        for (String zone : lbStats.getAvailableZones()) {
            ZoneSnapshot snapshot = lbStats.getZoneSnapshot(zone);
            map.put(zone, snapshot);
        }
        return map;
    }

    /**
     * Picks one zone at random, with probability proportional to its
     * instance count.
     *
     * @param snapshot   zone snapshots; must contain every zone in {@code chooseFrom}
     * @param chooseFrom candidate zone names
     * @return {@code null} when there are no candidates, the only candidate
     *         when there is exactly one, otherwise the randomly selected zone
     */
    static String randomChooseZone(Map<String, ZoneSnapshot> snapshot,
            Set<String> chooseFrom) {
        if (chooseFrom == null || chooseFrom.size() == 0) {
            return null;
        }
        String selectedZone = chooseFrom.iterator().next();
        if (chooseFrom.size() == 1) {
            return selectedZone;
        }
        int totalServerCount = 0;
        for (String zone : chooseFrom) {
            totalServerCount += snapshot.get(zone).getInstanceCount();
        }
        // index falls in [1, totalServerCount]; the zone whose running
        // instance total first reaches it is selected.
        int index = random.nextInt(totalServerCount) + 1;
        int sum = 0;
        for (String zone : chooseFrom) {
            sum += snapshot.get(zone).getInstanceCount();
            if (index <= sum) {
                selectedZone = zone;
                break;
            }
        }
        return selectedZone;
    }

    /**
     * Computes the zones that remain usable.
     *
     * <p>A zone is removed when it has no instances, when its ratio of
     * circuit-tripped instances reaches {@code triggeringBlackoutPercentage},
     * or when its load per server is negative. If no zone was removed and the
     * highest load per server is below {@code triggeringLoad}, the set is
     * returned as is. Otherwise one of the most loaded zones, chosen at
     * random, is removed as well.
     *
     * @param snapshot                     zone snapshots keyed by zone name
     * @param triggeringLoad               load-per-server threshold
     * @param triggeringBlackoutPercentage tripped-instance ratio threshold
     * @return the usable zone names, or {@code null} when the snapshot is empty
     */
    public static Set<String> getAvailableZones(
            Map<String, ZoneSnapshot> snapshot, double triggeringLoad,
            double triggeringBlackoutPercentage) {
        if (snapshot.isEmpty()) {
            return null;
        }
        Set<String> availableZones = new HashSet<String>(snapshot.keySet());
        // A single zone is returned without any further checks.
        if (availableZones.size() == 1) {
            return availableZones;
        }
        Set<String> worstZones = new HashSet<String>();
        double maxLoadPerServer = 0;
        boolean limitedZoneAvailability = false;

        for (Map.Entry<String, ZoneSnapshot> zoneEntry : snapshot.entrySet()) {
            String zone = zoneEntry.getKey();
            ZoneSnapshot zoneSnapshot = zoneEntry.getValue();
            int instanceCount = zoneSnapshot.getInstanceCount();
            if (instanceCount == 0) {
                // No instances in this zone.
                availableZones.remove(zone);
                limitedZoneAvailability = true;
            } else {
                double loadPerServer = zoneSnapshot.getLoadPerServer();
                // Too many tripped instances, or a negative load.
                if (((double) zoneSnapshot.getCircuitTrippedCount())
                        / instanceCount >= triggeringBlackoutPercentage
                        || loadPerServer < 0) {
                    availableZones.remove(zone);
                    limitedZoneAvailability = true;
                } else {
                    // Track the zones with the highest load per server; loads
                    // within 0.000001 of the maximum count as equal.
                    if (Math.abs(loadPerServer - maxLoadPerServer) < 0.000001d) {
                        worstZones.add(zone);
                    } else if (loadPerServer > maxLoadPerServer) {
                        maxLoadPerServer = loadPerServer;
                        worstZones.clear();
                        worstZones.add(zone);
                    }
                }
            }
        }

        if (maxLoadPerServer < triggeringLoad && !limitedZoneAvailability) {
            // zone override is not needed here
            return availableZones;
        }
        // Drop one of the worst zones, chosen at random.
        String zoneToAvoid = randomChooseZone(snapshot, worstZones);
        if (zoneToAvoid != null) {
            availableZones.remove(zoneToAvoid);
        }
        return availableZones;
    }

    /**
     * Builds a snapshot from the statistics and delegates to
     * {@link #getAvailableZones(Map, double, double)}.
     *
     * @return the usable zone names, or {@code null} when {@code lbStats} is
     *         null or yields an empty snapshot
     */
    public static Set<String> getAvailableZones(LoadBalancerStats lbStats,
            double triggeringLoad, double triggeringBlackoutPercentage) {
        if (lbStats == null) {
            return null;
        }
        Map<String, ZoneSnapshot> snapshot = createSnapshot(lbStats);
        return getAvailableZones(snapshot, triggeringLoad,
                triggeringBlackoutPercentage);
    }

    /** Returns the composite zone-avoidance predicate. */
    @Override
    public AbstractServerPredicate getPredicate() {
        return compositePredicate;
    }
}
推荐阅读
- mall学习教程|SpringBoot实现Excel导入导出,性能爆表,用起来够优雅~
- java|SpringBoot 实现 Excel 导入导出性能爆表用起来够优雅
- Flink双流join的3种方式及IntervalJoin源码分析
- Kafka 负载均衡在 vivo 的落地实践
- Spring|Spring Cloud(2020.0.3) | 从入门到入土 - 3. 微服务远程调用
- 微服务|微服务(nacos配置管理&&feign远程调用&&Gateway服务网关)
- 微服务springcloud|SpringCould 快速入门二(nacos配置和Fegin远程调用)
- springMVC
- springboot|springboot框架(9):SSMP综合开发