掘金 后端 ( ) • 2024-05-04 10:25

theme: channing-cyan

烟囱架构重构-代理网关篇

写作背景

前几天写了一篇关于「烟囱架构重构,最近重构了微信生态对接业务」,有同学对三方生态对接「API 网关」比较感兴趣,写这篇文章详细聊聊一波,我会贴一些简易代码,感兴趣先看看这篇文章,了解前因后果。烟囱架构重构,最近重构了微信生态对接业务 - 掘金

历史案例

早期,对接微信/抖音生态,把所有接口都对接进来了,做了简单包装和透传,这也带来一个问题。

每当有业务团队要接入新接口,不得不新增一个http接口,非常浪费开发资源,历史代码很多都是这么干的,最近参考了某大佬思路,做了一些简化。

为了方便理解我以查询商家应用授权应用信息为栗子,代码如下:

type Client struct {
}

// AuthInfoRequest 使用授权码获取授权信息-请求参数参数
type AuthInfoRequest struct {
	ComponentAppID    string `json:"component_appid"`
	AuthorizationCode string `json:"authorization_code"`
}

// AuthInfoResponse 使用授权码获取授权信息-返回结果
type AuthInfoResponse struct {
	AuthorizationInfo AuthorizationInfo `json:"authorization_info"`
}

func (c *Client) QueryAuthInfo(ctx context.Context, token, appID, code string) (*AuthInfoResponse, error) {
	opts := &grequests.RequestOptions{
		Params: map[string]string{
			"component_access_token": token,
		},
		JSON: &AuthInfoRequest{
			ComponentAppID:    appID,
			AuthorizationCode: code,
		},
	}

	url := "https://api.weixin.qq.com/cgi-bin/component/api_query_auth"
	resp, err := grequests.DoRegularRequest("GET", url, opts)
	if err != nil {
		return nil, err
	}
	out := &AuthInfoResponse{}
	if err = resp.JSON(out); err != nil {
		return nil, err
	}
	return out, nil
}

type AuthorizationInfo struct {
	AuthorizerAppID        string `json:"authorizer_appid"`
	AuthorizerAccessToken  string `json:"authorizer_access_token"`
	ExpiresIn              int    `json:"expires_in"`
	AuthorizerRefreshToken string `json:"authorizer_refresh_token"`
}

不晓得你们是不是也是这么干的?申请单独的应用来干这事儿并封装了一堆接口。

代码比较简单,做了几件事:

  1. 拼接 token、appId;
  2. 参数组装;
  3. 发起 http 请求,反序列化对象,返回最终数据。

随着业务发展,未来可能还会做几件事

  1. 若业务场景对正确性、数据要求比较高的,可能会根据接口返回不同的 code 重试;
  2. 接口限流,三方接口一般都是有频率限制的;
  3. 缓存(非必需)。

API 网关

抖音/微信生态...接口挺多的,不可能所有接口都接一遍,关键是调用的接口仅仅是简单做了包装,透传参数。

如果每个接口都写一遍上述代码,拼 token、重试、限流、缓存、反序列化,挺浪费研、测试发资源的。所以,能不能把通用的逻辑抽出来整合到「api网关」?如下图:

个微生态-第 11 页.png

在三方生态前链路,做一个代理,把通用逻辑封装了,业务方A、B、C只关注参数、返回值,其它就别关心了。

简易代码如下:

定义常量

type PlatformType string

const (
	WxPlatformType     PlatformType = "wx"
	TiktokPlatformType PlatformType = "tiktok"
)

type HttpMethod string

const (
	HttpMethodGet  HttpMethod = "GET"
	HttpMethodPost HttpMethod = "POST"
)
定义对接平台通用结构体
type WXPlatformErrors struct {
	ErrCode int    `json:"errcode"`
	ErrMsg  string `json:"errmsg"`
}

type TikTokPlatformErrors struct {
	// todo 后续补充
}
定义 adaptor
// InvokeOptions 内部调用 使用结构直接绑定
type InvokeOptions struct {
	// 生态枚举
	PlatformType PlatformType

	// 小程序、公众号 appid
	AppID string

	// 接口路径地址 相对or绝对
	Path string

	// http method,get 请求 or post 请求
	Method HttpMethod

	// Request 因为要反序列化和序列化,所以,必须要带 json tag
	// Request 请求体类型指针
	Request any
}

type IAdaptor interface {
	// Invoke 外部调用接口
	Invoke(ctx context.Context, opts *InvokeOptions) (any, error)
}

定义 Default Adaptor

type DefaultAdaptor[R any] struct {
}

func (a *DefaultAdaptor[R]) Invoke(ctx context.Context, opts *InvokeOptions, out R) error {
	var (
		bs       []byte
		features IFeatures
	)

	// 重试请求
	retryErr := retry.Do(
		func() error {
			tmpBs, invokeErr := a.invoke(
				ctx,
				opts,
			)
			if invokeErr != nil {
				return invokeErr
			}
			bs = tmpBs

			handler, exists := GetHandler(opts.PlatformType)
			if !exists { // 如果没有实现类直接退出即可,流程不受影响
				return nil
			}
			tmpFeatures, err := handler.HandleResponse(bs)
			if err != nil {
				// 记录日志就可以了
				fmt.Println(err)
				return nil
			}
			features = tmpFeatures

			return nil
		},
		retry.Attempts(uint(3)), // 重试3次
		retry.RetryIf(func(err error) bool {
			if err == nil { // err 为 empty 不用重试了
				return false
			}

			// 根据不同平台进行重试
			return features.Retry()
		}),
	)
	if retryErr != nil {
		return retryErr
	}
	if features == nil { // 如果token刷新失败,通知token刷新
		// 通知token 刷新,强制执行Invoke方法
		// todo 通知token刷新,建议token管理器如果触发强制刷新token,增加监控和告警,以便第一时间检查代码是否异常
		// 重走流程
		// todo ⚠️,这里可能导致无限循环,若果不好把控,opts 可以增加一个标识
		return a.Invoke(ctx, opts, out)
	}

	// 序列化为最终值
	return sonic.Unmarshal(bs, &out)
}

func (a *DefaultAdaptor[R]) invoke(ctx context.Context, opts *InvokeOptions) ([]byte, error) {
	// TODO 从 token 管理器中获取token,代码省略

	// TODO 补充 token 和 appid,主要是替换 path 模板,详细代码省略...
	// /cgi-bin/open/get?access_token={{access_token}},需要将 {{access_token}} 替换为 token 即可
	path := strings.Replace(opts.Path, "{{appid}}", opts.AppID, -1)

	var (
		resp *grequests.Response
		err  error
	)
	switch opts.Method {
	case HttpMethodGet:
		resp, err = grequests.DoRegularRequest("GET", path, nil)
		if err != nil {
			return nil, err
		}
	case HttpMethodPost:
		opts := &grequests.RequestOptions{
			JSON: opts.Request,
		}
		resp, err = grequests.DoRegularRequest("GET", path, opts)
		if err != nil {
			return nil, err
		}
	default:
		return nil, fmt.Errorf("method not valid")
	}

	defer func() {
		_ = resp.RawResponse.Body.Close() // 关闭流文件读取
	}()

	// 读取流文件
	return io.ReadAll(resp.RawResponse.Body)
}
定义 IFeatures 接口
type IFeatures interface {
	// Retry 是否重试
	Retry() bool

	// TokenExpired token 是否过期
	TokenExpired() bool

	// todo 后续可以新增其其它特性
}
定义 Wx IFeatures 实现类
type WxFeatures struct {
	*WXPlatformErrors
}

func (w *WxFeatures) Retry() bool {
	/*
		1、if 可以这里可以做成的配置,指定的code重试
		2、有些错误码并不需要重试的,比如:token过期、API未授权等...,尽量把这些错误码规避掉。
	*/
	if w.ErrCode != 0 {
		return true
	}

	return false
}

func (w *WxFeatures) TokenExpired() bool {
	return w.ErrCode == 11 // 假设微信平台ErrCode=11是token过期
}

定义 Response handler

type Handler interface {
	HandleResponse(in []byte) (IFeatures, error)
}

func GetHandler(in PlatformType) (Handler, bool) {
	// todo 平台应该不会太多,暂时用switch,后期可以换成策略模式。
	switch in {
	case WxPlatformType:
		return &WxHandler{}, true
	case TiktokPlatformType:
		// ... 省略
		return nil, true
	default:
		return nil, false
	}
}
定义 Wx Response handler
type WxHandler struct {
}

func (w *WxHandler) HandleResponse(in []byte) (IFeatures, error) {
	out := new(WXPlatformErrors)
	err := sonic.Unmarshal(in, out)
	if err != nil {
		return nil, err
	}

	return &WxFeatures{WXPlatformErrors: out}, nil
}

测试案例,以查询商家应用授权应用信息为栗子。

type AuthInfoRequest struct {
	ComponentAppID    string `json:"component_appid"`
	AuthorizationCode string `json:"authorization_code"`
}

// AuthInfoResponse 使用授权码获取授权信息-返回结果
type AuthInfoResponse struct {
	WXPlatformErrors
	AuthorizationInfo AuthorizationInfo `json:"authorization_info"`
}

type AuthorizationInfo struct {
	AuthorizerAppID        string `json:"authorizer_appid"`
	AuthorizerAccessToken  string `json:"authorizer_access_token"`
	ExpiresIn              int    `json:"expires_in"`
	AuthorizerRefreshToken string `json:"authorizer_refresh_token"`
}

func TestAdaptor(t *testing.T) {
	out := new(AuthInfoResponse)
	adaptor := &DefaultAdaptor[*AuthInfoResponse]{}
	err := adaptor.Invoke(context.TODO(), &InvokeOptions{
		PlatformType: WxPlatformType,
		AppID:        "test-123",
		Path:         "https://www.baidu.com/",
		Method:       HttpMethodGet,
		Request: &AuthInfoRequest{
			ComponentAppID:    "test234",
			AuthorizationCode: "test234",
		},
	}, out)
	if err != nil {
		panic(err)
	}

	print(out)
}
  1. 对于业务方来说只需要关注入参和返回值即可,token获取,重试对他们来说就是黑盒。

  2. 另外,可以把能力包装成http或rpc接口提供给外部应用,这里不赘述。

  3. 补充,代码是我脱敏临时敲出来的,大家参考思路根据自己的业务场景调整即可,可能存在 BUG 望理解。

总结

本文讲了 API 网关详细案例,API 网关应用场景挺多的,假设A同学负责公司 open-api 业务,包装内部多个团队接口,对外露出接口,不可能每个团队的接口都写一遍调用接口吧?API 网关就派上用场了。

但,并不是所有场景都适合用「API网关」,有些接口对接科能需要单独拎出来,比如:微信/抖音生态对接token获取,为什么呢?主要是因为API网关和token管理器会相互调,这种设计不是特别好单独拎出来了。

封装API网关后,大部分情况下,你不用关心微信/抖音生态提供的接口了,因为入参和出参都由业务方来定义,省时省力+1。

本文代码较多,大家把代码复制出来看看、改改、写写有助于加深理解。

最后,祝大家五一快乐,安康。