掘金 后端 ( ) • 2024-03-31 22:56

highlight: a11y-dark theme: z-blue

go-admin 如何使用go-sentinel

go-admin 基于Gin + Vue + Element UI & Arco Design & Ant Design 的前后端分离权限管理系统脚手架(包含了:多租户的支持,基础用户管理功能,jwt鉴权,代码生成器,RBAC资源控制,表单构建,定时任务等

定义中间件

源码在/common/middleware/sentinel.go

Sentinel 系统BBR自适应限流从整体维度对应用入口流量进行控制,结合应用的 Load、CPU 使用率、总体平均 RT、入口 QPS 和并发线程数等几个维度的监控指标,通过自适应的流控策略,让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。

package middleware

import (
	"github.com/alibaba/sentinel-golang/core/system"
	sentinel "github.com/alibaba/sentinel-golang/pkg/adapters/gin"
	"github.com/gin-gonic/gin"

	log "github.com/go-admin-team/go-admin-core/logger"
)

// Sentinel 限流
func Sentinel() gin.HandlerFunc {
	if _, err := system.LoadRules([]*system.Rule{
		{
			MetricType:   system.InboundQPS,
			TriggerCount: 200,
			Strategy:     system.BBR,
		},
	}); err != nil {
		log.Fatalf("Unexpected error: %+v", err)
	}
	return sentinel.SentinelMiddleware(
		sentinel.WithBlockFallback(func(ctx *gin.Context) {
			ctx.AbortWithStatusJSON(200, map[string]interface{}{
				"msg":  "too many request; the quota used up!",
				"code": 500,
			})
		}),
	)
}

限流规则: 在 Sentinel() 函数内部,使用 system.LoadRules 函数定义了限流规则:

  • MetricType 设定为 system.InboundQPS,表示限制请求的速率。
  • TriggerCount 设置为 200,表示每秒允许的请求数量为 200。
  • Strategy 设定为 system.BBR,这是一种流量控制策略,表示使用 BBR 算法进行限流。

回调函数: 使用 sentinel.SentinelMiddleware 函数创建了一个 Gin 中间件,其中包含了一个 WithBlockFallback 选项,该选项定义了当请求被限流时的回调函数。在这个回调函数中,会返回一个 JSON 响应,提示客户端请求过多,已经达到配额上限。

使用中间件

/cmd/api/server.go

func initRouter() {
	var r *gin.Engine
	h := sdk.Runtime.GetEngine()
	if h == nil {
		h = gin.New()
		sdk.Runtime.SetEngine(h)
	}
	switch h.(type) {
	case *gin.Engine:
		r = h.(*gin.Engine)
	default:
		log.Fatal("not support other engine")
		//os.Exit(-1)
	}
	if config.SslConfig.Enable {
		r.Use(handler.TlsHandler())
	}
	//r.Use(middleware.Metrics())
	r.Use(common.Sentinel()).
		Use(common.RequestId(pkg.TrafficKey)).
		Use(api.SetRequestLogger)

	common.InitMiddleware(r)

}

jupiter 如何使用go-sentinel

jupiter Jupiter 是斗鱼开源的一套微服务治理框架,提供丰富的后台功能,管理应用的资源、配置,应用的性能、配置等可视化

定义配置

/pkg/core/sentinel/config.go

对官方库的二次封装。支持从 ETCD 数据源和其他数据源加载规则,并进行相应的初始化工作

  1. type Config struct { ... }: 定义了 Sentinel 的配置结构体 Config,其中包含了一系列配置项,如启用状态、数据源、熔断降级规则、流量控制规则和系统保护规则等。
  2. func StdConfig() Config { ... }: 提供了一个标准的配置获取函数,用于获取标准的 Sentinel 配置。
  3. func RawConfig(key string) Config { ... }: 根据给定的配置键获取原始配置,如果配置不存在或解析失败,则返回默认配置。
  4. func DefaultConfig() Config { ... }: 返回默认的 Sentinel 配置。
  5. func (e Config) exitHandler(entry *SentinelEntry, ctx *EntryContext) error { ... }: 定义了 Sentinel 规则执行完成后的退出处理函数,用于统计执行结果并处理异常。
  6. func (c Config) Entry(resource string, opts ...EntryOption) (*SentinelEntry, *BlockError) { ... }: 根据给定的资源和选项创建 Sentinel 规则的入口。
  7. func (c Config) Build() error { ... }: 构建 Sentinel 配置,主要进行 Sentinel 的初始化工作,包括加载规则和注册状态监听器。
  8. func (c Config) loadRules() { ... }: 加载 Sentinel 规则,根据配置的数据源加载对应的规则。
  9. func checkSrcComplianceJson(src []byte) (bool, error) { ... }: 检查 JSON 格式的数据源是否合规。
  10. func initRules(client *clientv3.Client, key string, h datasource.PropertyHandler) error { ... }: 根据给定的客户端和键初始化规则。
  11. func circuitBreakerRuleJsonArrayParser(src []byte) (interface{}, error) { ... }: 解析 JSON 格式的熔断降级规则。
package sentinel

import (
	"encoding/json"
	"fmt"

	sentinel "github.com/alibaba/sentinel-golang/api"
	"github.com/alibaba/sentinel-golang/core/base"
	"github.com/alibaba/sentinel-golang/core/circuitbreaker"
	"github.com/alibaba/sentinel-golang/core/config"
	"github.com/alibaba/sentinel-golang/core/flow"
	"github.com/alibaba/sentinel-golang/core/system"
	"github.com/alibaba/sentinel-golang/ext/datasource"
	"github.com/douyu/jupiter/pkg"
	"github.com/douyu/jupiter/pkg/client/etcdv3"
	"github.com/douyu/jupiter/pkg/conf"
	"github.com/douyu/jupiter/pkg/core/constant"
	"github.com/douyu/jupiter/pkg/xlog"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"
)

type Config struct {
	Enable     bool   `toml:"enable"`
	Datasource string `toml:"datasource"`
	EtcdRawKey string `toml:"etcdRawKey"`
	// 熔断降级
	CbKey   string                `toml:"cbKey"`
	CbRules []*CircuitBreakerRule `toml:"cbRules"`
	// 流量控制
	FlowKey   string       `toml:"flowKey"`
	FlowRules []*flow.Rule `toml:"flowRules"`
	// 系统保护
	SystemKey   string         `toml:"systemKey"`
	SystemRules []*system.Rule `toml:"systemRules"`
}

func StdConfig() Config {
	return RawConfig(constant.ConfigKey("sentinel"))
}

func RawConfig(key string) Config {
	config := DefaultConfig()

	if conf.Get(constant.ConfigKey("sentinel")) == nil {
		return config
	}

	if err := conf.UnmarshalKey(key, &config, conf.TagName("toml")); err != nil {
		xlog.Jupiter().Warn("unmarshal config", zap.String("key", key), zap.Error(err))

		return config
	}

	return config
}

func DefaultConfig() Config {
	return Config{
		Enable:     false,
		Datasource: SENTINEL_DATASOURCE_ETCD,
		EtcdRawKey: "app.registry.etcd",
		// 熔断降级规则 /wsd-sentinel/{language}/{app}/{idc}/{env}/{ruleType}=${value}
		CbKey: "/wsd-sentinel/go/%s/%s/%s/degrade",
		// 流量控制规则 /wsd-sentinel/{language}/{app}/{idc}/{env}/{ruleType}=${value}
		FlowKey: "/wsd-sentinel/go/%s/%s/%s/flow",
		// 系统保护规则 /wsd-sentinel/{language}/{app}/{idc}/{env}/{ruleType}=${value}
		SystemKey: "/wsd-sentinel/go/%s/%s/%s/system",
	}
}

func (e Config) exitHandler(entry *SentinelEntry, ctx *EntryContext) error {
	if ctx.Err() != nil {
		sentinelExceptionsThrown.WithLabelValues(labels(entry.Resource().Name())...).Inc()
	} else {
		sentinelSuccess.WithLabelValues(labels(entry.Resource().Name())...).Inc()
	}

	sentinelRt.WithLabelValues(labels(entry.Resource().Name())...).Observe(float64(ctx.Rt()) / 1000)

	return ctx.Err()
}

func (c Config) Entry(resource string, opts ...EntryOption) (*SentinelEntry, *BlockError) {
	if !c.Enable {
		return base.NewSentinelEntry(nil, nil, nil), nil
	}

	a, b := sentinel.Entry(resource, opts...)

	sentinelReqeust.WithLabelValues(labels(resource)...).Inc()

	if b != nil {
		sentinelBlocked.WithLabelValues(labels(resource)...).Inc()

		return a, b
	}

	a.WhenExit(c.exitHandler)

	return a, b
}

func (c Config) Build() error {

	if !c.Enable {
		xlog.Jupiter().Info("disable sentinel feature")

		return nil
	}

	if err := sentinel.InitDefault(); err != nil {
		xlog.Jupiter().Error("sentinel.InitDefault failed", zap.Error(err))

		return err
	}

	defaultConfig := config.NewDefaultConfig()
	defaultConfig.Sentinel.App.Name = pkg.Name()

	err := sentinel.InitWithConfig(defaultConfig)
	if err != nil {
		return err
	}

	circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})

	c.loadRules()

	return nil
}

func (c Config) loadRules() {

	xlog.Jupiter().Info("load sentinel rules", zap.String("datasource", c.Datasource))

	switch c.Datasource {
	case SENTINEL_DATASOURCE_ETCD:

		cli, err := etcdv3.RawConfig(c.EtcdRawKey).Singleton()
		if err != nil {
			panic(err)
		}

		err = initRules(cli.Client, c.CbKey, datasource.NewCircuitBreakerRulesHandler(circuitBreakerRuleJsonArrayParser))
		if err != nil {
			xlog.Jupiter().Warn("sentinel etcd Initialize failed", xlog.FieldErr(err))
		}

		err = initRules(cli.Client, c.SystemKey, datasource.NewSystemRulesHandler(datasource.SystemRuleJsonArrayParser))
		if err != nil {
			xlog.Jupiter().Warn("sentinel etcd Initialize failed", xlog.FieldErr(err))
		}

		err = initRules(cli.Client, c.FlowKey, datasource.NewFlowRulesHandler(datasource.FlowRuleJsonArrayParser))
		if err != nil {
			xlog.Jupiter().Warn("sentinel etcd Initialize failed", xlog.FieldErr(err))
		}
	default:

		var err error
		_, err = flow.LoadRules(c.FlowRules)
		if err != nil {
			xlog.Jupiter().Warn("sentinel flow.LoadRules failed", xlog.FieldErr(err))
		}

		_, err = system.LoadRules(c.SystemRules)
		if err != nil {
			xlog.Jupiter().Warn("sentinel system.LoadRules failed", xlog.FieldErr(err))
		}

		rules := convertCbRules(c.CbRules)
		_, err = circuitbreaker.LoadRules(rules)
		if err != nil {
			xlog.Jupiter().Warn("sentinel circuitbreaker.LoadRules failed", xlog.FieldErr(err))
		}
	}
}

func checkSrcComplianceJson(src []byte) (bool, error) {
	if len(src) == 0 {
		return false, nil
	}
	return true, nil
}

func initRules(client *clientv3.Client, key string, h datasource.PropertyHandler) error {
	datasource, err := newDataSource(client,
		fmt.Sprintf(key, pkg.Name(), pkg.AppZone(), conf.GetString("app.mode")),
		h)
	if err != nil {
		return err
	}

	return datasource.Initialize()
}

func circuitBreakerRuleJsonArrayParser(src []byte) (interface{}, error) {
	if valid, err := checkSrcComplianceJson(src); !valid {
		return nil, err
	}

	rules := make([]*CircuitBreakerRule, 0, 8)
	if err := json.Unmarshal(src, &rules); err != nil {
		desc := fmt.Sprintf("Fail to convert source bytes to []*CircuitBreakerRule, err: %s", err.Error())

		xlog.Jupiter().Warn("json.Unmarshal", zap.ByteString("src", src), zap.Error(err))
		return nil, datasource.NewError(datasource.ConvertSourceError, desc)
	}

	xlog.Jupiter().Info("circuitBreakerRuleJsonArrayParser finished", zap.Any("rules", rules))

	return convertCbRules(rules), nil
}

SentinelConfig

[jupiter.sentinel]
  enable = true
  datasource = "files"
  cbKey = "/wsd-sentinel/go/%s/%s/%s/degrade"
  etcdRawKey = "app.registry.etcd"
[[jupiter.sentinel.cbRules]]
  enable = true
  resource = "recomend"
  strategy = 0
  retryTimeoutMs = 3000
  minRequestAmount = 10
  statIntervalMs = 5000
  maxAllowedRtMs = 1000
  statSlidingWindowBucketCount = 10
  threshold = 0.4

配置项

sentinel具体字段解析:

  • enable:表示是否开启熔断降级功能,默认为 false
  • datasource: 表示规则来源类型,可选etcd和files,默认是 etcd
  • cbKey:熔断降级etcd的key规则,只在datasource为etcd时有效,默认为/wsd-sentinel/go/%s/%s/%s/degrade
  • etcdRawKey:etcd的配置名,只在datasource为etcd时有效,默认为app.registry.etcd

cbRules具体字段解析:

  • Enable: 表示是否开启此熔断规则。

  • Resource: 熔断器规则生效的埋点资源的名称;

  • Strategy: 熔断策略,目前支持SlowRequestRatio、ErrorRatio、ErrorCount三种;

    1. 选择以慢调用比例 (SlowRequestRatio) 作为阈值,需要设置允许的最大响应时间(MaxAllowedRtMs),请求的响应时间大于该值则统计为慢调用。通过 Threshold 字段设置触发熔断的慢调用比例,取值范围为 [0.0, 1.0]。规则配置后,在单位统计时长内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态,若接下来的一个请求响应时间小于设置的最大 RT 则结束熔断,若大于设置的最大 RT 则会再次被熔断。
    2. 选择以错误比例 (ErrorRatio) 作为阈值,需要设置触发熔断的异常比例(Threshold),取值范围为 [0.0, 1.0]。规则配置后,在单位统计时长内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态,若接下来的一个请求没有错误则结束熔断,否则会再次被熔断。代码中可以通过 api.TraceError(entry, err) 函数来记录 error。
    3. 选择以错误比例 (ErrorCount) 作为阈值,需要设置触发熔断的异常比例(Threshold),取值范围为 [1, $]。规则配置后,在单位统计时长内请求数目大于设置的最小请求数目,并且异常的次数大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态,若接下来的一个请求没有错误则结束熔断,否则会再次被熔断。代码中可以通过 api.TraceError(entry, err) 函数来记录 error。
  • RetryTimeoutMs: 即熔断触发后持续的时间(单位为 ms)。资源进入熔断状态后,在配置的熔断时长内,请求都会快速失败。熔断结束后进入探测恢复模式(HALF-OPEN)。

  • MinRequestAmount: 静默数量,如果当前统计周期内对资源的访问数量小于静默数量,那么熔断器就处于静默期。换言之,也就是触发熔断的最小请求数目,若当前统计周期内的请求数小于此值,即使达到熔断条件规则也不会触发。

  • StatIntervalMs: 统计的时间窗口长度(单位为 ms)。

  • MaxAllowedRtMs: 仅对慢调用熔断策略生效,MaxAllowedRtMs 是判断请求是否是慢调用的临界值,也就是如果请求的response time小于或等于MaxAllowedRtMs,那么就不是慢调用;如果response time大于MaxAllowedRtMs,那么当前请求就属于慢调用。

  • Threshold: 对于慢调用熔断策略, Threshold表示是慢调用比例的阈值(小数表示,比如0.1表示10%),也就是如果当前资源的慢调用比例如果高于Threshold,那么熔断器就会断开;否则保持闭合状态。 对于错误比例策略,Threshold表示的是错误比例的阈值(小数表示,比如0.1表示10%)。对于错误数策略,Threshold是错误计数的阈值。