theme: channing-cyan
烟囱架构重构-代理网关篇
写作背景
前几天写了一篇关于「烟囱架构重构,最近重构了微信生态对接业务」,有同学对三方生态对接「API 网关」比较感兴趣,写这篇文章详细聊聊一波,我会贴一些简易代码,感兴趣先看看这篇文章,了解前因后果。烟囱架构重构,最近重构了微信生态对接业务 - 掘金
历史案例
早期,对接微信/抖音生态,把所有接口都对接进来了,做了简单包装和透传,这也带来一个问题。
每当有业务团队要接入新接口,不得不新增一个http接口,非常浪费开发资源,历史代码很多都是这么干的,最近参考了某大佬思路,做了一些简化。
为了方便理解我以查询商家应用授权应用信息为栗子,代码如下:
type Client struct {
}
// AuthInfoRequest 使用授权码获取授权信息-请求参数参数
type AuthInfoRequest struct {
ComponentAppID string `json:"component_appid"`
AuthorizationCode string `json:"authorization_code"`
}
// AuthInfoResponse 使用授权码获取授权信息-返回结果
type AuthInfoResponse struct {
AuthorizationInfo AuthorizationInfo `json:"authorization_info"`
}
func (c *Client) QueryAuthInfo(ctx context.Context, token, appID, code string) (*AuthInfoResponse, error) {
opts := &grequests.RequestOptions{
Params: map[string]string{
"component_access_token": token,
},
JSON: &AuthInfoRequest{
ComponentAppID: appID,
AuthorizationCode: code,
},
}
url := "https://api.weixin.qq.com/cgi-bin/component/api_query_auth"
resp, err := grequests.DoRegularRequest("GET", url, opts)
if err != nil {
return nil, err
}
out := &AuthInfoResponse{}
if err = resp.JSON(out); err != nil {
return nil, err
}
return out, nil
}
type AuthorizationInfo struct {
AuthorizerAppID string `json:"authorizer_appid"`
AuthorizerAccessToken string `json:"authorizer_access_token"`
ExpiresIn int `json:"expires_in"`
AuthorizerRefreshToken string `json:"authorizer_refresh_token"`
}
不晓得你们是不是也是这么干的?申请单独的应用来干这事儿并封装了一堆接口。
代码比较简单,做了几件事:
- 拼接 token、appId;
- 参数组装;
- 发起 http 请求,反序列化对象,返回最终数据。
随着业务发展,未来可能还会做几件事
- 若业务场景对正确性、数据要求比较高的,可能会根据接口返回不同的 code 重试;
- 接口限流,三方接口一般都是有频率限制的;
- 缓存(非必需)。
API 网关
抖音/微信生态...接口挺多的,不可能所有接口都接一遍,关键是调用的接口仅仅是简单做了包装,透传参数。
如果每个接口都写一遍上述代码,拼 token、重试、限流、缓存、反序列化,挺浪费研、测试发资源的。所以,能不能把通用的逻辑抽出来整合到「api网关」?如下图:
在三方生态前链路,做一个代理,把通用逻辑封装了,业务方A、B、C只关注参数、返回值,其它就别关心了。
简易代码如下:
定义常量
type PlatformType string
const (
WxPlatformType PlatformType = "wx"
TiktokPlatformType PlatformType = "tiktok"
)
type HttpMethod string
const (
HttpMethodGet HttpMethod = "GET"
HttpMethodPost HttpMethod = "POST"
)
定义对接平台通用结构体
type WXPlatformErrors struct {
ErrCode int `json:"errcode"`
ErrMsg string `json:"errmsg"`
}
type TikTokPlatformErrors struct {
// todo 后续补充
}
定义 adaptor
// InvokeOptions 内部调用 使用结构直接绑定
type InvokeOptions struct {
// 生态枚举
PlatformType PlatformType
// 小程序、公众号 appid
AppID string
// 接口路径地址 相对or绝对
Path string
// http method,get 请求 or post 请求
Method HttpMethod
// Request 因为要反序列化和序列化,所以,必须要带 json tag
// Request 请求体类型指针
Request any
}
type IAdaptor interface {
// Invoke 外部调用接口
Invoke(ctx context.Context, opts *InvokeOptions) (any, error)
}
定义 Default Adaptor
type DefaultAdaptor[R any] struct {
}
func (a *DefaultAdaptor[R]) Invoke(ctx context.Context, opts *InvokeOptions, out R) error {
var (
bs []byte
features IFeatures
)
// 重试请求
retryErr := retry.Do(
func() error {
tmpBs, invokeErr := a.invoke(
ctx,
opts,
)
if invokeErr != nil {
return invokeErr
}
bs = tmpBs
handler, exists := GetHandler(opts.PlatformType)
if !exists { // 如果没有实现类直接退出即可,流程不受影响
return nil
}
tmpFeatures, err := handler.HandleResponse(bs)
if err != nil {
// 记录日志就可以了
fmt.Println(err)
return nil
}
features = tmpFeatures
return nil
},
retry.Attempts(uint(3)), // 重试3次
retry.RetryIf(func(err error) bool {
if err == nil { // err 为 empty 不用重试了
return false
}
// 根据不同平台进行重试
return features.Retry()
}),
)
if retryErr != nil {
return retryErr
}
if features == nil { // 如果token刷新失败,通知token刷新
// 通知token 刷新,强制执行Invoke方法
// todo 通知token刷新,建议token管理器如果触发强制刷新token,增加监控和告警,以便第一时间检查代码是否异常
// 重走流程
// todo ⚠️,这里可能导致无限循环,若果不好把控,opts 可以增加一个标识
return a.Invoke(ctx, opts, out)
}
// 序列化为最终值
return sonic.Unmarshal(bs, &out)
}
func (a *DefaultAdaptor[R]) invoke(ctx context.Context, opts *InvokeOptions) ([]byte, error) {
// TODO 从 token 管理器中获取token,代码省略
// TODO 补充 token 和 appid,主要是替换 path 模板,详细代码省略...
// /cgi-bin/open/get?access_token={{access_token}},需要将 {{access_token}} 替换为 token 即可
path := strings.Replace(opts.Path, "{{appid}}", opts.AppID, -1)
var (
resp *grequests.Response
err error
)
switch opts.Method {
case HttpMethodGet:
resp, err = grequests.DoRegularRequest("GET", path, nil)
if err != nil {
return nil, err
}
case HttpMethodPost:
opts := &grequests.RequestOptions{
JSON: opts.Request,
}
resp, err = grequests.DoRegularRequest("GET", path, opts)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("method not valid")
}
defer func() {
_ = resp.RawResponse.Body.Close() // 关闭流文件读取
}()
// 读取流文件
return io.ReadAll(resp.RawResponse.Body)
}
定义 IFeatures 接口
type IFeatures interface {
// Retry 是否重试
Retry() bool
// TokenExpired token 是否过期
TokenExpired() bool
// todo 后续可以新增其其它特性
}
定义 Wx IFeatures 实现类
type WxFeatures struct {
*WXPlatformErrors
}
func (w *WxFeatures) Retry() bool {
/*
1、if 可以这里可以做成的配置,指定的code重试
2、有些错误码并不需要重试的,比如:token过期、API未授权等...,尽量把这些错误码规避掉。
*/
if w.ErrCode != 0 {
return true
}
return false
}
func (w *WxFeatures) TokenExpired() bool {
return w.ErrCode == 11 // 假设微信平台ErrCode=11是token过期
}
定义 Response handler
type Handler interface {
HandleResponse(in []byte) (IFeatures, error)
}
func GetHandler(in PlatformType) (Handler, bool) {
// todo 平台应该不会太多,暂时用switch,后期可以换成策略模式。
switch in {
case WxPlatformType:
return &WxHandler{}, true
case TiktokPlatformType:
// ... 省略
return nil, true
default:
return nil, false
}
}
定义 Wx Response handler
type WxHandler struct {
}
func (w *WxHandler) HandleResponse(in []byte) (IFeatures, error) {
out := new(WXPlatformErrors)
err := sonic.Unmarshal(in, out)
if err != nil {
return nil, err
}
return &WxFeatures{WXPlatformErrors: out}, nil
}
测试案例,以查询商家应用授权应用信息为栗子。
type AuthInfoRequest struct {
ComponentAppID string `json:"component_appid"`
AuthorizationCode string `json:"authorization_code"`
}
// AuthInfoResponse 使用授权码获取授权信息-返回结果
type AuthInfoResponse struct {
WXPlatformErrors
AuthorizationInfo AuthorizationInfo `json:"authorization_info"`
}
type AuthorizationInfo struct {
AuthorizerAppID string `json:"authorizer_appid"`
AuthorizerAccessToken string `json:"authorizer_access_token"`
ExpiresIn int `json:"expires_in"`
AuthorizerRefreshToken string `json:"authorizer_refresh_token"`
}
func TestAdaptor(t *testing.T) {
out := new(AuthInfoResponse)
adaptor := &DefaultAdaptor[*AuthInfoResponse]{}
err := adaptor.Invoke(context.TODO(), &InvokeOptions{
PlatformType: WxPlatformType,
AppID: "test-123",
Path: "https://www.baidu.com/",
Method: HttpMethodGet,
Request: &AuthInfoRequest{
ComponentAppID: "test234",
AuthorizationCode: "test234",
},
}, out)
if err != nil {
panic(err)
}
print(out)
}
对于业务方来说只需要关注入参和返回值即可,token获取,重试对他们来说就是黑盒。
另外,可以把能力包装成http或rpc接口提供给外部应用,这里不赘述。
补充,代码是我脱敏临时敲出来的,大家参考思路根据自己的业务场景调整即可,可能存在 BUG 望理解。
总结
本文讲了 API 网关详细案例,API 网关应用场景挺多的,假设A同学负责公司 open-api 业务,包装内部多个团队接口,对外露出接口,不可能每个团队的接口都写一遍调用接口吧?API 网关就派上用场了。
但,并不是所有场景都适合用「API网关」,有些接口对接科能需要单独拎出来,比如:微信/抖音生态对接token获取,为什么呢?主要是因为API网关和token管理器会相互调,这种设计不是特别好单独拎出来了。
封装API网关后,大部分情况下,你不用关心微信/抖音生态提供的接口了,因为入参和出参都由业务方来定义,省时省力+1。
本文代码较多,大家把代码复制出来看看、改改、写写有助于加深理解。
最后,祝大家五一快乐,安康。