Juejin | Backend • 2024-05-10 11:11

I. Background and Goals

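  • As microservice architectures spread, the interactions between systems grow ever more complex, and traffic control has become one of the key techniques for keeping systems stable. The microservice gateway is the entry point for all external requests, so rate limiting can be managed centrally at this layer. Configuration and maintenance of rate-limiting policies then live in one place, and we avoid configuring them separately in every microservice.
  • We had been using gateway rate limiting in AHAS, the enterprise edition of Sentinel. However, AHAS will be officially retired on January 5, 2025. AHAS recommends migrating to MSE Microservices Governance, but as of May 2024 MSE does not support API-level hot-parameter rate limiting, which we need. We therefore decided to modify the gateway rate limiting of open-source Sentinel to meet our requirements.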

II. How It Worked Before the Modification

Before the modification:

  • The gateway client integrates Sentinel
  • sentinel-dashboard is used as-is, without any changes

[Figure: Sentinel rate-limiting flow chart]

1. Client

  1. Initialize the Sentinel configuration (com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration#init)
  2. Send heartbeats to sentinel-dashboard carrying key information such as ip, port, appName and appType (com.alibaba.csp.sentinel.transport.init.HeartbeatSenderInitFunc#init)
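The sketch below makes the registration step concrete. It is a minimal plain-Java model of a dashboard-side registry that heartbeats feed. It is not Sentinel's SimpleMachineDiscovery. The class name, the ip:port key and the 60-second unhealthy threshold are assumptions made for illustration. onHeartbeat reports whether this is the first machine ever seen for an app. The registration logic modified later in this article relies on exactly that "first time" signal.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical heartbeat-fed registry; not Sentinel's SimpleMachineDiscovery.
final class HeartbeatRegistrySketch {
    // Assumed threshold: a machine silent for this long is treated as unhealthy.
    static final long UNHEALTHY_AFTER_MS = 60_000L;

    // app name -> ("ip:port" -> timestamp of the last heartbeat, in ms)
    private final Map<String, Map<String, Long>> apps = new ConcurrentHashMap<>();

    /** Records a heartbeat; returns true if it is the first machine ever seen for this app. */
    boolean onHeartbeat(String app, String ip, int port, long nowMs) {
        boolean[] firstForApp = {false};
        apps.computeIfAbsent(app, k -> {
            firstForApp[0] = true; // ConcurrentHashMap runs this at most once per absent key
            return new ConcurrentHashMap<String, Long>();
        }).put(ip + ":" + port, nowMs);
        return firstForApp[0];
    }

    /** Returns the ip:port keys of machines whose last heartbeat is recent enough. */
    Set<String> healthyMachines(String app, long nowMs) {
        Set<String> healthy = new TreeSet<>();
        apps.getOrDefault(app, Collections.<String, Long>emptyMap()).forEach((addr, last) -> {
            if (nowMs - last < UNHEALTHY_AFTER_MS) {
                healthy.add(addr);
            }
        });
        return healthy;
    }
}
```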

2. sentinel-dashboard

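  1. Create, update, delete:
    1. An API or rate-limiting rule is created, updated or deleted in the console
    2. The corresponding data in local memory is created, updated or deleted
    3. The updated data in local memory is queried again and pushed to the client endpoint (http://clientIp:8719/setRules)
  2. List query
    1. The client endpoint is called remotely to fetch the APIs or rules (http://clientIp:8719/gateway/getRules)
    2. The result is saved to local memory (InMemoryRuleRepositoryAdapter)
    3. The list is returned to the console page
  3. Periodic pull of monitoring metrics
    1. The client endpoint is called remotely to fetch metrics for APIs and rules (http://clientIp:8719/metric)
    2. The metrics are saved to local memory (InMemoryMetricsRepository)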
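Below is a plain-Java sketch of what an in-memory metrics store of this kind does. Samples pulled from each client are merged per resource and timestamp, and old samples are evicted. The class name, the key format and the 5-minute retention window are assumptions. This is not the actual code of InMemoryMetricsRepository. Because everything lives in process memory, metrics (like rules) are lost when the dashboard restarts.

```java
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical in-memory metrics store; not the dashboard's InMemoryMetricsRepository.
final class MetricsStoreSketch {
    // Assumed retention window: samples older than this are dropped.
    static final long RETENTION_MS = 5 * 60 * 1000L;

    // "app/resource" -> (sample timestamp in ms -> passed requests)
    private final Map<String, ConcurrentSkipListMap<Long, Long>> series = new ConcurrentHashMap<>();

    /** Merges one sample pulled from a client and evicts samples outside the retention window. */
    void save(String app, String resource, long timestampMs, long passQps, long nowMs) {
        ConcurrentSkipListMap<Long, Long> s =
                series.computeIfAbsent(app + "/" + resource, k -> new ConcurrentSkipListMap<Long, Long>());
        s.merge(timestampMs, passQps, Long::sum); // several machines may report the same timestamp
        s.headMap(nowMs - RETENTION_MS).clear();  // drop keys strictly older than the window
    }

    /** Returns a copy of the samples in [fromMs, toMs]. */
    SortedMap<Long, Long> query(String app, String resource, long fromMs, long toMs) {
        ConcurrentSkipListMap<Long, Long> s = series.get(app + "/" + resource);
        return s == null
                ? new TreeMap<Long, Long>()
                : new TreeMap<Long, Long>(s.subMap(fromMs, true, toMs, true));
    }
}
```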

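3. Request rate limiting

  1. Count requests in a time window (com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry)
  2. Read the rate-limiting rules from local memory and check whether the limit has been reached (com.alibaba.csp.sentinel.adapter.gateway.common.slot.GatewayFlowSlot#entry)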
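The two slots split the work: one does the counting and the other does the checking. The sketch below compresses that idea into a single bucketed sliding-window counter in plain Java. It is a simplification under stated assumptions: a fixed bucket count, a single threshold, and no hot-parameter dimension. It is not Sentinel's LeapArray or GatewayFlowSlot. As in Sentinel's StatisticSlot, a request is counted as passed only after the rule check succeeds.

```java
import java.util.Arrays;

// Simplified sliding-window limiter illustrating StatisticSlot + GatewayFlowSlot; not LeapArray.
final class SlidingWindowLimiterSketch {
    private final int buckets;
    private final long bucketMs;
    private final long windowMs;
    private final double threshold;   // max passed requests per window (QPS when windowMs = 1000)
    private final long[] bucketStart; // start time of the slice each bucket currently holds
    private final long[] passCount;   // passed requests recorded in each bucket

    SlidingWindowLimiterSketch(int buckets, long windowMs, double threshold) {
        this.buckets = buckets;
        this.bucketMs = windowMs / buckets;
        this.windowMs = bucketMs * buckets;
        this.threshold = threshold;
        this.bucketStart = new long[buckets];
        this.passCount = new long[buckets];
        Arrays.fill(bucketStart, -1L); // -1 marks a bucket that has never been used
    }

    synchronized boolean tryPass(long nowMs) {
        int idx = (int) ((nowMs / bucketMs) % buckets);
        long start = nowMs - nowMs % bucketMs;
        if (bucketStart[idx] != start) { // bucket still holds an older slice: reset and reuse it
            bucketStart[idx] = start;
            passCount[idx] = 0;
        }
        long total = 0; // statistics: passed requests in all buckets still inside the window
        for (int i = 0; i < buckets; i++) {
            if (bucketStart[i] >= 0 && nowMs - bucketStart[i] < windowMs) {
                total += passCount[i];
            }
        }
        if (total + 1 > threshold) { // rule check: this request would exceed the threshold
            return false;
        }
        passCount[idx]++; // counted as passed only after the check succeeds
        return true;
    }
}
```

With 2 buckets over a 1000 ms window and a threshold of 2, the third request within the same window is blocked. A request that arrives once the old buckets have aged out passes again.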

III. Problem Analysis and Solution Design

1. Problems before the modification

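  1. Client restart
    1. Refreshing the list now reads the client's empty data and clears the data held in the console's memory.
  2. Console restart
    1. Refreshing the list loads the client's in-memory data into the console's memory, but adding a rule afterwards corrupts the data. After the restart the in-memory auto-increment ID counter starts over, so the new rule is assigned ID 1 and overwrites the client's existing rule with ID 1.
    2. Adding a rule without refreshing first saves only that one rule in the console's memory and pushes it to the client, which overwrites all of the client's existing data.
  3. Restarting either the client or the console can corrupt the data, so persisting APIs and rules is a must.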
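These restart failures can be reproduced with a few lines of plain Java. This is a deliberately simplified model, and the class name and String rules are hypothetical. The console keeps rules in a map keyed by an auto-increment ID that starts over in a new instance. A list refresh copies the client's rules into memory. Every write pushes the full in-memory list back to the client, replacing whatever the client had.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model of the stock console; class and field names are hypothetical.
final class StockConsoleSketch {
    private final AtomicLong nextId = new AtomicLong(0); // a new instance = a restarted console
    final Map<Long, String> rules = new TreeMap<>();     // in-memory rules keyed by ID

    /** List page: the client's rules (with their IDs) replace the console's memory. */
    void refreshFrom(Map<Long, String> client) {
        rules.clear();
        rules.putAll(client);
    }

    /** Add page: assign the next ID, store the rule, then push the full list to the client. */
    long add(String rule, Map<Long, String> client) {
        long id = nextId.incrementAndGet();
        rules.put(id, rule); // silently replaces an existing rule with the same ID
        client.clear();      // the push replaces the client's rules wholesale
        client.putAll(rules);
        return id;
    }
}
```

The three scenarios behave as listed above. After a console restart and a refresh, the new rule receives ID 1 and replaces the old rule with that ID. Adding a rule without refreshing leaves the client with only the new rule. A client restart followed by a refresh empties the console's memory.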

2. Solution design

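Only the sentinel-dashboard back-end code is changed; the client and the front end are left untouched. (This article covers gateway rate limiting only; other rule types can be handled the same way.)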

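  1. When a client registers with the console for the first time, the console fetches that app's configuration from the Nacos config center and stores it in local memory
  2. When APIs or rules are created, updated or deleted in the console, the local memory is updated and all APIs or rate-limiting rules of that service are pushed to Nacos
  3. List queries in the console read only from local memory
  4. IDs are generated with the snowflake algorithm, so a restart no longer produces duplicate IDs and corrupted data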
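The four steps can be sketched in plain Java as follows. ConfigStore is a hypothetical stand-in for the Nacos ConfigService: load roughly corresponds to getConfig, and publish to publishConfig. The injected LongSupplier stands in for the snowflake generator. Neither is the author's code. The key property is that every write publishes the complete rule set of the app. The config center therefore always holds a full snapshot that a restarted console can reload.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongSupplier;

// Hypothetical sketch of the redesigned console; not the author's classes.
final class PersistentRuleFlowSketch {

    /** Stand-in for the Nacos ConfigService: one rule snapshot per app. */
    interface ConfigStore {
        Map<Long, String> load(String app);                   // roughly getConfig(dataId, group, timeout)
        void publish(String app, Map<Long, String> allRules);  // roughly publishConfig(dataId, group, content)
    }

    /** Trivial in-memory ConfigStore used to exercise the sketch. */
    static final class InMemoryConfigStore implements ConfigStore {
        private final Map<String, Map<Long, String>> data = new HashMap<>();

        @Override
        public synchronized Map<Long, String> load(String app) {
            return new TreeMap<>(data.getOrDefault(app, Collections.<Long, String>emptyMap()));
        }

        @Override
        public synchronized void publish(String app, Map<Long, String> allRules) {
            data.put(app, new TreeMap<>(allRules));
        }
    }

    private final ConfigStore store;
    private final LongSupplier idGenerator; // a snowflake generator in the real design
    private final Map<String, Map<Long, String>> memory = new HashMap<>();

    PersistentRuleFlowSketch(ConfigStore store, LongSupplier idGenerator) {
        this.store = store;
        this.idGenerator = idGenerator;
    }

    /** Step 1: on an app's first registration, seed memory from the config center. */
    synchronized void onFirstRegistration(String app) {
        memory.put(app, new TreeMap<>(store.load(app)));
    }

    /** Step 2: update memory, then publish every rule of the app, not just the changed one. */
    synchronized long add(String app, String rule) {
        long id = idGenerator.getAsLong();
        Map<Long, String> rules = memory.computeIfAbsent(app, k -> new TreeMap<Long, String>());
        rules.put(id, rule);
        store.publish(app, rules);
        return id;
    }

    /** Step 2 (delete): remove from memory and publish the remaining rules of the app. */
    synchronized void delete(String app, long id) {
        Map<Long, String> rules = memory.get(app);
        if (rules != null && rules.remove(id) != null) {
            store.publish(app, rules);
        }
    }

    /** Step 3: list queries read local memory only. */
    synchronized List<String> list(String app) {
        Map<Long, String> rules = memory.get(app);
        return rules == null ? new ArrayList<String>() : new ArrayList<>(rules.values());
    }
}
```

To exercise it, create a console, add a rule, then create a second instance against the same store to simulate a restart. A shared counter can stand in for the snowflake generator. The only point is that IDs issued after the restart never repeat earlier ones, so the reloaded rules survive the next write.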

IV. Implementation

1. Client (no code changes, only regular configuration)

  1. Add the dependencies

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-sentinel-gateway</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.csp</groupId>
            <artifactId>sentinel-datasource-nacos</artifactId>
        </dependency>
  2. YAML configuration
spring:
  cloud:
    sentinel:
      enabled: true
      filter:
        enabled: false
      eager: true # trigger Sentinel initialization at startup rather than on the first request
      transport:
        dashboard: localhost:9999
      datasource:
        gw-flow:
          nacos:
            group-id: sentinel_group
            namespace: sentinel
            data-id: ${spring.application.name}-sentinel-gateway-flow-rules # must match the data-id postfix defined in the modified Sentinel source
            server-addr: 192.168.0.7:8848
            username: xxxx
            password: xxxx
            data-type: json
            rule-type: gw-flow
        gw-api-group:
          nacos:
            group-id: sentinel_group
            namespace: sentinel
            data-id: ${spring.application.name}-sentinel-gateway-api-rules # must match the data-id postfix defined in the modified Sentinel source
            server-addr: 192.168.0.7:8848
            username: xxxx
            password: xxxx
            data-type: json
            rule-type: gw-api-group
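On the client side, the Nacos data source watches each data-id and reloads the rule set whenever its content changes. The plain-Java sketch below models only that reload step. The class name and the toy comma-separated parser are assumptions; the real data source parses JSON into GatewayFlowRule or ApiDefinition objects. Sentinel's GatewayRuleManager.loadRules replaces the whole rule set rather than merging into it. For that reason the dashboard must always publish the complete list for the app, which is what design step 2 does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical model of a client-side config data source; not Sentinel's NacosDataSource.
final class ConfigReloadSketch<T> {
    private final AtomicReference<List<T>> current =
            new AtomicReference<>(Collections.<T>emptyList());
    private final Function<String, List<T>> parser; // stands in for the JSON rule converter

    ConfigReloadSketch(Function<String, List<T>> parser) {
        this.parser = parser;
    }

    /** Called with the full new content of the data-id; the old rule set is replaced, not merged. */
    void onConfigChange(String content) {
        List<T> parsed = (content == null || content.trim().isEmpty())
                ? Collections.<T>emptyList()
                : parser.apply(content);
        current.set(Collections.unmodifiableList(new ArrayList<>(parsed)));
    }

    List<T> rules() {
        return current.get();
    }
}
```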

2. sentinel-dashboard

(1) In pom.xml, remove the test scope from sentinel-datasource-nacos

        <!-- for Nacos rule publisher sample -->
        <dependency>
            <groupId>com.alibaba.csp</groupId>
            <artifactId>sentinel-datasource-nacos</artifactId>
            <!-- <scope>test</scope> -->
        </dependency>

(2) Move the Nacos sample files from the test package to the corresponding package under main


(3) Following the samples from the test package, write a Provider and a Publisher for gateway APIs and for gateway flow rules

@Component("gateWayFlowRulesNacosProvider")
public class GateWayFlowRulesNacosProvider implements DynamicRuleProvider<List<GatewayFlowRuleEntity>> {

    @Resource
    private ConfigService configService;
    @Resource
    private Converter<String, List<GatewayFlowRuleEntity>> converter;
    @Resource
    private NacosConfigProperties nacosConfigProperties;

    @Override
    public List<GatewayFlowRuleEntity> getRules(String appName) throws Exception {
        String rules = configService.getConfig(appName + NacosConfigUtil.GATEWAY_FLOW_DATA_ID_POSTFIX,
                nacosConfigProperties.getGroup(), 3000);
        if (StringUtil.isEmpty(rules)) {
            return new ArrayList<>();
        }
        return converter.convert(rules);
    }

}
@Component("gateWayFlowRulesNacosPunlisher")
public class GateWayFlowRulesNacosPunlisher implements DynamicRulePublisher<List<GatewayFlowRuleEntity>> {

    @Resource
    private ConfigService configService;
    @Resource
    private Converter<List<GatewayFlowRuleEntity>, String> converter;
    @Resource
    private NacosConfigProperties nacosConfigProperties;


    @Override
    public void publish(String app, List<GatewayFlowRuleEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.GATEWAY_FLOW_DATA_ID_POSTFIX,
                nacosConfigProperties.getGroup(), converter.convert(rules));
    }
}
@Component("getWayApiNacosProvider")
public class GetWayApiNacosProvider implements DynamicRuleProvider<List<ApiDefinitionEntity>> {
    @Resource
    private ConfigService configService;
    @Resource
    private Converter<String , List<ApiDefinitionEntity>> converter;
    @Resource
    private NacosConfigProperties nacosConfigProperties;
    @Override
    public List<ApiDefinitionEntity> getRules(String appName) throws Exception {
        String rules = configService.getConfig(appName+ NacosConfigUtil.GATEWAY_API_DATA_ID_POSTFIX
                ,nacosConfigProperties.getGroup(),3000);
        if(StringUtil.isEmpty(rules)){
            return new ArrayList<>();
        }
        return converter.convert(rules);
    }
}
@Component("getWayApiNacosPublisher")
public class GetWayApiNacosPublisher implements DynamicRulePublisher<List<ApiDefinitionEntity>> {

    @Resource
    private ConfigService configService;
    @Resource
    private Converter<List<ApiDefinitionEntity>, String> converter;
    @Resource
    private NacosConfigProperties nacosConfigProperties;

    @Override
    public void publish(String app, List<ApiDefinitionEntity> rules) throws Exception {
        AssertUtil.notEmpty(app, "app name cannot be empty");
        if (rules == null) {
            return;
        }
        configService.publishConfig(app + NacosConfigUtil.GATEWAY_API_DATA_ID_POSTFIX,
                nacosConfigProperties.getGroup(), converter.convert(rules));
    }
}
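The provider and publisher reference NacosConfigUtil.GATEWAY_FLOW_DATA_ID_POSTFIX and GATEWAY_API_DATA_ID_POSTFIX. As far as we can tell, the stock test-package NacosConfigUtil does not define these; it has postfixes such as -flow-rules for ordinary flow rules. They were presumably added by the author. The sketch below assumes values that line up with the data-id entries in the client YAML. Whatever values you choose, three things must match the client configuration exactly:

- the dashboard's data-id;
- its group (nacosConfigProperties.getGroup(), presumably sentinel_group);
- its namespace (sentinel).

If any of them differs, the client will never see the published rules. The data-id also only lines up if the app name the client reports to the dashboard equals spring.application.name.

```java
// Hypothetical additions to the NacosConfigUtil class moved from the test package.
final class GatewayNacosDataIds {
    // Assumed values, chosen to match the data-id entries in the client YAML.
    static final String GATEWAY_FLOW_DATA_ID_POSTFIX = "-sentinel-gateway-flow-rules";
    static final String GATEWAY_API_DATA_ID_POSTFIX = "-sentinel-gateway-api-rules";

    private GatewayNacosDataIds() {
    }

    /** Data-id the flow-rule publisher writes to and the client's gw-flow data source reads. */
    static String flowRulesDataId(String app) {
        return app + GATEWAY_FLOW_DATA_ID_POSTFIX;
    }

    /** Data-id the API-definition publisher writes to and the client's gw-api-group data source reads. */
    static String apiDefinitionsDataId(String app) {
        return app + GATEWAY_API_DATA_ID_POSTFIX;
    }
}
```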

(4) Modify GatewayApiController and GatewayFlowRuleController

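  1. Inject the Nacos publisher
  2. Replace the calls that pushed to the client with calls that publish to Nacos
  3. Make the list.json endpoint read data only from local memory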
@RestController
@RequestMapping(value = "/gateway/api")
public class GatewayApiController {

    private final Logger logger = LoggerFactory.getLogger(GatewayApiController.class);

    @Autowired
    private InMemApiDefinitionStore repository;

    @Autowired
    @Qualifier("getWayApiNacosPublisher")
    private DynamicRulePublisher<List<ApiDefinitionEntity>> rulePublisher;


    @GetMapping("/list.json")
    @AuthAction(AuthService.PrivilegeType.READ_RULE)
    public Result<List<ApiDefinitionEntity>> queryApis(String app, String ip, Integer port) {

        if (StringUtil.isEmpty(app)) {
            return Result.ofFail(-1, "app can't be null or empty");
        }
        if (StringUtil.isEmpty(ip)) {
            return Result.ofFail(-1, "ip can't be null or empty");
        }
        if (port == null) {
            return Result.ofFail(-1, "port can't be null");
        }

        try {
            // List<ApiDefinitionEntity> apis = sentinelApiClient.fetchApis(app, ip, port).get();
            // List<ApiDefinitionEntity> apis = ruleProvider.getRules(app);
            //
            // repository.saveAll(apis);
            List<ApiDefinitionEntity> apis = repository.findAllByApp(app);
            return Result.ofSuccess(apis);
        } catch (Throwable throwable) {
            logger.error("queryApis error:", throwable);
            return Result.ofThrowable(-1, throwable);
        }
    }

    @PostMapping("/new.json")
    @AuthAction(AuthService.PrivilegeType.WRITE_RULE)
    public Result<ApiDefinitionEntity> addApi(HttpServletRequest request, @RequestBody AddApiReqVo reqVo) {

        String app = reqVo.getApp();
        if (StringUtil.isBlank(app)) {
            return Result.ofFail(-1, "app can't be null or empty");
        }

        ApiDefinitionEntity entity = new ApiDefinitionEntity();
        entity.setApp(app.trim());

        String ip = reqVo.getIp();
        if (StringUtil.isBlank(ip)) {
            return Result.ofFail(-1, "ip can't be null or empty");
        }
        entity.setIp(ip.trim());

        Integer port = reqVo.getPort();
        if (port == null) {
            return Result.ofFail(-1, "port can't be null");
        }
        entity.setPort(port);

        // API name
        String apiName = reqVo.getApiName();
        if (StringUtil.isBlank(apiName)) {
            return Result.ofFail(-1, "apiName can't be null or empty");
        }
        entity.setApiName(apiName.trim());

        // Match predicates
        List<ApiPredicateItemVo> predicateItems = reqVo.getPredicateItems();
        if (CollectionUtils.isEmpty(predicateItems)) {
            return Result.ofFail(-1, "predicateItems can't empty");
        }

        List<ApiPredicateItemEntity> predicateItemEntities = new ArrayList<>();
        for (ApiPredicateItemVo predicateItem : predicateItems) {
            ApiPredicateItemEntity predicateItemEntity = new ApiPredicateItemEntity();

            // Match strategy
            Integer matchStrategy = predicateItem.getMatchStrategy();
            if (!Arrays.asList(URL_MATCH_STRATEGY_EXACT, URL_MATCH_STRATEGY_PREFIX, URL_MATCH_STRATEGY_REGEX).contains(matchStrategy)) {
                return Result.ofFail(-1, "invalid matchStrategy: " + matchStrategy);
            }
            predicateItemEntity.setMatchStrategy(matchStrategy);

            // Match pattern
            String pattern = predicateItem.getPattern();
            if (StringUtil.isBlank(pattern)) {
                return Result.ofFail(-1, "pattern can't be null or empty");
            }
            predicateItemEntity.setPattern(pattern);

            predicateItemEntities.add(predicateItemEntity);
        }
        entity.setPredicateItems(new LinkedHashSet<>(predicateItemEntities));

        // API names must be unique within the app: APIs are now published per app, not per machine
        List<ApiDefinitionEntity> allApis = repository.findAllByApp(app.trim());
        if (allApis.stream().map(o -> o.getApiName()).anyMatch(o -> o.equals(apiName.trim()))) {
            return Result.ofFail(-1, "apiName exists: " + apiName);
        }

        Date date = new Date();
        entity.setGmtCreate(date);
        entity.setGmtModified(date);

        try {
            entity = repository.save(entity);
        } catch (Throwable throwable) {
            logger.error("add gateway api error:", throwable);
            return Result.ofThrowable(-1, throwable);
        }

        // if (!publishApis(app, ip, port)) {
        //     logger.warn("publish gateway apis fail after add");
        // }
        publishApis(app);

        return Result.ofSuccess(entity);
    }

    @PostMapping("/save.json")
    @AuthAction(AuthService.PrivilegeType.WRITE_RULE)
    public Result<ApiDefinitionEntity> updateApi(@RequestBody UpdateApiReqVo reqVo) {
        String app = reqVo.getApp();
        if (StringUtil.isBlank(app)) {
            return Result.ofFail(-1, "app can't be null or empty");
        }

        Long id = reqVo.getId();
        if (id == null) {
            return Result.ofFail(-1, "id can't be null");
        }

        ApiDefinitionEntity entity = repository.findById(id);
        if (entity == null) {
            return Result.ofFail(-1, "api does not exist, id=" + id);
        }

        // Match predicates
        List<ApiPredicateItemVo> predicateItems = reqVo.getPredicateItems();
        if (CollectionUtils.isEmpty(predicateItems)) {
            return Result.ofFail(-1, "predicateItems can't empty");
        }

        List<ApiPredicateItemEntity> predicateItemEntities = new ArrayList<>();
        for (ApiPredicateItemVo predicateItem : predicateItems) {
            ApiPredicateItemEntity predicateItemEntity = new ApiPredicateItemEntity();

            // Match strategy (boxed, so a missing value is rejected below instead of throwing an NPE)
            Integer matchStrategy = predicateItem.getMatchStrategy();
            if (!Arrays.asList(URL_MATCH_STRATEGY_EXACT, URL_MATCH_STRATEGY_PREFIX, URL_MATCH_STRATEGY_REGEX).contains(matchStrategy)) {
                return Result.ofFail(-1, "Invalid matchStrategy: " + matchStrategy);
            }
            predicateItemEntity.setMatchStrategy(matchStrategy);

            // Match pattern
            String pattern = predicateItem.getPattern();
            if (StringUtil.isBlank(pattern)) {
                return Result.ofFail(-1, "pattern can't be null or empty");
            }
            predicateItemEntity.setPattern(pattern);

            predicateItemEntities.add(predicateItemEntity);
        }
        entity.setPredicateItems(new LinkedHashSet<>(predicateItemEntities));

        Date date = new Date();
        entity.setGmtModified(date);

        try {
            entity = repository.save(entity);
        } catch (Throwable throwable) {
            logger.error("update gateway api error:", throwable);
            return Result.ofThrowable(-1, throwable);
        }

        // if (!publishApis(app, entity.getIp(), entity.getPort())) {
        //     logger.warn("publish gateway apis fail after update");
        // }
        publishApis(app);

        return Result.ofSuccess(entity);
    }

    @PostMapping("/delete.json")
    @AuthAction(AuthService.PrivilegeType.DELETE_RULE)

    public Result<Long> deleteApi(Long id) {
        if (id == null) {
            return Result.ofFail(-1, "id can't be null");
        }

        ApiDefinitionEntity oldEntity = repository.findById(id);
        if (oldEntity == null) {
            return Result.ofSuccess(null);
        }

        try {
            repository.delete(id);
        } catch (Throwable throwable) {
            logger.error("delete gateway api error:", throwable);
            return Result.ofThrowable(-1, throwable);
        }

        // if (!publishApis(oldEntity.getApp(), oldEntity.getIp(), oldEntity.getPort())) {
        //     logger.warn("publish gateway apis fail after delete");
        // }
        publishApis(oldEntity.getApp());

        return Result.ofSuccess(id);
    }

    // private boolean publishApis(String app, String ip, Integer port) {
    //     List<ApiDefinitionEntity> apis = repository.findAllByMachine(MachineInfo.of(app, ip, port));
    //     return sentinelApiClient.modifyApis(app, ip, port, apis);
    // }

    /**
     * Publish all API definitions of the app to Nacos.
     *
     * @param app application name
     */
    private void publishApis(String app) {
        List<ApiDefinitionEntity> rules = repository.findAllByApp(app);
        try {
            rulePublisher.publish(app, rules);
        } catch (Exception e) {
            logger.error("publish gateway apis to nacos error, app:{}", app, e);
        }
    }

}
@RestController
@RequestMapping(value = "/gateway/flow")
public class GatewayFlowRuleController {

    private final Logger logger = LoggerFactory.getLogger(GatewayFlowRuleController.class);

    @Autowired
    private InMemGatewayFlowRuleStore repository;

    @Autowired
    @Qualifier("gateWayFlowRulesNacosPunlisher")
    private DynamicRulePublisher<List<GatewayFlowRuleEntity>> rulePublisher;


    @GetMapping("/list.json")
    @AuthAction(AuthService.PrivilegeType.READ_RULE)
    public Result<List<GatewayFlowRuleEntity>> queryFlowRules(String app, String ip, Integer port) {

        if (StringUtil.isEmpty(app)) {
            return Result.ofFail(-1, "app can't be null or empty");
        }
        if (StringUtil.isEmpty(ip)) {
            return Result.ofFail(-1, "ip can't be null or empty");
        }
        if (port == null) {
            return Result.ofFail(-1, "port can't be null");
        }

        try {
            // List<GatewayFlowRuleEntity> rules = sentinelApiClient.fetchGatewayFlowRules(app, ip, port).get();
            // List<GatewayFlowRuleEntity> rules = ruleProvider.getRules(app);
            //
            // repository.saveAll(rules);
            List<GatewayFlowRuleEntity> rules = repository.findAllByApp(app);
            return Result.ofSuccess(rules);
        } catch (Throwable throwable) {
            logger.error("query gateway flow rules error:", throwable);
            return Result.ofThrowable(-1, throwable);
        }
    }

    @PostMapping("/new.json")
    @AuthAction(AuthService.PrivilegeType.WRITE_RULE)
    public Result<GatewayFlowRuleEntity> addFlowRule(@RequestBody AddFlowRuleReqVo reqVo) {

        String app = reqVo.getApp();
        if (StringUtil.isBlank(app)) {
            return Result.ofFail(-1, "app can't be null or empty");
        }

        GatewayFlowRuleEntity entity = new GatewayFlowRuleEntity();
        entity.setApp(app.trim());

        String ip = reqVo.getIp();
        if (StringUtil.isBlank(ip)) {
            return Result.ofFail(-1, "ip can't be null or empty");
        }
        entity.setIp(ip.trim());

        Integer port = reqVo.getPort();
        if (port == null) {
            return Result.ofFail(-1, "port can't be null");
        }
        entity.setPort(port);

        // Resource mode: route ID or custom API group
        Integer resourceMode = reqVo.getResourceMode();
        if (resourceMode == null) {
            return Result.ofFail(-1, "resourceMode can't be null");
        }
        if (!Arrays.asList(RESOURCE_MODE_ROUTE_ID, RESOURCE_MODE_CUSTOM_API_NAME).contains(resourceMode)) {
            return Result.ofFail(-1, "invalid resourceMode: " + resourceMode);
        }
        entity.setResourceMode(resourceMode);

        // Resource name (route ID or API group name)
        String resource = reqVo.getResource();
        if (StringUtil.isBlank(resource)) {
            return Result.ofFail(-1, "resource can't be null or empty");
        }
        entity.setResource(resource.trim());

        // Optional request-attribute (hot parameter) item
        GatewayParamFlowItemVo paramItem = reqVo.getParamItem();
        if (paramItem != null) {
            GatewayParamFlowItemEntity itemEntity = new GatewayParamFlowItemEntity();
            entity.setParamItem(itemEntity);

            // Parse strategy: 0 = client IP, 1 = remote host, 2 = header, 3 = URL parameter, 4 = cookie
            Integer parseStrategy = paramItem.getParseStrategy();
            if (!Arrays.asList(PARAM_PARSE_STRATEGY_CLIENT_IP, PARAM_PARSE_STRATEGY_HOST, PARAM_PARSE_STRATEGY_HEADER
                    , PARAM_PARSE_STRATEGY_URL_PARAM, PARAM_PARSE_STRATEGY_COOKIE).contains(parseStrategy)) {
                return Result.ofFail(-1, "invalid parseStrategy: " + parseStrategy);
            }
            itemEntity.setParseStrategy(paramItem.getParseStrategy());

            // For 2 = header, 3 = URL parameter and 4 = cookie, the field name is required
            if (Arrays.asList(PARAM_PARSE_STRATEGY_HEADER, PARAM_PARSE_STRATEGY_URL_PARAM, PARAM_PARSE_STRATEGY_COOKIE).contains(parseStrategy)) {
                // Field name
                String fieldName = paramItem.getFieldName();
                if (StringUtil.isBlank(fieldName)) {
                    return Result.ofFail(-1, "fieldName can't be null or empty");
                }
                itemEntity.setFieldName(paramItem.getFieldName());
            }

            String pattern = paramItem.getPattern();
            // If a pattern is given, validate the match strategy
            if (StringUtil.isNotEmpty(pattern)) {
                itemEntity.setPattern(pattern);
                Integer matchStrategy = paramItem.getMatchStrategy();
                if (!Arrays.asList(PARAM_MATCH_STRATEGY_EXACT, PARAM_MATCH_STRATEGY_CONTAINS, PARAM_MATCH_STRATEGY_REGEX).contains(matchStrategy)) {
                    return Result.ofFail(-1, "invalid matchStrategy: " + matchStrategy);
                }
                itemEntity.setMatchStrategy(matchStrategy);
            }
        }

        // Threshold type: 0 = thread count, 1 = QPS
        Integer grade = reqVo.getGrade();
        if (grade == null) {
            return Result.ofFail(-1, "grade can't be null");
        }
        if (!Arrays.asList(FLOW_GRADE_THREAD, FLOW_GRADE_QPS).contains(grade)) {
            return Result.ofFail(-1, "invalid grade: " + grade);
        }
        entity.setGrade(grade);

        // Threshold value (QPS or thread count, depending on the threshold type)
        Double count = reqVo.getCount();
        if (count == null) {
            return Result.ofFail(-1, "count can't be null");
        }
        if (count < 0) {
            return Result.ofFail(-1, "count should be at least zero");
        }
        entity.setCount(count);

        // Statistics interval
        Long interval = reqVo.getInterval();
        if (interval == null) {
            return Result.ofFail(-1, "interval can't be null");
        }
        if (interval <= 0) {
            return Result.ofFail(-1, "interval should be greater than zero");
        }
        entity.setInterval(interval);

        // Interval unit
        Integer intervalUnit = reqVo.getIntervalUnit();
        if (intervalUnit == null) {
            return Result.ofFail(-1, "intervalUnit can't be null");
        }
        if (!Arrays.asList(INTERVAL_UNIT_SECOND, INTERVAL_UNIT_MINUTE, INTERVAL_UNIT_HOUR, INTERVAL_UNIT_DAY).contains(intervalUnit)) {
            return Result.ofFail(-1, "Invalid intervalUnit: " + intervalUnit);
        }
        entity.setIntervalUnit(intervalUnit);
        entity.calIntervalSec();
        // Control behavior: 0 = fail fast, 2 = queue at a uniform rate
        Integer controlBehavior = reqVo.getControlBehavior();
        if (controlBehavior == null) {
            return Result.ofFail(-1, "controlBehavior can't be null");
        }
        if (!Arrays.asList(CONTROL_BEHAVIOR_DEFAULT, CONTROL_BEHAVIOR_RATE_LIMITER).contains(controlBehavior)) {
            return Result.ofFail(-1, "invalid controlBehavior: " + controlBehavior);
        }
        entity.setControlBehavior(controlBehavior);

        if (CONTROL_BEHAVIOR_DEFAULT == controlBehavior) {
            // 0 = fail fast: burst size is required
            Integer burst = reqVo.getBurst();
            if (burst == null) {
                return Result.ofFail(-1, "burst can't be null");
            }
            if (burst < 0) {
                return Result.ofFail(-1, "invalid burst: " + burst);
            }
            entity.setBurst(burst);
        } else if (CONTROL_BEHAVIOR_RATE_LIMITER == controlBehavior) {
            // 2 = uniform-rate queueing: max queueing timeout is required
            Integer maxQueueingTimeoutMs = reqVo.getMaxQueueingTimeoutMs();
            if (maxQueueingTimeoutMs == null) {
                return Result.ofFail(-1, "maxQueueingTimeoutMs can't be null");
            }
            if (maxQueueingTimeoutMs < 0) {
                return Result.ofFail(-1, "invalid maxQueueingTimeoutMs: " + maxQueueingTimeoutMs);
            }
            entity.setMaxQueueingTimeoutMs(maxQueueingTimeoutMs);
        }

        Date date = new Date();
        entity.setGmtCreate(date);
        entity.setGmtModified(date);

        try {
            entity = repository.save(entity);
        } catch (Throwable throwable) {
            logger.error("add gateway flow rule error:", throwable);
            return Result.ofThrowable(-1, throwable);
        }

        // if (!publishRules(app, ip, port)) {
        //     logger.warn("publish gateway flow rules fail after add");
        // }
        publishRules(app);

        return Result.ofSuccess(entity);
    }

    @PostMapping("/save.json")
    @AuthAction(AuthService.PrivilegeType.WRITE_RULE)
    public Result<GatewayFlowRuleEntity> updateFlowRule(@RequestBody UpdateFlowRuleReqVo reqVo) {

        String app = reqVo.getApp();
        if (StringUtil.isBlank(app)) {
            return Result.ofFail(-1, "app can't be null or empty");
        }

        Long id = reqVo.getId();
        if (id == null) {
            return Result.ofFail(-1, "id can't be null");
        }

        GatewayFlowRuleEntity entity = repository.findById(id);
        if (entity == null) {
            return Result.ofFail(-1, "gateway flow rule does not exist, id=" + id);
        }

        // Optional request-attribute (hot parameter) item
        GatewayParamFlowItemVo paramItem = reqVo.getParamItem();
        if (paramItem != null) {
            GatewayParamFlowItemEntity itemEntity = new GatewayParamFlowItemEntity();
            entity.setParamItem(itemEntity);

            // Parse strategy: 0 = client IP, 1 = remote host, 2 = header, 3 = URL parameter, 4 = cookie
            Integer parseStrategy = paramItem.getParseStrategy();
            if (!Arrays.asList(PARAM_PARSE_STRATEGY_CLIENT_IP, PARAM_PARSE_STRATEGY_HOST, PARAM_PARSE_STRATEGY_HEADER
                    , PARAM_PARSE_STRATEGY_URL_PARAM, PARAM_PARSE_STRATEGY_COOKIE).contains(parseStrategy)) {
                return Result.ofFail(-1, "invalid parseStrategy: " + parseStrategy);
            }
            itemEntity.setParseStrategy(paramItem.getParseStrategy());

            // For 2 = header, 3 = URL parameter and 4 = cookie, the field name is required
            if (Arrays.asList(PARAM_PARSE_STRATEGY_HEADER, PARAM_PARSE_STRATEGY_URL_PARAM, PARAM_PARSE_STRATEGY_COOKIE).contains(parseStrategy)) {
                // Field name
                String fieldName = paramItem.getFieldName();
                if (StringUtil.isBlank(fieldName)) {
                    return Result.ofFail(-1, "fieldName can't be null or empty");
                }
                itemEntity.setFieldName(paramItem.getFieldName());
            }

            String pattern = paramItem.getPattern();
            // If a pattern is given, validate the match strategy
            if (StringUtil.isNotEmpty(pattern)) {
                itemEntity.setPattern(pattern);
                Integer matchStrategy = paramItem.getMatchStrategy();
                if (!Arrays.asList(PARAM_MATCH_STRATEGY_EXACT, PARAM_MATCH_STRATEGY_CONTAINS, PARAM_MATCH_STRATEGY_REGEX).contains(matchStrategy)) {
                    return Result.ofFail(-1, "invalid matchStrategy: " + matchStrategy);
                }
                itemEntity.setMatchStrategy(matchStrategy);
            }
        } else {
            entity.setParamItem(null);
        }

        // Threshold type: 0 = thread count, 1 = QPS
        Integer grade = reqVo.getGrade();
        if (grade == null) {
            return Result.ofFail(-1, "grade can't be null");
        }
        if (!Arrays.asList(FLOW_GRADE_THREAD, FLOW_GRADE_QPS).contains(grade)) {
            return Result.ofFail(-1, "invalid grade: " + grade);
        }
        entity.setGrade(grade);

        // Threshold value (QPS or thread count, depending on the threshold type)
        Double count = reqVo.getCount();
        if (count == null) {
            return Result.ofFail(-1, "count can't be null");
        }
        if (count < 0) {
            return Result.ofFail(-1, "count should be at least zero");
        }
        entity.setCount(count);

        // Statistics interval
        Long interval = reqVo.getInterval();
        if (interval == null) {
            return Result.ofFail(-1, "interval can't be null");
        }
        if (interval <= 0) {
            return Result.ofFail(-1, "interval should be greater than zero");
        }
        entity.setInterval(interval);

        // Interval unit
        Integer intervalUnit = reqVo.getIntervalUnit();
        if (intervalUnit == null) {
            return Result.ofFail(-1, "intervalUnit can't be null");
        }
        if (!Arrays.asList(INTERVAL_UNIT_SECOND, INTERVAL_UNIT_MINUTE, INTERVAL_UNIT_HOUR, INTERVAL_UNIT_DAY).contains(intervalUnit)) {
            return Result.ofFail(-1, "Invalid intervalUnit: " + intervalUnit);
        }
        entity.setIntervalUnit(intervalUnit);
        entity.calIntervalSec();
        // Control behavior: 0 = fail fast, 2 = queue at a uniform rate
        Integer controlBehavior = reqVo.getControlBehavior();
        if (controlBehavior == null) {
            return Result.ofFail(-1, "controlBehavior can't be null");
        }
        if (!Arrays.asList(CONTROL_BEHAVIOR_DEFAULT, CONTROL_BEHAVIOR_RATE_LIMITER).contains(controlBehavior)) {
            return Result.ofFail(-1, "invalid controlBehavior: " + controlBehavior);
        }
        entity.setControlBehavior(controlBehavior);

        if (CONTROL_BEHAVIOR_DEFAULT == controlBehavior) {
            // 0 = fail fast: burst size is required
            Integer burst = reqVo.getBurst();
            if (burst == null) {
                return Result.ofFail(-1, "burst can't be null");
            }
            if (burst < 0) {
                return Result.ofFail(-1, "invalid burst: " + burst);
            }
            entity.setBurst(burst);
        } else if (CONTROL_BEHAVIOR_RATE_LIMITER == controlBehavior) {
            // 2 = uniform-rate queueing: max queueing timeout is required
            Integer maxQueueingTimeoutMs = reqVo.getMaxQueueingTimeoutMs();
            if (maxQueueingTimeoutMs == null) {
                return Result.ofFail(-1, "maxQueueingTimeoutMs can't be null");
            }
            if (maxQueueingTimeoutMs < 0) {
                return Result.ofFail(-1, "invalid maxQueueingTimeoutMs: " + maxQueueingTimeoutMs);
            }
            entity.setMaxQueueingTimeoutMs(maxQueueingTimeoutMs);
        }

        Date date = new Date();
        entity.setGmtModified(date);

        try {
            entity = repository.save(entity);
        } catch (Throwable throwable) {
            logger.error("update gateway flow rule error:", throwable);
            return Result.ofThrowable(-1, throwable);
        }

        // if (!publishRules(app, entity.getIp(), entity.getPort())) {
        //     logger.warn("publish gateway flow rules fail after update");
        // }

        publishRules(app);

        return Result.ofSuccess(entity);
    }


    @PostMapping("/delete.json")
    @AuthAction(AuthService.PrivilegeType.DELETE_RULE)
    public Result<Long> deleteFlowRule(Long id) {

        if (id == null) {
            return Result.ofFail(-1, "id can't be null");
        }

        GatewayFlowRuleEntity oldEntity = repository.findById(id);
        if (oldEntity == null) {
            return Result.ofSuccess(null);
        }

        try {
            repository.delete(id);
        } catch (Throwable throwable) {
            logger.error("delete gateway flow rule error:", throwable);
            return Result.ofThrowable(-1, throwable);
        }

        // if (!publishRules(oldEntity.getApp(), oldEntity.getIp(), oldEntity.getPort())) {
        //     logger.warn("publish gateway flow rules fail after delete");
        // }

        publishRules(oldEntity.getApp());

        return Result.ofSuccess(id);
    }

    // private boolean publishRules(String app, String ip, Integer port) {
    //     List<GatewayFlowRuleEntity> rules = repository.findAllByMachine(MachineInfo.of(app, ip, port));
    //     return sentinelApiClient.modifyGatewayFlowRules(app, ip, port, rules);
    // }

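    /**
     * Publish all gateway flow rules of the app to Nacos.
     *
     * @param app application name
     * @return true if the rules were published successfully
     */
    private boolean publishRules(String app) {
        List<GatewayFlowRuleEntity> rules = repository.findAllByApp(app);
        try {
            rulePublisher.publish(app, rules);
            return true;
        } catch (Exception e) {
            logger.error("publish gateway flow rules to nacos error, app:{}", app, e);
            return false;
        }
    }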

}
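Both controllers call entity.calIntervalSec(). The article does not show this instance method, and stock GatewayFlowRuleEntity does not appear to have it; it offers a static calIntervalSec(interval, intervalUnit) helper instead. A plausible reason for the method: the client-side GatewayFlowRule expects an intervalSec field, while the dashboard entity stores interval plus intervalUnit. Since the entity JSON is now published to Nacos as-is, the value in seconds has to be stored on the entity itself. The sketch below is a hypothetical standalone version of such a method. The unit codes 0 to 3 (second, minute, hour, day) are assumed to mirror the dashboard's INTERVAL_UNIT_* constants.

```java
// Hypothetical stand-in for the entity.calIntervalSec() instance method; not the author's code.
final class IntervalSecSketch {
    // Assumed to mirror the dashboard's INTERVAL_UNIT_* codes.
    static final int INTERVAL_UNIT_SECOND = 0;
    static final int INTERVAL_UNIT_MINUTE = 1;
    static final int INTERVAL_UNIT_HOUR = 2;
    static final int INTERVAL_UNIT_DAY = 3;

    private final Long interval;
    private final Integer intervalUnit;
    private Long intervalSec; // would be serialized with the entity and read by the client

    IntervalSecSketch(Long interval, Integer intervalUnit) {
        this.interval = interval;
        this.intervalUnit = intervalUnit;
    }

    /** Converts interval + intervalUnit into seconds and stores the result on the entity. */
    void calIntervalSec() {
        switch (intervalUnit) {
            case INTERVAL_UNIT_SECOND:
                intervalSec = interval;
                break;
            case INTERVAL_UNIT_MINUTE:
                intervalSec = interval * 60;
                break;
            case INTERVAL_UNIT_HOUR:
                intervalSec = interval * 60 * 60;
                break;
            case INTERVAL_UNIT_DAY:
                intervalSec = interval * 60 * 60 * 24;
                break;
            default:
                throw new IllegalArgumentException("Invalid intervalUnit: " + intervalUnit);
        }
    }

    Long getIntervalSec() {
        return intervalSec;
    }
}
```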

(5) Modify the app registration logic in AppManagement: on an app's first registration, pull its configuration from Nacos into local memory

@Component
public class AppManagement implements MachineDiscovery {

    private final Logger logger = LoggerFactory.getLogger(AppManagement.class);

    @Autowired
    private ApplicationContext context;

    @Autowired
    @Qualifier("gateWayFlowRulesNacosProvider")
    private DynamicRuleProvider<List<GatewayFlowRuleEntity>> flowRuleProvider;

    @Autowired
    private InMemGatewayFlowRuleStore flowRuleRepository;


    @Autowired
    @Qualifier("getWayApiNacosProvider")
    private DynamicRuleProvider<List<ApiDefinitionEntity>> apiProvider;

    @Autowired
    private InMemApiDefinitionStore apiRepository;

    private MachineDiscovery machineDiscovery;

    @PostConstruct
    public void init() {
        machineDiscovery = context.getBean(SimpleMachineDiscovery.class);
    }

    @Override
    public Set<AppInfo> getBriefApps() {
        return machineDiscovery.getBriefApps();
    }

    @Override
    public long addMachine(MachineInfo machineInfo) {
        // A gateway app registering for the first time: load its APIs and rules from Nacos
        String app = machineInfo.getApp();
        if (isGateway(machineInfo.getAppType()) && getDetailApp(app) == null) {
            pullGatewayNacosApi(app);
            pullGatewayNacosFlowRules(app);
        }
        return machineDiscovery.addMachine(machineInfo);
    }

    @Override
    public boolean removeMachine(String app, String ip, int port) {
        return machineDiscovery.removeMachine(app, ip, port);
    }

    @Override
    public List<String> getAppNames() {
        return machineDiscovery.getAppNames();
    }

    @Override
    public AppInfo getDetailApp(String app) {
        return machineDiscovery.getDetailApp(app);
    }

    @Override
    public void removeApp(String app) {
        machineDiscovery.removeApp(app);
    }

    public boolean isValidMachineOfApp(String app, String ip) {
        if (StringUtil.isEmpty(app)) {
            return false;
        }
        return Optional.ofNullable(getDetailApp(app))
                .flatMap(a -> a.getMachine(ip))
                .isPresent();
    }

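    private boolean isGateway(Integer appType) {
        // appType 1 identifies a gateway app; check for null to avoid an NPE when unboxing
        return appType != null && appType == 1;
    }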

    private void pullGatewayNacosFlowRules(String app) {
        try {
            List<GatewayFlowRuleEntity> rules = flowRuleProvider.getRules(app);
            flowRuleRepository.saveAll(rules);
        } catch (Exception e) {
            logger.error("pull nacos flow rules error, app:{}", app, e);
        }
    }

    private void pullGatewayNacosApi(String app) {
        try {
            List<ApiDefinitionEntity> api = apiProvider.getRules(app);
            apiRepository.saveAll(api);
        } catch (Exception e) {
            logger.error("pull nacos api error, app:{}", app, e);
        }
    }

}

(6) Replace the in-memory auto-increment ID with the Snowflake algorithm (requires the hutool dependency)

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.15</version>
        </dependency>

@Component
public class InMemApiDefinitionStore extends InMemoryRuleRepositoryAdapter<ApiDefinitionEntity> {

    @Override
    protected long nextId() {
        return IdUtil.getSnowflakeNextId();
    }
}

@Component
public class InMemGatewayFlowRuleStore extends InMemoryRuleRepositoryAdapter<GatewayFlowRuleEntity> {

    @Override
    protected long nextId() {
        return IdUtil.getSnowflakeNextId();
    }
}
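The two stores above delegate to hutool's `IdUtil.getSnowflakeNextId()`. If you would rather not add `hutool-all` just for this, the following is a minimal sketch of the same idea. It is not hutool's implementation. The class name `SnowflakeSketch`, the epoch and the bit layout are assumptions. The layout is the classic one: a 41-bit millisecond timestamp, a 10-bit worker ID and a 12-bit per-millisecond sequence. Because the dashboard runs as a single instance, one fixed worker ID is enough.

```java
/**
 * Minimal Snowflake-style ID generator (illustrative sketch, not hutool's code).
 * Assumed layout: 41-bit millisecond timestamp | 10-bit worker ID | 12-bit sequence.
 */
final class SnowflakeSketch {
    // Custom epoch (assumption): the widely used Twitter epoch, 2010-11-04T01:42:54.657Z
    private static final long EPOCH = 1288834974657L;
    private static final long WORKER_ID_BITS = 10L;
    private static final long SEQUENCE_BITS = 12L;
    private static final long MAX_WORKER_ID = (1L << WORKER_ID_BITS) - 1; // 1023
    private static final long SEQUENCE_MASK = (1L << SEQUENCE_BITS) - 1;  // 4095

    private final long workerId;
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    SnowflakeSketch(long workerId) {
        if (workerId < 0 || workerId > MAX_WORKER_ID) {
            throw new IllegalArgumentException("workerId must be in [0, " + MAX_WORKER_ID + "]");
        }
        this.workerId = workerId;
    }

    synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now < lastTimestamp) {
            // Refuse to issue IDs while the clock is behind, otherwise duplicates become possible
            throw new IllegalStateException("Clock moved backwards by " + (lastTimestamp - now) + " ms");
        }
        if (now == lastTimestamp) {
            sequence = (sequence + 1) & SEQUENCE_MASK;
            if (sequence == 0) {
                // All 4096 sequence values used in this millisecond: spin until the next one
                while (now <= lastTimestamp) {
                    now = System.currentTimeMillis();
                }
            }
        } else {
            sequence = 0L;
        }
        lastTimestamp = now;
        return ((now - EPOCH) << (WORKER_ID_BITS + SEQUENCE_BITS))
                | (workerId << SEQUENCE_BITS)
                | sequence;
    }
}
```

With this sketch, `nextId()` in the two stores would call a shared `SnowflakeSketch` instance instead of `IdUtil`. Because the timestamp occupies the high bits, IDs stay unique and increasing across dashboard restarts, provided the clock does not move backwards.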

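(7) There are also a few minor bug fixes. The complete Git commit log is shown below (the full source code is linked at the end of the article)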

[Figure: Git commit log of the modifications]

V. Flow Analysis After the Modification

[Figure: Sentinel rate-limiting flow chart, after the modification]

1. Client

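  1. Initialize the Sentinel configuration (com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration#init)
  2. Send heartbeats to sentinel-dashboard carrying key information such as ip, port, appName and appType (com.alibaba.csp.sentinel.transport.init.HeartbeatSenderInitFunc#init)
  3. When the dashboard receives an app's first registration, it pulls the Nacos configuration into its local memory
  4. Initialize the Nacos config listener to watch for Nacos configuration changes in real time (com.alibaba.csp.sentinel.datasource.nacos.NacosDataSource#initNacosListener)
  5. Load the initial Nacos configuration into local memory (com.alibaba.csp.sentinel.datasource.nacos.NacosDataSource#loadInitialConfig)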

2. sentinel-dashboard

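  1. Create, update, delete:
    1. An API or flow rule is created, updated or deleted on the admin page
    2. The change is applied to the data in local memory
    3. The app's complete data set is re-read from local memory and pushed to Nacos
  2. List queries
    1. Query local memory (InMemoryRuleRepositoryAdapter)
    2. Return the list data to the admin page
  3. Scheduled pulling of monitoring metrics
    1. Call the client endpoint remotely to query the metrics of APIs or rules (http://clientIp:8719/metric)
    2. Save them to local memory (InMemoryMetricsRepository)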
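The create/update/delete and list flows above follow a write-through pattern. Local memory is the only read source, and every write re-publishes the app's full rule list to Nacos. The sketch below illustrates that pattern under stated assumptions. All names in it are hypothetical and are not the dashboard's real classes. `RulePublisherSketch` stands in for the Nacos rule publisher, `RuleSketch` for `GatewayFlowRuleEntity`, and `WriteThroughRuleStore` for the in-memory repository plus controller logic.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the dashboard's Nacos rule publisher. */
interface RulePublisherSketch<R> {
    void publish(String app, List<R> rules) throws Exception;
}

/** Hypothetical rule type carrying only the fields this sketch needs. */
final class RuleSketch {
    final long id;
    final String app;
    final String resource;

    RuleSketch(long id, String app, String resource) {
        this.id = id;
        this.app = app;
        this.resource = resource;
    }
}

/** Local memory serves all reads; every write re-publishes the app's full rule list. */
final class WriteThroughRuleStore {
    private final Map<Long, RuleSketch> rules = new ConcurrentHashMap<>();
    private final RulePublisherSketch<RuleSketch> publisher;

    WriteThroughRuleStore(RulePublisherSketch<RuleSketch> publisher) {
        this.publisher = publisher;
    }

    /** Create or update: change local memory first, then push the whole app to Nacos. */
    boolean save(RuleSketch rule) {
        rules.put(rule.id, rule);
        return publishApp(rule.app);
    }

    /** Delete: returns false if the ID is unknown or the push fails. */
    boolean delete(long id) {
        RuleSketch old = rules.remove(id);
        if (old == null) {
            return false;
        }
        return publishApp(old.app);
    }

    /** List queries read local memory only; neither the client nor Nacos is called. */
    List<RuleSketch> findAllByApp(String app) {
        return rules.values().stream()
                .filter(r -> r.app.equals(app))
                .sorted(Comparator.comparingLong((RuleSketch r) -> r.id))
                .collect(Collectors.toList());
    }

    private boolean publishApp(String app) {
        try {
            publisher.publish(app, findAllByApp(app));
            return true;
        } catch (Exception e) {
            // Memory already holds the change: this is the memory/Nacos double-write gap
            return false;
        }
    }
}
```

Publishing the full list, rather than a single delta, keeps the Nacos document a complete snapshot of the app's rules, which is what the client's Nacos data source reloads. The failure branch shows the trade-off of this design. If the push fails, memory and Nacos diverge until the next successful write.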

3. Request rate limiting (unchanged from before the modification)

VI. Problems Encountered

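Q: The interval and interval unit configured in the dashboard do not take effect
A: The dashboard's parameters are named interval and intervalUnit, but the client reads intervalSec, so the values must be converted to intervalSec before being stored in Nacos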
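The conversion in the answer above amounts to multiplying by the number of seconds per unit. The sketch below shows one way to write it. The helper `IntervalConverter` is hypothetical. The unit codes 0 to 3 (second, minute, hour, day) are an assumption meant to mirror the `INTERVAL_UNIT_*` constants of the dashboard's `GatewayFlowRuleEntity`, so check them against your dashboard version. The intended call site is the Nacos publisher: it would set `intervalSec` on each rule before serializing.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: converts the dashboard's (interval, intervalUnit) into the client's intervalSec. */
final class IntervalConverter {
    // Unit codes (assumption): meant to match the dashboard's GatewayFlowRuleEntity.INTERVAL_UNIT_* values
    static final int UNIT_SECOND = 0;
    static final int UNIT_MINUTE = 1;
    static final int UNIT_HOUR = 2;
    static final int UNIT_DAY = 3;

    private IntervalConverter() {
    }

    static long toIntervalSec(long interval, int intervalUnit) {
        switch (intervalUnit) {
            case UNIT_SECOND:
                return interval;
            case UNIT_MINUTE:
                return TimeUnit.MINUTES.toSeconds(interval);
            case UNIT_HOUR:
                return TimeUnit.HOURS.toSeconds(interval);
            case UNIT_DAY:
                return TimeUnit.DAYS.toSeconds(interval);
            default:
                throw new IllegalArgumentException("Unknown intervalUnit: " + intervalUnit);
        }
    }
}
```

For example, an interval of 2 with unit "minute" becomes `intervalSec = 120`. Rejecting unknown unit codes is safer than silently defaulting to seconds, which would reproduce the original symptom.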

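Q: The dashboard keeps an auto-increment ID in memory, so the ID is reset whenever the dashboard restarts
A: ID generation was switched to the Snowflake algorithm, and IDs are formatted as Strings when returned to the front end. This avoids precision loss when a Long reaches the front end, because JavaScript numbers are only exact up to 2^53 - 1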
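The precision problem from the answer above can be reproduced in plain Java. A JavaScript `Number` is an IEEE 754 double, so reading a 19-digit Snowflake ID into it behaves like a `long` to `double` to `long` round trip. The class `JsPrecisionDemo` and the sample ID are illustrative only.

```java
/** Illustrates why Snowflake IDs must reach the browser as Strings, not JSON numbers. */
final class JsPrecisionDemo {
    // Largest integer a JavaScript Number (IEEE 754 double) represents exactly: 2^53 - 1
    static final long MAX_SAFE_INTEGER = (1L << 53) - 1;

    private JsPrecisionDemo() {
    }

    /** Simulates the front end parsing a JSON number: the long is read into a double. */
    static long roundTripThroughDouble(long id) {
        return (long) (double) id;
    }

    /** True if the value survives JSON.parse in a browser without rounding. */
    static boolean isSafeForJs(long id) {
        return id >= -MAX_SAFE_INTEGER && id <= MAX_SAFE_INTEGER;
    }
}
```

For a sample ID such as `1788000000000000001L`, `isSafeForJs` returns false and the round trip yields `1788000000000000000L`, so the front end would address the wrong rule. Sending `String.valueOf(id)` avoids this. With Jackson, one common option is to annotate the ID field with `@JsonSerialize(using = ToStringSerializer.class)`.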

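Q: If a path has never been requested, the client reports a Nacos notify-error NPE after its configuration is changed in the dashboard
A: This was fixed in 1.8.1, so upgrade the client to 1.8.1 or later: https://github.com/alibaba/Sentinel/pull/1729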

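Q: Does the dashboard support cluster deployment?
A: Not yet. All dashboard data is currently stored in local memory, so cluster deployment would require an external data store such as MySQL or Redis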

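Q: How was Alibaba Cloud AHAS actually used in our project?
A: The gateway was connected to AHAS, and every interface path that needed external rate limiting was configured there. Hot-parameter rate limiting was then applied across several dimensions: IP, Header and Token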

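Q: Why doesn't Alibaba Cloud MSE Microservice Governance meet our needs?
A: MSE divides microservice responsibilities further. The gateway side supports only route-level rate limiting, and interface-level rate limiting is available only on the application side. Replicating our AHAS gateway rate limiting would require onboarding every microservice to MSE Microservice Governance, which is too costly for us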

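Q: Are there any other problems?
A: Quite a few, for example:
  1. The dashboard's monitoring data is not persisted. Only the last 5 minutes are visible, and there is a possible risk of memory overflow
  2. The dashboard itself is not connected to the Nacos configuration center
  3. The dashboard does not support multiple accounts and passwords
  4. Writing to both memory and Nacos may cause consistency issues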

VII. Summary

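We currently use only hot-parameter rate limiting from Sentinel's gateway rate limiting, so we made only the gateway rules persistent. For other needs, the modification logic should be broadly similar. The modified Sentinel dashboard still has many problems. However, as the flow analysis above shows, the dashboard serves only as a visual rule-configuration tool with low performance requirements, and we have no need to persist monitoring data. We therefore have no plans for further modifications for now.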

VIII. References

Sentinel: official website

CSDN: Persisting Sentinel gateway rules

GitHub: source code for this article