掘金 后端 ( ) • 2024-04-27 09:33

归档

测试

// Create this test class inside demo-spring-webmvc
/**
 * Minimal Sentinel flow-control demo: installs one QPS rule for the "HelloWorld"
 * resource, then keeps entering that resource until an entry is rejected.
 */
public class TestMain {
    /** Name shared by the guarded resource and the flow rule that limits it. */
    private static final String RESOURCE_NAME = "HelloWorld";

    public static void main(String[] args) throws InterruptedException {
        initFlowRules(); // install the rule before the first entry

        // Keep entering the resource; pause 10 ms after every accepted call
        // and stop as soon as one call is blocked.
        while (enterOnce()) {
            Thread.sleep(10);
        }
    }

    /**
     * Performs a single guarded call.
     *
     * @return {@code true} when the call went through, {@code false} when it was blocked
     */
    private static boolean enterOnce() {
        // try-with-resources can be used since version 1.5.0; the automatic close()
        // calls exit(), so entry.exit() does not have to be invoked by hand.
        try (Entry guard = SphU.entry(RESOURCE_NAME)) { // enter the resource sign_demo_010
            System.out.println("hello world"); // the protected logic
            return true;
        } catch (BlockException blocked) {
            System.err.println("blocked!"); // handling for the flow-controlled case
            return false;
        }
    }

    /** Builds the single QPS rule for the resource and hands it to the rule manager. */
    private static void initFlowRules() {
        FlowRule qpsRule = new FlowRule();
        qpsRule.setResource(RESOURCE_NAME);
        qpsRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
        qpsRule.setCount(20); // limit QPS to 20

        List<FlowRule> ruleList = new ArrayList<>();
        ruleList.add(qpsRule);
        FlowRuleManager.loadRules(ruleList); // load the rules sign_demo_020
    }
}

查看异常栈

  • 需将 BlockException、FlowException 类的 fillInStackTrace() 方法注释掉,才会打印完整调用栈记录
com.alibaba.csp.sentinel.slots.block.flow.FlowException
	at com.alibaba.csp.sentinel.slots.block.flow.FlowRuleChecker.checkFlow(FlowRuleChecker.java:53) // 流控校验
	at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.checkFlow(FlowSlot.java:171)
	at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.entry(FlowSlot.java:164)
	at com.alibaba.csp.sentinel.slots.block.flow.FlowSlot.entry(FlowSlot.java:141)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)
	at com.alibaba.csp.sentinel.slots.system.SystemSlot.entry(SystemSlot.java:39)
	at com.alibaba.csp.sentinel.slots.system.SystemSlot.entry(SystemSlot.java:32)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)
	at com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot.entry(AuthoritySlot.java:42)
	at com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot.entry(AuthoritySlot.java:35)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)
	at com.alibaba.csp.sentinel.slots.statistic.StatisticSlot.entry(StatisticSlot.java:59)
	at com.alibaba.csp.sentinel.slots.statistic.StatisticSlot.entry(StatisticSlot.java:51)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)
	at com.alibaba.csp.sentinel.slots.logger.LogSlot.entry(LogSlot.java:38)
	at com.alibaba.csp.sentinel.slots.logger.LogSlot.entry(LogSlot.java:31)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)
	at com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot.entry(ClusterBuilderSlot.java:104)
	at com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot.entry(ClusterBuilderSlot.java:49)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)
	at com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot.entry(NodeSelectorSlot.java:174)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.fireEntry(AbstractLinkedProcessorSlot.java:32)
	at com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain$1.entry(DefaultProcessorSlotChain.java:31)
	at com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot.transformEntry(AbstractLinkedProcessorSlot.java:40)
	at com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain.entry(DefaultProcessorSlotChain.java:75)
	at com.alibaba.csp.sentinel.CtSph.entryWithPriority(CtSph.java:149)
	at com.alibaba.csp.sentinel.CtSph.entry(CtSph.java:177)
	at com.alibaba.csp.sentinel.CtSph.entry(CtSph.java:316)
	at com.alibaba.csp.sentinel.SphU.entry(SphU.java:85)
	at com.alibaba.csp.sentinel.demo.spring.webmvc.test.TestMain.main(TestMain.java:22) // 在 demo-spring-webmvc 里建的测试类

原理

类结构

  • com.alibaba.csp.sentinel.slotchain.ProcessorSlot
/** sign_i_001 Processor slot interface. */
public interface ProcessorSlot<T> {
    /**
     * sign_im_001 Handles entry into a resource.
     *
     * @param context         context the entry is processed in
     * @param resourceWrapper wrapper of the resource being entered
     * @param param           slot-specific parameter of type {@code T}
     * @param count           count passed along with the entry (presumably the amount to acquire; TODO confirm)
     * @param prioritized     priority flag passed along with the entry
     * @param args            additional caller-supplied arguments
     * @throws Throwable whatever the implementation raises while processing the entry
     */
    void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
               Object... args) throws Throwable;
}
  • com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot
/** sign_c_001 Node of a (singly linked) processor chain. */
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> { // implements interface sign_i_001
    private AbstractLinkedProcessorSlot<?> next = null; // the next node; null when there is none
}

进入资源

  • 相当于获取资源,进行计数处理,被限制就会报错

  • com.alibaba.csp.sentinel.SphU

    /** Shared empty argument array passed when the caller supplies no extra arguments. */
    private static final Object[] OBJECTS0 = new Object[0];

    /**
     * sign_demo_010 Enters the resource with the given name.
     *
     * <p>Delegates to {@code Env.sph} using {@code EntryType.OUT}, a count of 1 and an
     * empty argument array.
     *
     * @param name name of the resource to enter
     * @return the entry returned by the delegated call
     * @throws BlockException propagated from the delegated call
     */
    public static Entry entry(String name) throws BlockException {
        // sph is a CtSph instance
        return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0); // sign_m_010
    }
  • com.alibaba.csp.sentinel.CtSph
    /**
     * sign_m_010 Wraps the resource name and entry type in a {@code StringResourceWrapper}
     * and delegates to the wrapper-based overload.
     *
     * @throws BlockException propagated from the delegated call
     */
    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args); // sign_m_001
    }

    /**
     * sign_m_001 Enters the wrapped resource without priority
     * ({@code prioritized} is passed as {@code false}).
     *
     * @throws BlockException propagated from {@code entryWithPriority}
     */
    public Entry entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {
        return entryWithPriority(resourceWrapper, count, false, args); // sign_m_002
    }

    /**
     * sign_m_002 Looks up the processor chain for the resource, creates a {@code CtEntry}
     * bound to that chain and the current context, and runs the chain's entry logic.
     * (Abridged excerpt: the {@code ...} placeholders mark code omitted by the article.)
     *
     * @return the entry created for this resource
     */
    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();

        ... // other handling omitted

        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); // sign_m_003

        ... // handling of a null chain omitted

        Entry e = new CtEntry(resourceWrapper, chain, context); // create the entry (comparable to a lock)
        try {
            /**
             * Run the entry logic through the chain sign_im_001
             * Handled first by the head node sign_m_020
             */
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } ... // catch handling omitted
        return e;
    }

    /**
     * sign_m_003 Returns the processor chain cached for the resource, creating and
     * caching one under {@code LOCK} when none exists yet.
     * (Abridged excerpt: the {@code ...} placeholder marks code omitted by the article.)
     */
    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                // Look again while holding the lock: the chain may have been added in the meantime.
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    ... // handling of the exceeded-limit case omitted
                    /**
                     * Obtain the processor chain through SPI.
                     *   Config file: META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
                     *   Chain node classes can set their (ascending) order with the @Spi annotation.
                     * For the structure see: sign_chain_001
                     */
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap; // copy-on-write update of the cache: the old map is never mutated
                }
            }
        }
        return chain;
    }

处理链

  • sign_chain_001
// 链结构: cur -> next
DefaultProcessorSlotChain   // 默认节点     (用于组装链)
  -> first                  // 头节点       (匿名实现,只传递,无逻辑) sign_f_001
  -> NodeSelectorSlot       // 设置统计节点 (供下游统计)
  -> ClusterBuilderSlot     // 设置集群节点 (供下游统计)
  -> LogSlot                // 日志记录     (只记录异常日志)
  -> StatisticSlot          // 统计         (记录 QPS、线程数等,供下游(下次)流控)
  -> AuthoritySlot          // 权限管理     (黑名单、白名单处理)
  -> SystemSlot             // 系统流控     (整个应用管控)
  -> FlowSlot               // 单资源流控
  -> DegradeSlot            // 降级处理     (相当于熔断);尾节点 end

链处理进入

  • com.alibaba.csp.sentinel.slotchain.DefaultProcessorSlotChain
/**
 * Chain implementation whose {@code entry} starts processing at the {@code first} node.
 * (Excerpt: only the members relevant to entry processing are shown.)
 */
public class DefaultProcessorSlotChain extends ProcessorSlotChain {
    // sign_f_001 Head node: an anonymous slot with no logic of its own, it only forwards.
    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {
        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args); // just passes on to the next node sign_m_022
        }

        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            // Forwards the exit in the same way.
            super.fireExit(context, resourceWrapper, count, args);
        }
    };

    // sign_m_020 Implements the entry handling method sign_im_001
    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        throws Throwable {
        // start from the head node
        first.transformEntry(context, resourceWrapper, t, count, prioritized, args); // sign_m_021
    }
}
  • com.alibaba.csp.sentinel.slotchain.AbstractLinkedProcessorSlot
    /**
     * sign_m_021 Passes an entry on to this node: casts the untyped parameter to this
     * slot's parameter type and invokes this slot's own {@code entry}.
     */
    void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
        throws Throwable {
        T t = (T)o; // unchecked cast from Object to the slot's type parameter
        entry(context, resourceWrapper, t, count, prioritized, args); // the entry method each node implements itself sign_im_001
    }

    /**
     * sign_m_022 Fires the entry onward: forwards it to the next node of the chain,
     * or does nothing when this node has no successor.
     */
    @Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next == null) {
            return; // no following node: nothing to forward to
        }
        // hand the entry over to the following node sign_m_021
        next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
    }

设置规则

  • 有点绕,用监听器做最终更改

  • com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager

/**
 * Holds the current flow rules, grouped by key in a map. Rules are not written
 * directly by {@code loadRules}; they are pushed into a property whose listener
 * rebuilds the map and swaps it in.
 * (Excerpt: only the members relevant to rule loading are shown.)
 */
public class FlowRuleManager {
    // Current rule map; replaced as a whole by the listener, never mutated in place here.
    private static volatile Map<String, List<FlowRule>> flowRules = new HashMap<>();
    private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
    private static SentinelProperty<List<FlowRule>> currentProperty = new DynamicSentinelProperty<List<FlowRule>>();

    static {
        currentProperty.addListener(LISTENER);  // register the listener
        startMetricTimerListener();             // start the metric timer task, by default once per second
    }

    // sign_demo_020 Load the rules
    public static void loadRules(List<FlowRule> rules) {
        currentProperty.updateValue(rules);     // update the value sign_m_201
    }

    // Internal listener
    private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
        @Override
        public synchronized void configUpdate(List<FlowRule> value) {
            Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);   // build the rule map  sign_m_210
            if (rules != null) {
                flowRules = rules; // swap in the new rule map
            }
            RecordLog.info("[FlowRuleManager] Flow rules received: {}", rules);
        }
    }
}
  • com.alibaba.csp.sentinel.property.DynamicSentinelProperty
// sign_c_100 Dynamic property; handling of value changes can be extended through listeners
// (Abridged excerpt: the ... placeholder marks code omitted by the article.)
public class DynamicSentinelProperty<T> implements SentinelProperty<T> {

    // Listeners notified on every value update.
    protected Set<PropertyListener<T>> listeners = new CopyOnWriteArraySet<>();
    private T value = null;

    // sign_m_201 Update the value
    @Override
    public boolean updateValue(T newValue) {
        ... // omitted: return handling when the new value equals the old one

        value = newValue; // change the value
        for (PropertyListener<T> listener : listeners) {
            listener.configUpdate(newValue); // notify the listener
        }
        return true;
    }
}
  • com.alibaba.csp.sentinel.slots.block.flow.FlowRuleUtil
    // sign_f_201 Grouping function that maps a rule to its resource name.
    private static final Function<FlowRule, String> extractResource = new Function<FlowRule, String>() {
        @Override
        public String apply(FlowRule rule) {
            return rule.getResource(); // use the resource name as the key
        }
    };

    // sign_m_210 Build the rule map; delegates with a null filter.
    public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list) {
        return buildFlowRuleMap(list, null); // sign_m_211
    }
    // sign_m_211 Delegates with sorting enabled (shouldSort = true).
    public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Predicate<FlowRule> filter) {
        return buildFlowRuleMap(list, filter, true); // sign_m_212
    }
    // sign_m_212 Delegates using extractResource as the grouping function.
    public static Map<String, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Predicate<FlowRule> filter,
                                                               boolean shouldSort) {
        /**
         * For extractResource see sign_f_201
         */
        return buildFlowRuleMap(list, extractResource, filter, shouldSort); // sign_m_213
    }
    /**
     * sign_m_213 Groups the rules by the key produced by {@code groupFunction}.
     * Each rule gets a traffic shaping controller assigned, duplicates within a group
     * are dropped, and each group is optionally sorted with {@code FlowRuleComparator}.
     * (Abridged excerpt: the {@code ...} placeholders mark code omitted by the article;
     * per their comments, that is where {@code filter} and the validity checks apply.)
     *
     * @param list          rules to group
     * @param groupFunction computes the map key of a rule
     * @param filter        rule filter, used in the omitted validation step
     * @param shouldSort    whether each group's rule list is sorted
     * @return a new map from key to that key's rule list
     */
    public static <K> Map<K, List<FlowRule>> buildFlowRuleMap(List<FlowRule> list, Function<FlowRule, K> groupFunction,
                                                              Predicate<FlowRule> filter, boolean shouldSort) {
        Map<K, List<FlowRule>> newRuleMap = new ConcurrentHashMap<>();
        ... // handling of an empty list omitted
        Map<K, Set<FlowRule>> tmpMap = new ConcurrentHashMap<>();

        for (FlowRule rule : list) {
            ... // validity check and filtering of the rule omitted
            ... // omitted: filling in the default value ("default") when the rule's limitApp is empty

            TrafficShapingController rater = generateRater(rule); // create the traffic shaping controller sign_m_220
            rule.setRater(rater); // attach the controller to the rule

            K key = groupFunction.apply(rule);
            ... // handling of a null key omitted

            Set<FlowRule> flowRules = tmpMap.get(key);
            if (flowRules == null) {
                flowRules = new HashSet<>(); // a Set prevents adding duplicate rules
                tmpMap.put(key, flowRules);
            }
            flowRules.add(rule);
        }

        // Turn every per-key set into a list, sorting it when requested.
        Comparator<FlowRule> comparator = new FlowRuleComparator();
        for (Entry<K, Set<FlowRule>> entries : tmpMap.entrySet()) {
            List<FlowRule> rules = new ArrayList<>(entries.getValue());
            if (shouldSort) {
                Collections.sort(rules, comparator); // sort the rules
            }
            newRuleMap.put(entries.getKey(), rules);
        }

        return newRuleMap;
    }

    /**
     * sign_m_220 Creates the traffic shaping controller for a rule.
     *
     * <p>Only QPS-grade rules are dispatched on their control behavior; every other
     * grade, and the default or an unknown behavior, gets a {@code DefaultController}.
     *
     * @param rule rule to create the controller for
     * @return the controller matching the rule's grade and control behavior
     */
    private static TrafficShapingController generateRater(FlowRule rule) {
        // Non-QPS grades always use the default controller.
        if (rule.getGrade() != RuleConstant.FLOW_GRADE_QPS) {
            return new DefaultController(rule.getCount(), rule.getGrade());
        }

        switch (rule.getControlBehavior()) {
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                return new WarmUpController(
                        rule.getCount(),
                        rule.getWarmUpPeriodSec(),
                        ColdFactorProperty.coldFactor);

            case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                return new RateLimiterController(
                        rule.getMaxQueueingTimeMs(),
                        rule.getCount());

            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                return new WarmUpRateLimiterController(
                        rule.getCount(),
                        rule.getWarmUpPeriodSec(),
                        rule.getMaxQueueingTimeMs(),
                        ColdFactorProperty.coldFactor);

            default:
                // Default mode or unknown mode: default traffic shaping controller (fast-reject).
                return new DefaultController(rule.getCount(), rule.getGrade());
        }
    }

总结

  • 对资源的监控是通过处理器链进行处理
  • 放不放行是通过流量控制器进行判断
  • 链路处理具体介绍参考:链路控制