sentinel源码系列|第四篇(Sentinel限流核心逻辑过程分析)

本篇主要是讲解Sentinel限流逻辑的核心流转是怎么样的,不会过多涉及到具体的算法,更多的是在讨论主线,具体每个规则的后面会说到
简单使用的例子
【sentinel源码系列|第四篇(Sentinel限流核心逻辑过程分析)】先看下官网提供的简单例子,使用的是流控规则,代码如下,可以看到入口其实就是Sphu.entry这个地方,也是最核心的逻辑
public class SentinelHelloTest { public static void main(String[] args) throws Exception{ // 配置规则. initFlowRules(); while (true) { // 1.5.0 版本开始可以直接利用 try-with-resources 特性 //SphU.entry进入资源,成功则执行内部逻辑 try (Entry entry = SphU.entry("HelloWorld")) { Thread.sleep(10000 * 6); // 被保护的逻辑 System.out.println("hello world"); } catch (BlockException ex) { // 处理被流控的逻辑, 已经超出限制则抛出异常 System.out.println("blocked!" + ex.getMessage()); } } }/** * 初始化规则 */ private static void initFlowRules(){ List rules = new ArrayList<>(); //限流规则 FlowRule rule = new FlowRule(); //资源名字 rule.setResource("HelloWorld"); //限制类型,有qps和线程数,这里是线程数 rule.setGrade(RuleConstant.FLOW_GRADE_QPS); // Set limit QPS to 20. //限制1秒最多只能进20个请求 rule.setCount(20); rules.add(rule); //加载规则,在内存中进行维护的 FlowRuleManager.loadRules(rules); } }

Sphu.entry
//入口//com.alibaba.csp.sentinel.SphU#entry(java.lang.String)//接着到//com.alibaba.csp.sentinel.CtSph#entry()@Override public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException { //对资源进行一个bao包装 StringResourceWrapper resource = new StringResourceWrapper(name, type); //调用重载的方法 return entry(resource, count, args); }//最终会到这里来//com.alibaba.csp.sentinel.CtSph#entryWithPriority(com.alibaba.csp.sentinel.slotchain.ResourceWrapper, int, boolean, java.lang.Object...)private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException {//分支逻辑,判断Context是否为空 Context context = ContextUtil.getContext(); if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); }//如果为空使用默认的上下文 if (context == null) { // Using default context. context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); }//如果关闭则不会执行校验规则,默认是开启的 // Global switch is close, no rule checking will do. if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); }//核心方法,构建处理链表,封装了一条链,然后使用双向链表的方式连接起来 ProcessorSlot chain = lookProcessChain(resourceWrapper); /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. * 为空也是不需要校验 */ if (chain == null) { return new CtEntry(resourceWrapper, null, context); }Entry e = new CtEntry(resourceWrapper, chain, context); try { //责任链设计模式进入一个个slot调用 chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { //责任链设计模式退出一个个slot调用 e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e; }//看到上面核心的方法其实就是一个solt链的构建与solt链条的调用,下面再分成几个部分来说说这个代码走向
lookProcessChain
构建链条的方法的方法,里面涉及到链条的构建与其对应的扩展
//com.alibaba.csp.sentinel.CtSph#lookProcessChain ProcessorSlot lookProcessChain(ResourceWrapper resourceWrapper) { //从chainMap中获取是否已经有了,缓存功能 ProcessorSlotChain chain = chainMap.get(resourceWrapper); //如果不存在则使用双重检锁机制进行初始化 if (chain == null) { synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit. //链表的长度不能超过最大值,否则不进行处理,MAX_SLOT_CHAIN_SIZE为6000 if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; }//核心方法,进行构建链条 chain = SlotChainProvider.newSlotChain(); //把返回的chain放到缓存中去 Map newMap = new HashMap( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); //维护下最新的map,,这里暂时不太懂这层中转,,, chainMap = newMap; } } } //存在直接返回 return chain; }//接着再看下具体的构建方法//com.alibaba.csp.sentinel.slotchain.SlotChainProvider#newSlotChainpublic static ProcessorSlotChain newSlotChain() { //如果不为空直接返回,默认是为空的 if (slotChainBuilder != null) { return slotChainBuilder.build(); }// Resolve the slot chain builder SPI. // 通过spi机制获取第一个或者获取默认的,这里是一个扩展,可以根据spi机制进行自定义,spi机制前面说了,这里就不再进去看了 slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault(); if (slotChainBuilder == null) { // Should not go through here. //如果还是为空,则使用默认的构建器 RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); slotChainBuilder = new DefaultSlotChainBuilder(); } else { RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", slotChainBuilder.getClass().getCanonicalName()); } //执行构建逻辑 return slotChainBuilder.build(); }//逻辑还是比较简单的,一目了然,接着再看下最终的构建逻辑//com.alibaba.csp.sentinel.slots.DefaultSlotChainBuilder#build@Override public ProcessorSlotChain build() { //创建一个ProcessorSlotChain对象,就是一个类似于双向链表的数据结构 ProcessorSlotChain chain = new DefaultProcessorSlotChain(); //再次使用spi机制提供扩展,然后获取排序好的链条,比如以下两个,哪个order小哪个就在前面,这里ORDER_AUTHORITY_SLOT比ORDER_DEGRADE_SLOT要小 //@Spi(order = Constants.ORDER_AUTHORITY_SLOT) public class AuthoritySlot //@Spi(order = Constants.ORDER_DEGRADE_SLOT)public class DegradeSlot List sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted(); //循环组装为一个调用链,这里采用的是责任链设计模式 for (ProcessorSlot slot : sortedSlotList) { //不是AbstractLinkedProcessorSlot类型的不会被加到链条中 if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } //转换并添加到末尾中 chain.addLast((AbstractLinkedProcessorSlot) slot); }return chain; }
chain.entry
责任链设计模式调用入口,看这个只要了解这个设计模式基本就可以知道了,所以下面要去看一下ProcessorSlot的结构和其他几个slot基本的一个情况
ProcessorSlot
public interface ProcessorSlot {/** * Entrance of this slot. * 进入该slot */ void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized, Object... args) throws Throwable; /** * Means finish of {@link #entry(Context, ResourceWrapper, Object, int, boolean, Object...)}. * * 循环进入slot */ void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable; /** * Exit of this slot. * 退出该slot */ void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args); /** * Means finish of {@link #exit(Context, ResourceWrapper, int, Object...)}. * 循环退出slot */ void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args); }

AbstractLinkedProcessorSlot
抽象的实现,后面其他的校验规则都会继承这个类,所以这个抽象类还是很重要的,来看一下他的大概结构有哪些内容
public abstract class AbstractLinkedProcessorSlot implements ProcessorSlot {//下个slot private AbstractLinkedProcessorSlot next = null; @Override public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { //下一个不为空,则进行调用 if (next != null) { next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } }@SuppressWarnings("unchecked") void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); }@Override public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { //下一个不为空,则进行调用 if (next != null) { next.exit(context, resourceWrapper, count, args); } }//对下一个的基本操作 public AbstractLinkedProcessorSlot getNext() { return next; }public void setNext(AbstractLinkedProcessorSlot next) { this.next = next; }}

DefaultProcessorSlotChain
默认的实现类,这是调用的源头,也是比较重要的,用类似于双向链表的思路构建了一条链,可以看看下面的逻辑
public class DefaultProcessorSlotChain extends ProcessorSlotChain {//第一个slot AbstractLinkedProcessorSlot first = new AbstractLinkedProcessorSlot() {@Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, prioritized, args); }@Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); }}; //最后一个slot,默认等于第一个 AbstractLinkedProcessorSlot end = first; //在头节点添加 @Override public void addFirst(AbstractLinkedProcessorSlot protocolProcessor) { protocolProcessor.setNext(first.getNext()); first.setNext(protocolProcessor); if (end == first) { end = protocolProcessor; } }//在尾节点添加 @Override public void addLast(AbstractLinkedProcessorSlot protocolProcessor) { end.setNext(protocolProcessor); end = protocolProcessor; }/** * Same as {@link #addLast(AbstractLinkedProcessorSlot)}. * * @param next processor to be added. */ @Override public void setNext(AbstractLinkedProcessorSlot next) { addLast(next); }@Override public AbstractLinkedProcessorSlot getNext() { return first.getNext(); }/** * * 调用第一个节点的进入 */ @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { first.transformEntry(context, resourceWrapper, t, count, prioritized, args); }/** * * 调用第一个节点的退出 */ @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { first.exit(context, resourceWrapper, count, args); }}
其他的就是一些细项规则了,比如
FlowSlot:限流slot
LogSlot:日志slot

等等,具体的slot在com.alibaba.csp.sentinel.slotchain.ProcessorSlot文件中
# Sentinel default ProcessorSlots com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot com.alibaba.csp.sentinel.slots.logger.LogSlot com.alibaba.csp.sentinel.slots.statistic.StatisticSlot com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot com.alibaba.csp.sentinel.slots.system.SystemSlot com.alibaba.csp.sentinel.slots.block.flow.FlowSlot com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

至于前后顺序在每个slot上面会有@Spi(order = Constants.ORDER_STATISTIC_SLOT)进行指定,值越小就排的越前
接下来就是后面的调用处理了,如下
Entry e = new CtEntry(resourceWrapper, chain, context); try { //责任链设计模式进入一个个slot调用 chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { //责任链设计模式退出一个个slot调用 e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e;

返回的chain就是DefaultProcessorSlotChain,一调用它就会一个个的slot进行调用,从而实现拦截的功能,同理,调用e.exit就会一个个solt去调用对应的方法,直到结束

本篇文章只是记录整体的逻辑,这里使用的是责任链的设计模式,这个模式在很多地方都会用到,大家可以认真学习下这种思路,这篇文章就只讲到这里,后面的文章会再说下具体规则的实现与其校验

    推荐阅读