弓背霞明剑照霜,秋风走马出咸阳。这篇文章主要讲述微服务架构 | 5.4 Sentinel 流控统计和熔断的源码分析相关的知识,希望能为你提供帮助。
【微服务架构 | 5.4 Sentinel 流控统计和熔断的源码分析】@[TOC](5.4 Sentinel 流控、统计和熔断的源码分析)
前言参考资料:
《Spring Microservices in Action》
《Spring Cloud Alibaba 微服务原理与实战》
《B站 尚硅谷 SpringCloud 框架开发教程 周阳》
《Sentinel GitHub 官网》
《Sentinel 官网》
调用链路是 Sentinel 的工作主流程,由各个 Slot 槽组成,将不同的 Slot 槽按照顺序串在一起,从而将不同的功能(限流、降级、系统保护)组合在一起;
本篇《2. 获取 ProcessorSlot 链》将从源码级讲解如何获取调用链路,接着会以遍历链表的方式处理每一个 Slot 槽,其中就有:FlowSlot、StatisticSlot、DegradeSlot 等。分别对应本篇《3. 流控槽实施流控逻辑》、《4. 统计槽实施指标数据统计》和《5. 熔断槽实施服务熔断》;
1. Sentinel 的自动装配
1.2 依赖引入
- 我们引入 Sentinel 的 starter 依赖文件,不需要太多额外操作,即可使用 Sentinel 默认自带的限流功能,原因是这些配置和功能都给我们自动装配了;
- 在 Spring-Cloud-Alibaba-Sentinel 包下的 META-INF/spring.factories 文件里定义了会自动装配哪些类;
文章图片
- SentinelWebAutoConfiguration:对 Web Servlet 环境的支持;
- SentinelWebFluxAutoConfiguration:对 Spring WebFlux 的支持;
- SentinelEndpointAutoConfiguration:暴露 Endpoint 信息;
- SentinelFeignAutoConfiguration:用于适应 Feign 组件;
- SentinelAutoConfiguration:支持对 RestTemplate 的服务调用使用 Sentinel 进行保护;
- 在 SentinelWebAutoConfiguration 配置类中自动装配了一个 FilterRegistrationBean,其主要作用是注册一个 CommonFilter,并且默认情况下通过
/*
规则拦截所有的请求;
@Configuration @EnableConfigurationProperties(SentinelProperties.class) public class SentinelWebAutoConfiguration //省略其他代码@Bean @ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled", matchIfMissing = true) public FilterRegistrationBean sentinelFilter() FilterRegistrationBean< Filter> registration = new FilterRegistrationBean< > (); SentinelProperties.Filter filterConfig = properties.getFilter(); if (filterConfig.getUrlPatterns() == null || filterConfig.getUrlPatterns().isEmpty()) List< String> defaultPatterns = new ArrayList< > (); //默认情况下通过 /* 规则拦截所有的请求 defaultPatterns.add("/*"); filterConfig.setUrlPatterns(defaultPatterns); registration.addUrlPatterns(filterConfig.getUrlPatterns().toArray(new String[0])); //【点进去】注册 CommonFilter Filter filter = new CommonFilter(); registration.setFilter(filter); registration.setOrder(filterConfig.getOrder()); registration.addInitParameter("HTTP_METHOD_SPECIFY", String.valueOf(properties.getHttpMethodSpecify())); log.info("[Sentinel Starter] register Sentinel CommonFilter with urlPatterns: .", filterConfig.getUrlPatterns()); return registration;
1.4 CommonFilter 过滤器 - CommonFilter 过滤器的作用与源码如下:
- 从请求中获取目标 URL;
- 获取 Urlcleaner;
- 对当前 URL 添加限流埋点;
public class CommonFilter implements Filter //省略部分代码public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException
HttpServletRequest sRequest = (HttpServletRequest)request;
Entry urlEntry = null;
try
//解析请求 URL
String target = FilterUtil.filterTarget(sRequest);
//URL 清洗
UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();
if (urlCleaner != null)
//如果存在,则说明配置过 URL 清洗策略,替换配置的 targer
target = urlCleaner.clean(target);
if (!StringUtil.isEmpty(target))
String origin = this.parseOrigin(sRequest);
ContextUtil.enter("sentinel_web_servlet_context", origin);
if (this.httpMethodSpecify)
String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + ":" + target;
//使用 SphU.entry() 方法对 URL 添加限流埋点
urlEntry = SphU.entry(pathWithHttpMethod, 1, EntryType.IN);
else
urlEntry = SphU.entry(target, 1, EntryType.IN);
//执行过滤
chain.doFilter(request, response);
catch (BlockException var14)
HttpServletResponse sResponse = (HttpServletResponse)response;
WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, var14);
catch (ServletException | RuntimeException | IOException var15)
Tracer.traceEntry(var15, urlEntry);
throw var15;
finally
if (urlEntry != null)
urlEntry.exit();
ContextUtil.exit();
1.5 小结
- 对于 Web Servlet 环境,只是通过 Filter 的方式将所有请求自动设置为 Sentinel 的资源,从而达到限流的目的;
- Sentinel 的工作原理主要依靠 ProcessorSlot 链,遍历链中的每一个 Slot 槽,执行相应逻辑; 2.1 Sentinel 源码包结构
- 在 DeBug 之前,我们需要对 Sentinel 的源码包结构做个分析,以找到方法的入口;
模块名 | 说明 |
---|---|
sentinel-adapter | 负责针对主流开源框架进行限流适配,如:Dubbo、gRPC、Zuul 等; |
sentinel-core | Sentinel 核心库,提供限流、熔断等实现; |
sentinel-dashboard | 控制台模块,提供可视化监控和管理; |
sentinel-demo | 官方案例; |
sentinel-extension | 实现不同组件的数据源扩展,如:Nacos、ZooKeeper、Apollo 等; |
sentinel-transport | 通信协议处理模块; |
- Slot 槽是 Sentinel 的核心,因此方法的入口在 sentinel-core 核心库,里面有好多个
SphU.entry()
方法,我们给方法打上断点,DeBug 进入,然后登录 Sentinel 控制台;
文章图片
2.2 获取 ProcessorSlot 链与操作 Slot 槽的入口 CtSph.entryWithPriority()
- 一直进入最终方法的实现在
CtSph.entryWithPriority()
方法里,其主要逻辑与源码如下:- 校验全局上下文 context;
- 构造 ProcessorSlot 链;
- 遍历 ProcessorSlot 链操作 Slot 槽(遍历链表);
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException
Context context = ContextUtil.getContext();
if (context instanceof NullContext)
//上下文量已经超过阈值 ->
只初始化条目,不进行规则检查
return new CtEntry(resourceWrapper, null, context);
if (context == null)
//没有指定上下文 ->
使用默认上下文 context
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
if (!Constants.ON)
//全局开关关闭 ->
没有规则检查
return new CtEntry(resourceWrapper, null, context);
//【断点步入 2.2.1】通过 lookProcessChain 方法获取 ProcessorSlot 链
ProcessorSlot<
Object>
chain = lookProcessChain(resourceWrapper);
if (chain == null)
//表示资源量超过 Constants.MAX_SLOT_CHAIN_SIZE 常量 ->
不会进行规则检查
return new CtEntry(resourceWrapper, null, context);
Entry e = new CtEntry(resourceWrapper, chain, context);
try
//【断点步入 3./4./5.】执行 ProcessorSlot 对 ProcessorSlot 链中的 Slot 槽遍历操作(遍历链表的方式)
chain.entry(context, resourceWrapper, null, count, prioritized, args);
catch (BlockException e1)
e.exit(count, args);
throw e1;
catch (Throwable e1)
//这种情况不应该发生,除非 Sentinel 内部存在错误
RecordLog.info("Sentinel unexpected exception", e1);
return e;
2.2.1 构造 ProcessorSlot 链CtSph.lookProcessChain()
- 进入
CtSph.lookProcessChain()
方法;
ProcessorSlot<
Object>
lookProcessChain(ResourceWrapper resourceWrapper)
//从缓存中获取 slot 调用链
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null)
synchronized (LOCK)
chain = chainMap.get(resourceWrapper);
if (chain == null)
// Entry size limit.
if (chainMap.size() >
= Constants.MAX_SLOT_CHAIN_SIZE)
return null;
//【断点步入】构造 Slot 链(责任链模式)
chain = SlotChainProvider.newSlotChain();
Map<
ResourceWrapper, ProcessorSlotChain>
newMap = new HashMap<
ResourceWrapper, ProcessorSlotChain>
(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
return chain;
- 最终调用
DefaultSlotChainBuilder.build()
方法构造 DefaultProcessorSlotChain;
@Override
public ProcessorSlotChain build()
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
List<
ProcessorSlot>
sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
for (ProcessorSlot slot : sortedSlotList)
if (!(slot instanceof AbstractLinkedProcessorSlot))
RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, cant be added into ProcessorSlotChain");
continue;
chain.addLast((AbstractLinkedProcessorSlot<
?>
) slot);
return chain;
- 可以看到最后 ProcessorSlotChain 链中有 10 个 Slot 插槽:
- 在本篇笔记中我们关注 3 个槽:
- FlowSlot:进行流控规则校验,对应本篇《3. 流控槽实施流控逻辑》;
- StatisticSlot:实现指标数据的统计,对应本篇《4. 统计槽实施指标数据统计》;
- DegradeSlot:服务熔断,对应本篇《5. 熔断槽实施服务熔断》
文章图片
2.2.2 操作 Slot 槽的入口
- 操作 Slot 槽的入口方法是:
ProcessorSlot.entry()
; - 接着会以遍历链表的方式操作每个 Slot 槽,其中就有:FlowSlot、StatisticSlot、DegradeSlot 等。分别对应下面的《3. 流控槽实施流控逻辑》、《4. 统计槽实施指标数据统计》和《5. 熔断槽实施服务熔断》;
- 进入
ProcessorSlot.entry()
方法,它会遍历每个 Slot 插槽,并对其进行操作,其中会经过FlowSlot.entry()
方法(需要提前给该方法打上断点),方法的逻辑跟源码如下:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable
//【断点步入】检查流量规则
checkFlow(resourceWrapper, context, node, count, prioritized);
//调用下一个 Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
- 进入
FlowSlot.checkFlow()
方法,最终调用FlowRuleChecker.checkFlow()
方法,方法的逻辑和源码如下:- 遍历所有流控规则 FlowRule;
- 针对每个规则调用 canPassCheck 进行校验;
public void checkFlow(Function<
String, Collection<
FlowRule>
>
ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException
if (ruleProvider == null || resource == null)
return;
//【断点步入 3.1】获取流控规则
Collection<
FlowRule>
rules = ruleProvider.apply(resource.getName());
if (rules != null)
//遍历所有流控规则 FlowRule
for (FlowRule rule : rules)
//【点进去 3.2】校验每条规则
if (!canPassCheck(rule, context, node, count, prioritized))
throw new FlowException(rule.getLimitApp(), rule);
3.1 获取流控规则 FlowSlot.ruleProvider.apply()
- 进入
FlowSlot.ruleProvider.apply()
方法,获取到 Sentinel 控制台上的流控规则;
private final Function<
String, Collection<
FlowRule>
>
ruleProvider = new Function<
String, Collection<
FlowRule>
>
()
@Override
public Collection<
FlowRule>
apply(String resource)
// Flow rule map should not be null.
Map<
String, List<
FlowRule>
>
flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
;
3.2 校验每条规则 FlowRuleChecker.canPassCheck()
- 进入
FlowRuleChecker.canPassCheck()
方法,分集群和单机模式校验每条规则;
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized)
String limitApp = rule.getLimitApp();
if (limitApp == null)
return true;
//集群模式
if (rule.isClusterMode())
return passClusterCheck(rule, context, node, acquireCount, prioritized);
//【点进去】单机模式
return passLocalCheck(rule, context, node, acquireCount, prioritized);
- 由于我们是单机模式,进入
FlowRuleChecker.passLocalCheck()
方法,其主要逻辑和源码如下:- 根据来源和策略获取 Node,从而拿到统计的 runtime 信息;
- 使用流量控制器检查是否让流量通过;
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized)
//【点进去 3.2.1】获取 Node
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null)
return true;
//【点进去 3.2.2】获取流控的处理策略
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
3.2.1 获取 Node FlowRuleChecker.selectNodeByRequesterAndStrategy()
- 进入
FlowRuleChecker.selectNodeByRequesterAndStrategy()
方法,其根据 FlowRule 中配置的 Strategy 和 limitApp 属性,返回不同处理策略的 Node;
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node)
//limitApp 不能为空
String limitApp = rule.getLimitApp();
int strategy = rule.getStrategy();
String origin = context.getOrigin();
//场景1:限流规则设置了具体应用,如果当前流量就是通过该应用的,则命中场景1
if (limitApp.equals(origin) &
&
filterOrigin(origin))
if (strategy == RuleConstant.STRATEGY_DIRECT)
// Matches limit origin, return origin statistic node.
return context.getOriginNode();
return selectReferenceNode(rule, context, node);
else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp))
//场景2:限流规则未指定任何具体应,默认为default,则当前流量直接命中场景2
if (strategy == RuleConstant.STRATEGY_DIRECT)
// Return the cluster node.
return node.getClusterNode();
return selectReferenceNode(rule, context, node);
else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) &
&
FlowRuleManager.isOtherOrigin(origin, rule.getResource()))
//场景3:限流规则设置的是other,当前流量未命中前两种场景
if (strategy == RuleConstant.STRATEGY_DIRECT)
return context.getOriginNode();
return selectReferenceNode(rule, context, node);
return null;
- 假设我们对接口 UserService 配置限流 1000 QPS,这 3 种场景分别如下:
- 场景 1:目的是优先保障重要来源的流量。我们需要区分调用来源,将限流规则细化。对A应用配置500QPS,对B应用配置200QPS,此时会产生两条规则:A应用请求的流量限制在500,B应用请求的流量限制在200;
- 场景 2:没有特别重要来源的流量。我们不想区分调用来源,所有入口调用 UserService 共享一个规则,所有 client 加起来总流量只能通过 1000 QPS;
- 场景 3:配合第1种场景使用,在长尾应用多的情况下不想对每个应用进行设置,没有具体设置的应用都将命中;
- 进入
FlowRule.getRater().canPass()
方法,首先通过FlowRule.getRater()
获得流控行为 TrafficShapingController,这是一个接口,有四种实现类,如下图所示:
文章图片
- 有以下四种处理策略:
- DefaultController:直接拒绝;
- RateLimiterController:匀速排队;
- WarmUpController:冷启动(预热);
- WarmUpRateLimiterController:匀速+冷启动。
- 最终调用
TrafficShapingController.canPass()
方法,执行流控行为;
- 限流的核心是限流算法的实现,Sentinel 默认采用滑动窗口算法来实现限流,具体的指标数据统计由 StatisticSlot 实现;
- 我们给
StatisticSlot.entry()
方法里的语句打上断点,运行到光标处; StatisticSlot.entry()
方法的核心是使用 Node 统计“增加线程数”和“请求通过数”;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable
try
//先执行后续 Slot 检查,再统计数据(即先调用后续所有 Slot)
fireEntry(context, resourceWrapper, node, count, prioritized, args);
//【断点步入】使用 Node 统计“增加线程数”和“请求通过数”
node.increaseThreadNum();
node.addPassRequest(count);
//如果存在来源节点,则对来源节点增加线程数和请求通过数
if (context.getCurEntry().getOriginNode() != null)
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
//如果是入口流量,则对全局节点增加线程数和请求通过数
if (resourceWrapper.getEntryType() == EntryType.IN)
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
//执行事件通知和回调函数
for (ProcessorSlotEntryCallback<
DefaultNode>
handler : StatisticSlotCallbackRegistry.getEntryCallbacks())
handler.onPass(context, resourceWrapper, node, count, args);
//处理优先级等待异常
catch (PriorityWaitException ex)
node.increaseThreadNum();
//如果有来源节点,则对来源节点增加线程数
if (context.getCurEntry().getOriginNode() != null)
context.getCurEntry().getOriginNode().increaseThreadNum();
//如果是入口流量,对全局节点增加线程数
if (resourceWrapper.getEntryType() == EntryType.IN)
Constants.ENTRY_NODE.increaseThreadNum();
//执行事件通知和回调函数
for (ProcessorSlotEntryCallback<
DefaultNode>
handler : StatisticSlotCallbackRegistry.getEntryCallbacks())
handler.onPass(context, resourceWrapper, node, count, args);
//处理限流、熔断等异常
catch (BlockException e) //省略throw e;
//处理业务异常
catch (Throwable e)
context.getCurEntry().setError(e);
throw e;
4.1 统计“增加线程数”和“请求通过数”
- 这两个方法都是调用同一个类的,笔者以第一个为例,进入
DefaultNode.increaseThreadNum()
方法,最终调用的是StatisticNode.increaseThreadNum()
,而统计也是依靠 StatisticNode 维护的,这里放上 StatisticNode 的统计核心与源码:- StatisticNode 持有两个计数器 Metric 对象,统计行为是通过 Metric 完成的;
public class StatisticNode implements Node //省略其他代码//【断点步入】最近 1s 滑动窗口计数器(默认 1s)
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
//最近 1min 滑动窗口计数器(默认 1min)
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
//增加 “请求通过数”
@Override
public void addPassRequest(int count)
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
//增加 RT 和成功数
@Override
public void addRtAndSuccess(long rt, int successCount)
rollingCounterInSecond.addSuccess(successCount);
rollingCounterInSecond.addRT(rt);
rollingCounterInMinute.addSuccess(successCount);
rollingCounterInMinute.addRT(rt);
//增加“线程数”
@Override
public void increaseThreadNum()
curThreadNum.increment();
- 这里还有减少请求通过数(线程数)、统计最大值等方法,由于篇幅有限,这里不放出,感兴趣的读者可以自己 DeBug 进入看看;
- ArrayMetric 的构造方法需要先给方法打上断点,重新 DeBug,在初始化时注入构造;
public class ArrayMetric implements Metric //省略其他代码//【点进去 4.2.2】数据存储
private final LeapArray<
MetricBucket>
data;
//最近 1s 滑动计数器用的是 OccupiableBucketLeapArray
public ArrayMetric(int sampleCount, int intervalInMs)
this.data = https://www.songbingjia.com/android/new OccupiableBucketLeapArray(sampleCount, intervalInMs);
//最近 1min 滑动计数器用的是 BucketLeapArray
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy)
if (enableOccupy)
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
else
this.data = new BucketLeapArray(sampleCount, intervalInMs);
//增加成功数
@Override
public void addSuccess(int count)
WindowWrap<
MetricBucket>
wrap = data.currentWindow();
wrap.value().addSuccess(count);
//增加通过数
@Override
public void addPass(int count)
WindowWrap<
MetricBucket>
wrap = data.currentWindow();
wrap.value().addPass(count);
//增加 RT
@Override
public void addRT(long rt)
WindowWrap<
MetricBucket>
wrap = data.currentWindow();
wrap.value().addRT(rt);
4.2.2 LeapArray 环形数组
- LeapArray 是处理数据的核心数据结构,采用滑动窗口算法;
- ArrayMetric 中持有 LeapArray 对象,所有方法都是对 LeapArray 进行操作;
- LeapArray 是环形的数据结构,为了节约内存,它存储固定个数的窗口对象 WindowWrap,只保存最近一段时间的数据,新增的时间窗口会覆盖最早的时间窗口;
public abstract class LeapArray<
T>
//省略其他代码//单个窗口的长度(1个窗口多长时间)
protected int windowLengthInMs;
//采样窗口个数
protected int sampleCount;
//全部窗口的长度(全部窗口多长时间)
protected int intervalInMs;
private double intervalInSecond;
//窗口数组:存储所有窗口(支持原子读取和写入)
protected final AtomicReferenceArray<
WindowWrap<
T>
>
array;
//更新窗口数据时用的锁
private final ReentrantLock updateLock = new ReentrantLock();
public LeapArray(int sampleCount, int intervalInMs)
//计算单个窗口的长度
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<
>
(sampleCount);
//【点进去 4.2.3】获取当前窗口
public WindowWrap<
T>
currentWindow()
//这里参数是当前时间
return currentWindow(TimeUtil.currentTimeMillis());
//获取指定时间的窗口
public WindowWrap<
T>
currentWindow(long timeMillis)
if (timeMillis <
0)
return null;
// 计算数组下标
int idx = calculateTimeIdx(timeMillis);
//计算当前请求对应的窗口开始时间
long windowStart = calculateWindowStart(timeMillis);
/*
* 从 array 中获取窗口。有 3 种情况:
* (1) array 中窗口不在,创建一个 CAS 并写入 array;
* (2) array 中窗口开始时间 = 当前窗口开始时间,直接返回;
* (3) array 中窗口开始时间 <
当前窗口开始时间,表示 o1d 窗口已过期,重置窗口数据并返回;
*/
while (true)
// 取窗口
WindowWrap<
T>
old = array.get(idx);
//(1)窗口不在
if (old == null)
//创建一个窗口
WindowWrap<
T>
window = new WindowWrap<
T>
(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
//CAS将窗口写进 array 中并返回(CAS 操作确保只初始化一次)
if (array.compareAndSet(idx, null, window))
return window;
else
//并发写失败,释放 CPU 资源,避免有线程长时间占用 CPU,一般下次来的时候 array 中有数据了会命中第2种情况;
Thread.yield();
//(2)array 中窗口开始时间 = 当前窗口开始时间
else if (windowStart == old.windowStart())
//直接返回
return old;
//(3)array 中窗口开始时间 <
当前窗口开始时间
else if (windowStart >
old.windowStart())
//尝试获取更新锁
if (updateLock.tryLock())
try
//拿到锁的线程才重置窗口
return resetWindowTo(old, windowStart);
finally
//释放锁
updateLock.unlock();
else
//并发加锁失败,释放 CPU 资源,避免有线程长时间占用 CPU,一般下次来的时候因为 old 对象时间更新了会命中第 2 种情况;
Thread.yield();
//理论上不会出现
else if (windowStart <
old.windowStart())
// 正常情况不会进入该分支(机器时钟回拨等异常情况)
return new WindowWrap<
T>
(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
//计算索引
private int calculateTimeIdx(/*@Valid*/ long timeMillis)
//timeId 降低时间精度
long timeId = timeMillis / windowLengthInMs;
//计算当前索引,这样我们就可以将时间戳映射到 leap 数组
return (int)(timeId % array.length());
//计算窗口开始时间
protected long calculateWindowStart(/*@Valid*/ long timeMillis)
return timeMillis - timeMillis % windowLengthInMs;
4.2.3 WindowWrap 窗口包装类
- WindowWrap 是一个窗口对象,它是一个包装类,包装的对象是 MetricBucket;
public class WindowWrap<
T>
//窗口长度,与 LeapArray 的 windowLengthInMs 一致
private final long windowLengthInMs;
//窗口开始时间,其值是 windowLengthInMs 的整数倍
private long windowStart;
//窗口的数据,支持 MetricBucket 类型,存储统计数据
private T value;
//省略其他代码
4.2.4 MetricBucket 指标桶
- MetricBucket 类的定义如下,可以发现指标数据存在 LongAdder[] counters中;
- LongAdder 是 JDK1.8 中新增的类,用于在高并发场景下代替AtomicLong,以用空间换时间的方式降低了 CAS 失败的概率,从而提高性能;
public class MetricBucket
/**
* 存储指标的计数器;
* LongAdder 是线程安全的计数器
* counters[0]PASS 通过数;
* counters[1]BLOCK 拒绝数;
* counters[2]EXCEPTION 异常数;
* counters[3]SUCCESS 成功数;
* counters[4]RT 响应时长;
* counters[5]OCCUPIED_PASS 预分配通过数;
**/
private final LongAdder[] counters;
//最小 RT,默认值是 5000ms
private volatile long minRt;
//构造中初始化
public MetricBucket()
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events)
counters[event.ordinal()] = new LongAdder();
initMinRt();
//覆盖指标
public MetricBucket reset(MetricBucket bucket)
for (MetricEvent event : MetricEvent.values())
counters[event.ordinal()].reset();
counters[event.ordinal()].add(bucket.get(event));
initMinRt();
return this;
private void initMinRt()
this.minRt = SentinelConfig.statisticMaxRt();
//重置指标为0
public MetricBucket reset()
for (MetricEvent event : MetricEvent.values())
counters[event.ordinal()].reset();
initMinRt();
return this;
//获取指标,从 counters 中返回
public long get(MetricEvent event)
return counters[event.ordinal()].sum();
//添加指标
public MetricBucket add(MetricEvent event, long n)
counters[event.ordinal()].add(n);
return this;
public long pass()
return get(MetricEvent.PASS);
public long block()
return get(MetricEvent.BLOCK);
public void addPass(int n)
add(MetricEvent.PASS, n);
public void addBlock(int n)
add(MetricEvent.BLOCK, n);
//省略其他代码
4.2.5 各数据结构的依赖关系
文章图片
文章图片
4.2.6 LeapArray 统计数据的大致思路
- 创建一个长度为 n 的数组,数组元素就是窗口,窗口包装了 1 个指标桶,桶中存放了该窗口时间范围中对应的请求统计数据;
- 可以想象成一个环形数组在时间轴上向右滚动,请求到达时,会命中数组中的一个窗口,那么该请求的数据就会存到命中的这个窗口包含的指标桶中;
- 当数组转满一圈时,会回到数组的开头,而此时下标为 0 的元素需要重复使用,它里面的窗口数据过期了,需要重置,然后再使用。具体过程如下图:
文章图片
5. 熔断槽实施服务熔断 DegradeSlot.entry()
- 服务熔断是通过 DegradeSlot 来实现的,它会根据用户配置的熔断规则和系统运行时各个 Node 中的统计数据进行熔断判断;
- 注意:熔断功能在 Sentinel-1.8.0 版本前后有较大变化;
- 我们给
DegradeSlot.entry()
方法里的语句打上断点,运行到光标处;
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable
//【断点步入】熔断检查
performChecking(context, resourceWrapper);
//调用下一个 Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
- 进入
DegradeSlot.performChecking()
方法,其逻辑与源码如下:- 根据资源名称获取断路器;
- 循环判断每个断路器;
void performChecking(Context context, ResourceWrapper r) throws BlockException
//根据 resourceName 获取断路器
List<
CircuitBreaker>
circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty())
return;
//循环判断每个断路器
for (CircuitBreaker cb : circuitBreakers)
//【点进去】尝试通过断路器
if (!cb.tryPass(context))
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
5.1 继续或取消熔断功能
- 进入
AbstractCircuitBreaker.tryPass()
方法,当请求超时并且处于探测恢复(半开状态,HALF-OPEN 状态)失败时继续断路功能;
@Override
public boolean tryPass(Context context)
//当前断路器状态为关闭
if (currentState.get() == State.CLOSED)
return true;
if (currentState.get() == State.OPEN)
//【点进去】对于半开状态,我们尝试通过
return retryTimeoutArrived() &
&
fromOpenToHalfOpen(context);
return false;
- 进入
AbstractCircuitBreaker.fromOpenToHalfOpen()
方法,实现状态的变更;
protected boolean fromOpenToHalfOpen(Context context)
//尝试将状态从 OPEN 设置为 HALF_OPEN
if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN))
//状态变化通知
notifyObservers(State.OPEN, State.HALF_OPEN, null);
Entry entry = context.getCurEntry();
//在 entry 添加一个 exitHandlerentry.exit() 时会调用
entry.whenTerminate(new BiConsumer<
Context, Entry>
()
@Override
public void accept(Context context, Entry entry)
//如果有发生异常,重新将状态设置为OPEN 请求不同通过
if (entry.getBlockError() != null)
currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
);
//此时状态已设置为HALF_OPEN正常通行
return true;
//熔断
return false;
- 上述讲解了:状态从 OPEN 变为 HALF_OPEN,HALF_OPEN 变为 OPEN;
- 但状态从 HALF_OPEN 变为 CLOSE 需要在正常执行完请求后,由 entry.exit() 调用
DegradeSlot.exit()
方法来改变状态;
- 状态从 HALF_OPEN 变为 CLOSE 的实现方法在
DegradeSlot.exit()
;
@Override
public void exit(Context context, ResourceWrapper r, int count, Object... args)
Entry curEntry = context.getCurEntry();
//无阻塞异常
if (curEntry.getBlockError() != null)
fireExit(context, r, count, args);
return;
//通过资源名获取断路器
List<
CircuitBreaker>
circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
//没有配置断路器,则直接放行
if (circuitBreakers == null || circuitBreakers.isEmpty())
fireExit(context, r, count, args);
return;
if (curEntry.getBlockError() == null)
for (CircuitBreaker circuitBreaker : circuitBreakers)
//【点进去】在请求完成时
circuitBreaker.onRequestComplete(context);
fireExit(context, r, count, args);
- 进入
ExceptionCircuitBreaker.onRequestComplete()
方法,其主要逻辑与源码如下:- 请求失败比例与总请求比例加 1,用于判断后续是否超过阈值;
@Override
public void onRequestComplete(Context context)
Entry entry = context.getCurEntry();
if (entry == null)
return;
Throwable error = entry.getError();
//简单错误计数器
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null)
//异常请求数加 1
counter.getErrorCount().add(1);
//总请求数加 1
counter.getTotalCount().add(1);
//【点进去】超过阈值时变更状态
handleStateChangeWhenThresholdExceeded(error);
- 进入
ExceptionCircuitBreaker.handleStateChangeWhenThresholdExceeded()
方法,变更状态;
private void handleStateChangeWhenThresholdExceeded(Throwable error)
//全开则直接放行
if (currentState.get() == State.OPEN)
return;
//半开状态
if (currentState.get() == State.HALF_OPEN)
//检查请求
if (error == null)
//发生异常,将状态从半开 HALF_OPEN 转为关闭 CLOSE
fromHalfOpenToClose();
else
//无异常,解开半开状态
fromHalfOpenToOpen(1.0d);
return;
//计算是否超过阈值
List<
SimpleErrorCounter>
counters = stat.values();
long errCount = 0;
long totalCount = 0;
for (SimpleErrorCounter counter : counters)
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
if (totalCount <
minRequestAmount)
return;
double curCount = errCount;
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO)
//熔断策略为:异常比例
curCount = errCount * 1.0d / totalCount;
if (curCount >
threshold)
transformToOpen(curCount);
6. Sentinel 源码结构图小结
- SphU.entry():核心逻辑的入口函数;
- CtSph.entryWithPriority():获取 Slot 链,操作 Slot 槽;
- CtSph.lookProcessChain():获取 ProcessorSlot 链;
- DefaultSlotChainBuilder.build():构造 DefaultProcessorSlotChain 链(里面有 10 个 Slot 插槽);
- ProcessorSlot.entry():遍历 ProcessorSlot 链;
- FlowSlot.entry():遍历到 FlowSlot 槽,限流规则;
- FlowSlot.checkFlow():检查流量规则;
- FlowRuleChecker.checkFlow():使用检查器检查流量规则;
- FlowSlot.ruleProvider.apply():获取流控规则;
- FlowRuleChecker.canPassCheck():校验每条规则;
- FlowRuleChecker.passClusterCheck():集群模式;
- FlowRuleChecker.passLocalCheck():单机模式;
- FlowRuleChecker.selectNodeByRequesterAndStrategy():获取 Node;
- FlowRule.getRater():获得流控行为 TrafficShapingController;
- TrafficShapingController.canPass():执行流控行为;
- StatisticSlot.entry:遍历到 StatisticSlot 槽,统计数据;
- DefaultNode.increaseThreadNum():统计“增加线程数”;
- StatisticNode.increaseThreadNum():统计“请求通过数”;
- ArrayMetric.ArrayMetric():初始化指标数组;
- LeapArray:环形数组;
- WindowWrap:窗口包装类;
- MetricBucket:指标桶;
- DefaultNode.addPassRequest():统计“增加线程数”;
- StatisticNode.addPassRequest():同上;
- DegradeSlot.entry():遍历到 DegradeSlot 槽,服务熔断;
- DegradeSlot.performChecking():执行检查;
- DegradeRuleManager.getCircuitBreakers():根据 resourceName 获取断路器;
- AbstractCircuitBreaker.tryPass():继续或取消熔断功能;
- AbstractCircuitBreaker.fromOpenToHalfOpen():尝试通过半开状态;
- DegradeSlot.exit():请求失败(超时),启动熔断;
- ExceptionCircuitBreaker.onRequestComplete():在请求完成时操作;
- ExceptionCircuitBreaker.handleStateChangeWhenThresholdExceeded():变更状态;
新人制作,如有错误,欢迎指出,感激不尽!
:::
::: hljs-center
欢迎关注公众号,会分享一些更日常的东西!
:::
::: hljs-center
如需转载,请标注出处!
:::
::: hljs-center
文章图片
:::
推荐阅读
- 微服务架构 | 5.2 基于 Sentinel 的服务限流及熔断
- 自上而下的理解网络——TCP篇
- XXE外部实体注入(XML External Entity Injection)学习笔记
- 聊聊 JDBC 的 executeBatch || 对比下不同数据库对 JDBC batch 的实现细节
- 字节可寻址内存和字可寻址内存之间的差异
- 架构风格、架构模式和设计模式之间的区别
- Arch Linux和Kali Linux之间有什么区别()
- 批处理系统与在线处理系统有什么区别()
- 批处理操作系统和多程序操作系统之间有什么区别()