highlight: a11y-dark theme: z-blue
go-admin 如何使用go-sentinel
go-admin 基于Gin + Vue + Element UI & Arco Design & Ant Design 的前后端分离权限管理系统脚手架(包含了:多租户的支持,基础用户管理功能,jwt鉴权,代码生成器,RBAC资源控制,表单构建,定时任务等
定义中间件
源码在/common/middleware/sentinel.go
Sentinel 系统BBR自适应限流从整体维度对应用入口流量进行控制,结合应用的 Load、CPU 使用率、总体平均 RT、入口 QPS 和并发线程数等几个维度的监控指标,通过自适应的流控策略,让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。
package middleware
import (
"github.com/alibaba/sentinel-golang/core/system"
sentinel "github.com/alibaba/sentinel-golang/pkg/adapters/gin"
"github.com/gin-gonic/gin"
log "github.com/go-admin-team/go-admin-core/logger"
)
// Sentinel 限流
func Sentinel() gin.HandlerFunc {
if _, err := system.LoadRules([]*system.Rule{
{
MetricType: system.InboundQPS,
TriggerCount: 200,
Strategy: system.BBR,
},
}); err != nil {
log.Fatalf("Unexpected error: %+v", err)
}
return sentinel.SentinelMiddleware(
sentinel.WithBlockFallback(func(ctx *gin.Context) {
ctx.AbortWithStatusJSON(200, map[string]interface{}{
"msg": "too many request; the quota used up!",
"code": 500,
})
}),
)
}
限流规则: 在 Sentinel()
函数内部,使用 system.LoadRules
函数定义了限流规则:
-
MetricType
设定为system.InboundQPS
,表示限制请求的速率。 -
TriggerCount
设置为 200,表示每秒允许的请求数量为 200。 -
Strategy
设定为system.BBR
,这是一种流量控制策略,表示使用 BBR 算法进行限流。
回调函数: 使用 sentinel.SentinelMiddleware
函数创建了一个 Gin 中间件,其中包含了一个 WithBlockFallback
选项,该选项定义了当请求被限流时的回调函数。在这个回调函数中,会返回一个 JSON 响应,提示客户端请求过多,已经达到配额上限。
使用中间件
/cmd/api/server.go
func initRouter() {
var r *gin.Engine
h := sdk.Runtime.GetEngine()
if h == nil {
h = gin.New()
sdk.Runtime.SetEngine(h)
}
switch h.(type) {
case *gin.Engine:
r = h.(*gin.Engine)
default:
log.Fatal("not support other engine")
//os.Exit(-1)
}
if config.SslConfig.Enable {
r.Use(handler.TlsHandler())
}
//r.Use(middleware.Metrics())
r.Use(common.Sentinel()).
Use(common.RequestId(pkg.TrafficKey)).
Use(api.SetRequestLogger)
common.InitMiddleware(r)
}
jupiter 如何使用go-sentinel
jupiter Jupiter 是斗鱼开源的一套微服务治理框架,提供丰富的后台功能,管理应用的资源、配置,应用的性能、配置等可视化
定义配置
/pkg/core/sentinel/config.go
对官方库的二次封装。支持从 ETCD 数据源和其他数据源加载规则,并进行相应的初始化工作
-
type Config struct { ... }
: 定义了 Sentinel 的配置结构体Config
,其中包含了一系列配置项,如启用状态、数据源、熔断降级规则、流量控制规则和系统保护规则等。 -
func StdConfig() Config { ... }
: 提供了一个标准的配置获取函数,用于获取标准的 Sentinel 配置。 -
func RawConfig(key string) Config { ... }
: 根据给定的配置键获取原始配置,如果配置不存在或解析失败,则返回默认配置。 -
func DefaultConfig() Config { ... }
: 返回默认的 Sentinel 配置。 -
func (e Config) exitHandler(entry *SentinelEntry, ctx *EntryContext) error { ... }
: 定义了 Sentinel 规则执行完成后的退出处理函数,用于统计执行结果并处理异常。 -
func (c Config) Entry(resource string, opts ...EntryOption) (*SentinelEntry, *BlockError) { ... }
: 根据给定的资源和选项创建 Sentinel 规则的入口。 -
func (c Config) Build() error { ... }
: 构建 Sentinel 配置,主要进行 Sentinel 的初始化工作,包括加载规则和注册状态监听器。 -
func (c Config) loadRules() { ... }
: 加载 Sentinel 规则,根据配置的数据源加载对应的规则。 -
func checkSrcComplianceJson(src []byte) (bool, error) { ... }
: 检查 JSON 格式的数据源是否合规。 -
func initRules(client *clientv3.Client, key string, h datasource.PropertyHandler) error { ... }
: 根据给定的客户端和键初始化规则。 -
func circuitBreakerRuleJsonArrayParser(src []byte) (interface{}, error) { ... }
: 解析 JSON 格式的熔断降级规则。
package sentinel
import (
"encoding/json"
"fmt"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/core/system"
"github.com/alibaba/sentinel-golang/ext/datasource"
"github.com/douyu/jupiter/pkg"
"github.com/douyu/jupiter/pkg/client/etcdv3"
"github.com/douyu/jupiter/pkg/conf"
"github.com/douyu/jupiter/pkg/core/constant"
"github.com/douyu/jupiter/pkg/xlog"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
type Config struct {
Enable bool `toml:"enable"`
Datasource string `toml:"datasource"`
EtcdRawKey string `toml:"etcdRawKey"`
// 熔断降级
CbKey string `toml:"cbKey"`
CbRules []*CircuitBreakerRule `toml:"cbRules"`
// 流量控制
FlowKey string `toml:"flowKey"`
FlowRules []*flow.Rule `toml:"flowRules"`
// 系统保护
SystemKey string `toml:"systemKey"`
SystemRules []*system.Rule `toml:"systemRules"`
}
func StdConfig() Config {
return RawConfig(constant.ConfigKey("sentinel"))
}
func RawConfig(key string) Config {
config := DefaultConfig()
if conf.Get(constant.ConfigKey("sentinel")) == nil {
return config
}
if err := conf.UnmarshalKey(key, &config, conf.TagName("toml")); err != nil {
xlog.Jupiter().Warn("unmarshal config", zap.String("key", key), zap.Error(err))
return config
}
return config
}
func DefaultConfig() Config {
return Config{
Enable: false,
Datasource: SENTINEL_DATASOURCE_ETCD,
EtcdRawKey: "app.registry.etcd",
// 熔断降级规则 /wsd-sentinel/{language}/{app}/{idc}/{env}/{ruleType}=${value}
CbKey: "/wsd-sentinel/go/%s/%s/%s/degrade",
// 流量控制规则 /wsd-sentinel/{language}/{app}/{idc}/{env}/{ruleType}=${value}
FlowKey: "/wsd-sentinel/go/%s/%s/%s/flow",
// 系统保护规则 /wsd-sentinel/{language}/{app}/{idc}/{env}/{ruleType}=${value}
SystemKey: "/wsd-sentinel/go/%s/%s/%s/system",
}
}
func (e Config) exitHandler(entry *SentinelEntry, ctx *EntryContext) error {
if ctx.Err() != nil {
sentinelExceptionsThrown.WithLabelValues(labels(entry.Resource().Name())...).Inc()
} else {
sentinelSuccess.WithLabelValues(labels(entry.Resource().Name())...).Inc()
}
sentinelRt.WithLabelValues(labels(entry.Resource().Name())...).Observe(float64(ctx.Rt()) / 1000)
return ctx.Err()
}
func (c Config) Entry(resource string, opts ...EntryOption) (*SentinelEntry, *BlockError) {
if !c.Enable {
return base.NewSentinelEntry(nil, nil, nil), nil
}
a, b := sentinel.Entry(resource, opts...)
sentinelReqeust.WithLabelValues(labels(resource)...).Inc()
if b != nil {
sentinelBlocked.WithLabelValues(labels(resource)...).Inc()
return a, b
}
a.WhenExit(c.exitHandler)
return a, b
}
func (c Config) Build() error {
if !c.Enable {
xlog.Jupiter().Info("disable sentinel feature")
return nil
}
if err := sentinel.InitDefault(); err != nil {
xlog.Jupiter().Error("sentinel.InitDefault failed", zap.Error(err))
return err
}
defaultConfig := config.NewDefaultConfig()
defaultConfig.Sentinel.App.Name = pkg.Name()
err := sentinel.InitWithConfig(defaultConfig)
if err != nil {
return err
}
circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{})
c.loadRules()
return nil
}
func (c Config) loadRules() {
xlog.Jupiter().Info("load sentinel rules", zap.String("datasource", c.Datasource))
switch c.Datasource {
case SENTINEL_DATASOURCE_ETCD:
cli, err := etcdv3.RawConfig(c.EtcdRawKey).Singleton()
if err != nil {
panic(err)
}
err = initRules(cli.Client, c.CbKey, datasource.NewCircuitBreakerRulesHandler(circuitBreakerRuleJsonArrayParser))
if err != nil {
xlog.Jupiter().Warn("sentinel etcd Initialize failed", xlog.FieldErr(err))
}
err = initRules(cli.Client, c.SystemKey, datasource.NewSystemRulesHandler(datasource.SystemRuleJsonArrayParser))
if err != nil {
xlog.Jupiter().Warn("sentinel etcd Initialize failed", xlog.FieldErr(err))
}
err = initRules(cli.Client, c.FlowKey, datasource.NewFlowRulesHandler(datasource.FlowRuleJsonArrayParser))
if err != nil {
xlog.Jupiter().Warn("sentinel etcd Initialize failed", xlog.FieldErr(err))
}
default:
var err error
_, err = flow.LoadRules(c.FlowRules)
if err != nil {
xlog.Jupiter().Warn("sentinel flow.LoadRules failed", xlog.FieldErr(err))
}
_, err = system.LoadRules(c.SystemRules)
if err != nil {
xlog.Jupiter().Warn("sentinel system.LoadRules failed", xlog.FieldErr(err))
}
rules := convertCbRules(c.CbRules)
_, err = circuitbreaker.LoadRules(rules)
if err != nil {
xlog.Jupiter().Warn("sentinel circuitbreaker.LoadRules failed", xlog.FieldErr(err))
}
}
}
func checkSrcComplianceJson(src []byte) (bool, error) {
if len(src) == 0 {
return false, nil
}
return true, nil
}
func initRules(client *clientv3.Client, key string, h datasource.PropertyHandler) error {
datasource, err := newDataSource(client,
fmt.Sprintf(key, pkg.Name(), pkg.AppZone(), conf.GetString("app.mode")),
h)
if err != nil {
return err
}
return datasource.Initialize()
}
func circuitBreakerRuleJsonArrayParser(src []byte) (interface{}, error) {
if valid, err := checkSrcComplianceJson(src); !valid {
return nil, err
}
rules := make([]*CircuitBreakerRule, 0, 8)
if err := json.Unmarshal(src, &rules); err != nil {
desc := fmt.Sprintf("Fail to convert source bytes to []*CircuitBreakerRule, err: %s", err.Error())
xlog.Jupiter().Warn("json.Unmarshal", zap.ByteString("src", src), zap.Error(err))
return nil, datasource.NewError(datasource.ConvertSourceError, desc)
}
xlog.Jupiter().Info("circuitBreakerRuleJsonArrayParser finished", zap.Any("rules", rules))
return convertCbRules(rules), nil
}
SentinelConfig
[jupiter.sentinel]
enable = true
datasource = "files"
cbKey = "/wsd-sentinel/go/%s/%s/%s/degrade"
etcdRawKey = "app.registry.etcd"
[[jupiter.sentinel.cbRules]]
enable = true
resource = "recomend"
strategy = 0
retryTimeoutMs = 3000
minRequestAmount = 10
statIntervalMs = 5000
maxAllowedRtMs = 1000
statSlidingWindowBucketCount = 10
threshold = 0.4
配置项
sentinel具体字段解析:
- enable:表示是否开启熔断降级功能,默认为 false
- datasource: 表示规则来源类型,可选etcd和files,默认是 etcd
- cbKey:熔断降级etcd的key规则,只在datasource为etcd时有效,默认为/wsd-sentinel/go/%s/%s/%s/degrade
- etcdRawKey:etcd的配置名,只在datasource为etcd时有效,默认为app.registry.etcd
cbRules具体字段解析:
-
Enable: 表示是否开启此熔断规则。
-
Resource: 熔断器规则生效的埋点资源的名称;
-
Strategy: 熔断策略,目前支持SlowRequestRatio、ErrorRatio、ErrorCount三种;
- 选择以慢调用比例 (SlowRequestRatio) 作为阈值,需要设置允许的最大响应时间(MaxAllowedRtMs),请求的响应时间大于该值则统计为慢调用。通过 Threshold 字段设置触发熔断的慢调用比例,取值范围为 [0.0, 1.0]。规则配置后,在单位统计时长内请求数目大于设置的最小请求数目,并且慢调用的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态,若接下来的一个请求响应时间小于设置的最大 RT 则结束熔断,若大于设置的最大 RT 则会再次被熔断。
- 选择以错误比例 (ErrorRatio) 作为阈值,需要设置触发熔断的异常比例(Threshold),取值范围为 [0.0, 1.0]。规则配置后,在单位统计时长内请求数目大于设置的最小请求数目,并且异常的比例大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态,若接下来的一个请求没有错误则结束熔断,否则会再次被熔断。代码中可以通过 api.TraceError(entry, err) 函数来记录 error。
- 选择以错误比例 (ErrorCount) 作为阈值,需要设置触发熔断的异常比例(Threshold),取值范围为 [1, $]。规则配置后,在单位统计时长内请求数目大于设置的最小请求数目,并且异常的次数大于阈值,则接下来的熔断时长内请求会自动被熔断。经过熔断时长后熔断器会进入探测恢复状态,若接下来的一个请求没有错误则结束熔断,否则会再次被熔断。代码中可以通过 api.TraceError(entry, err) 函数来记录 error。
-
RetryTimeoutMs: 即熔断触发后持续的时间(单位为 ms)。资源进入熔断状态后,在配置的熔断时长内,请求都会快速失败。熔断结束后进入探测恢复模式(HALF-OPEN)。
-
MinRequestAmount: 静默数量,如果当前统计周期内对资源的访问数量小于静默数量,那么熔断器就处于静默期。换言之,也就是触发熔断的最小请求数目,若当前统计周期内的请求数小于此值,即使达到熔断条件规则也不会触发。
-
StatIntervalMs: 统计的时间窗口长度(单位为 ms)。
-
MaxAllowedRtMs: 仅对慢调用熔断策略生效,MaxAllowedRtMs 是判断请求是否是慢调用的临界值,也就是如果请求的response time小于或等于MaxAllowedRtMs,那么就不是慢调用;如果response time大于MaxAllowedRtMs,那么当前请求就属于慢调用。
-
Threshold: 对于慢调用熔断策略, Threshold表示是慢调用比例的阈值(小数表示,比如0.1表示10%),也就是如果当前资源的慢调用比例如果高于Threshold,那么熔断器就会断开;否则保持闭合状态。 对于错误比例策略,Threshold表示的是错误比例的阈值(小数表示,比如0.1表示10%)。对于错误数策略,Threshold是错误计数的阈值。