掘金 后端 ( ) • 2024-03-28 11:20

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述

Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)

Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)

Golang框架实战-KisFlow流式计算框架(4)-数据流

Golang框架实战-KisFlow流式计算框架(5)-Function调度

Golang框架实战-KisFlow流式计算框架(6)-Connector

Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出

Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action

Golang框架实战-KisFlow流式计算框架(9)-Cache/Params 数据缓存与数据参数

Golang框架实战-KisFlow流式计算框架(10)-Flow多副本


接下来我们来增强KisFlow中Function对业务数据处理的聚焦,将之前Function的写法:

func FuncDemo3Handler(ctx context.Context, flow kis.Flow) error {
	fmt.Println("---> Call funcName3Handler ----")
	fmt.Printf("Params = %+v\n", flow.GetFuncParamAll())

	for _, row := range flow.Input() {
		str := fmt.Sprintf("In FuncName = %s, FuncId = %s, row = %s", flow.GetThisFuncConf().FName, flow.GetThisFunction().GetId(), row)
		fmt.Println(str)
	}

	return nil
}

是从flow.Input()中 获取到原始数据,改成可以直接获取到业务想要的具体数据结构类型,而无需断言等类型判断和转换。改成的Function扩展参数用法大致如下:

proto

type StuScores struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

type StuAvgScore struct {
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

FaaS

type AvgStuScoreIn struct {
	serialize.DefaultSerialize
	proto.StuScores
}

type AvgStuScoreOut struct {
	serialize.DefaultSerialize
	proto.StuAvgScore
}

// AvgStuScore(FaaS) 计算学生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
	for _, row := range rows {
		avgScore := proto.StuAvgScore{
			StuId:    row.StuId,
			AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
		}
		// 提交结果数据
		_ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
	}

	return nil
}

这样,我们可以通过第三个形式参数rows直接拿到我们期待的目标输出结构体数据,不需要断言和转换,更加关注业务方的开发效率。

当然,如果你希望获取到原始的数据,依然可以从flow.Input() 中获取到。

本章将实现KisFlow上述功能。

11.1 FaaS业务回调函数自描述

本节将完成FaaS的自描述概念改造,我们知道之前的FaaS回调如下:

type FaaS func(context.Context, Flow) error

那么我们需要一个结构体,来描述这个函数属性,包括他的函数名称、函数地址、形参数量、相残类型、返回值类型等等。

11.1.1 FaaSDesc 回调自描述类型

kis-flow/kis/下,新创建一个文件faas.go,定义如下结构体:

kis-flow/kis/faas.go

// FaaS Function as a Service

// 将
// type FaaS func(context.Context, Flow) error
// 改为
// type FaaS func(context.Context, Flow, ...interface{}) error
// 可以通过可变参数的任意输入类型进行数据传递
type FaaS interface{}

// FaaSDesc FaaS 回调计算业务函数 描述
type FaaSDesc struct {
	FnName    string         // Function名称
	f         interface{}    // FaaS 函数
	fName     string         // 函数名称
	ArgsType  []reflect.Type // 函数参数类型(集合)
	ArgNum    int            // 函数参数个数
	FuncType  reflect.Type   // 函数类型
	FuncValue reflect.Value  // 函数值(函数地址)
}

将之前的FaaS改进成interface{},而FaaSDesc具备了一些属性。

  • FnName: 表示当前Function的名称,例如我们之前例子的"funcDemo1" 等,这个是用来KisFlow给Function标识的FunctionName。
  • f:表示定义的FaaS函数。
  • fName: 定义f函数的函数名称。
  • ArgsType:定义的f函数的形参类型列表,这是一个slice。
  • ArgNum:定义的f函数的输入形参个数。
  • FuncType:定义的f函数的数据类型。
  • FuncValue:定义的f函数的函数值(可以被调度的函数地址)。

11.1.2 新建一个FaaSDesc对象

下面,提供一个新建FaaSDesc的构造函数,形参的类型就是KisFlow的FunctionName和定义的FaaS函数,如下:

kis-flow/kis/faas.go

// NewFaaSDesc 根据用户注册的FnName 和FaaS 回调函数,创建 FaaSDesc 描述实例
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {
	// 传入的回调函数FaaS,函数值(函数地址)
	funcValue := reflect.ValueOf(f)

	// 传入的回调函数FaaS 类型
	funcType := funcValue.Type()

	// 判断传递的FaaS指针是否是函数类型
	if !isFuncType(funcType) {
		return nil, fmt.Errorf("provided FaaS type is %s, not a function", funcType.Name())
	}

	// 判断传递的FaaS函数是否有返回值类型是只包括(error)
	if funcType.NumOut() != 1 || funcType.Out(0) != reflect.TypeOf((*error)(nil)).Elem() {
		return nil, errors.New("function must have exactly one return value of type error")
	}

	// FaaS函数的参数类型集合
	argsType := make([]reflect.Type, funcType.NumIn())

	// 获取FaaS的函数名称
	fullName := runtime.FuncForPC(funcValue.Pointer()).Name()

	// 确保  FaaS func(context.Context, Flow, ...interface{}) error 形参列表,存在context.Context 和 kis.Flow

	// 是否包含kis.Flow类型的形参
	containsKisFlow := false
	// 是否包含context.Context类型的形参
	containsCtx := false

	// 遍历FaaS的形参类型
	for i := 0; i < funcType.NumIn(); i++ {

		// 取出第i个形式参数类型
		paramType := funcType.In(i)

		if isFlowType(paramType) {
			// 判断是否包含kis.Flow类型的形参
			containsKisFlow = true

		} else if isContextType(paramType) {
			// 判断是否包含context.Context类型的形参
			containsCtx = true

		} else if isSliceType(paramType) {
            // 判断是否包含Slice类型的形参

			// 获取当前参数Slice的元素类型
			itemType := paramType.Elem()

			// 如果当前参数是一个指针类型,则获取指针指向的结构体类型
			if itemType.Kind() == reflect.Ptr {
				itemType = itemType.Elem() // 获取指针指向的结构体类型
			}
		} else {
			// Other types are not supported...
		}

		// 将当前形参类型追加到argsType集合中
		argsType[i] = paramType
	}

	if !containsKisFlow {
		// 不包含kis.Flow类型的形参,返回错误
		return nil, errors.New("function parameters must have kis.Flow param, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
	}

	if !containsCtx {
		// 不包含context.Context类型的形参,返回错误
		return nil, errors.New("function parameters must have context, please use FaaS type like: [type FaaS func(context.Context, Flow, ...interface{}) error]")
	}

	// 返回FaaSDesc描述实例
	return &FaaSDesc{
		FnName:    fnName,
		f:         f,
		fName:     fullName,
		ArgsType:  argsType,
		ArgNum:    len(argsType),
		FuncType:  funcType,
		FuncValue: funcValue,
	}, nil
}

这里面通过用reflect反射能力,依次从f函数中获取相关的属性值,存放在FaaSDesc中。 这里面为了确保开发者传递的FaaS原因满足如下格式:

type FaaS func(context.Context, Flow, ...interface{}) error

所以对形参context.Context和形参Flow做了严格的形参类型校验,其中的校验方法如下:

kis-flow/kis/faas.go

// isFuncType 判断传递进来的 paramType 是否是函数类型
func isFuncType(paramType reflect.Type) bool {
	return paramType.Kind() == reflect.Func
}

// isFlowType 判断传递进来的 paramType 是否是 kis.Flow 类型
func isFlowType(paramType reflect.Type) bool {
	var flowInterfaceType = reflect.TypeOf((*Flow)(nil)).Elem()

	return paramType.Implements(flowInterfaceType)
}

// isContextType 判断传递进来的 paramType 是否是 context.Context 类型
func isContextType(paramType reflect.Type) bool {
	typeName := paramType.Name()

	return strings.Contains(typeName, "Context")
}

// isSliceType 判断传递进来的 paramType 是否是切片类型
func isSliceType(paramType reflect.Type) bool {
	return paramType.Kind() == reflect.Slice
}

NewFaaSDesc()containsKisFlowcontainsCtx两个bool类型的变量来判断是否包括Context和Flow类型。 下面这段代码是为了兼容传递的形参类型是结构体指针时候的兼容:

            // ... ... 

            // 获得当前形参类型
			itemType := paramType.Elem()

			// 如果当前参数是一个指针类型,则获取指针指向的结构体类型
			if itemType.Kind() == reflect.Ptr {
				itemType = itemType.Elem() // 获取指针指向的结构体类型
			}

            // ... ... 

比如开发者传递的FaaS函数原型如下:

func MyFaaSDemo(ctx context.Context, flow Flow, []*A) error

和:

func MyFaaSDemo(ctx context.Context, flow Flow, []A) error

11.1.3 注册FaaS函数

那么接下来,我们将kisPool模块,的注册FaaS函数的方法修改成注册一个FaaSDesc描述,修改后的注册方法如下:

kis-flow/kis/pool.go

// FaaS 注册 Function 计算业务逻辑, 通过Function Name 索引及注册
func (pool *kisPool) FaaS(fnName string, f FaaS) {

	// 当注册FaaS计算逻辑回调时,创建一个FaaSDesc描述对象
	faaSDesc, err := NewFaaSDesc(fnName, f)
	if err != nil {
		panic(err)
	}

	pool.fnLock.Lock() // 写锁
	defer pool.fnLock.Unlock()

	if _, ok := pool.fnRouter[fnName]; !ok {
		// 将FaaSDesc描述对象注册到fnRouter中
		pool.fnRouter[fnName] = faaSDesc
	} else {
		errString := fmt.Sprintf("KisPoll FaaS Repeat FuncName=%s", fnName)
		panic(errString)
	}

	log.Logger().InfoF("Add KisPool FuncName=%s", fnName)
}

那么现在的fnRouter中保存的key依然是FunctionName,但是value则为当前FaaS函数的描述对象FaaSDesc.

11.1.4 kisPool调度FaaSDesc

最后再调度Function的时候,通过FaaSDesc取出调度函数地址和函数形参列表进行函数的调度。 修改的后的CallFunction()如下:

kis-flow/kis/pool.go

// CallFunction 调度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {

	if funcDesc, ok := pool.fnRouter[fnName]; ok {

		// 被调度Function的形参列表
		params := make([]reflect.Value, 0, funcDesc.ArgNum)

		for _, argType := range funcDesc.ArgsType {

			// 如果是Flow类型形参,则将 flow的值传入
			if isFlowType(argType) {
				params = append(params, reflect.ValueOf(flow))
				continue
			}

			// 如果是Context类型形参,则将 ctx的值传入
			if isContextType(argType) {
				params = append(params, reflect.ValueOf(ctx))
				continue
			}

			// 如果是Slice类型形参,则将 flow.Input()的值传入
			if isSliceType(argType) {
				params = append(params, value)
				continue
			}

			// 传递的参数,既不是Flow类型,也不是Context类型,也不是Slice类型,则默认给到零值
			params = append(params, reflect.Zero(argType))
		}

		// 调用当前Function 的计算逻辑
		retValues := funcDesc.FuncValue.Call(params)

		// 取出第一个返回值,如果是nil,则返回nil
		ret := retValues[0].Interface()
		if ret == nil {
			return nil
		}

		// 如果返回值是error类型,则返回error
		return retValues[0].Interface().(error)

	}

	log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)

	return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
}

函数的整体调度逻辑大致如下: 首选通过fnName进行从fnRouter路由到对应的FaaSDesc。遍历FaaSDesc的形参列表: 将Context和Flow对象依次取出来,将额外传递的自定义切片形参取出来,如果传递的参数,既不是Flow类型,也不是Context类型,也不是Slice类型,则默认给到零值,如下:

			params = append(params, reflect.Zero(argType))

最后执行函数的调度:

		retValues := funcDesc.FuncValue.Call(params)

得到第一个返回值error的数值,为nil则返回nil,否则返回error类型。

这样我们的FaaS自描述的调度模式就建立成功了,那么有了这套功能KisFlow可以做什么事情呢,下一节我们可以再调度FaaSDesc的时候将传递的自定义形参的数据类型进行序列化,得到开发者期待的数据类型。

11.2 FaaS形参的自定义数据类型序列化

11.2.1 Serialize序列化接口

首先,我们定义一个数据序列化接口,在kis-flow/kis/下创建serialize.go 文件,如下:

kis-flow/kis/serialize.go

// Serialize 数据序列化接口
type Serialize interface {
	// UnMarshal 用于将 KisRowArr 反序列化为指定类型的值。
	UnMarshal(common.KisRowArr, reflect.Type) (reflect.Value, error)
	// Marshal 用于将指定类型的值序列化为 KisRowArr。
	Marshal(interface{}) (common.KisRowArr, error)
}

其中KisRowArr是我们KisFlow中传递每个Function的数据切片,之前我们定义在了kis-flow/common/data_type.go中:

package common

// KisRow 一行数据
type KisRow interface{}

// KisRowArr 一次业务的批量数据
type KisRowArr []KisRow

/*
	KisDataMap 当前Flow承载的全部数据
   	key	:  数据所在的Function ID
    value: 对应的KisRow
*/
type KisDataMap map[string]KisRowArr

Serialize提供了两个接口:

  • UnMarshal:用于将 KisRowArr 反序列化为指定类型的值。
  • Marshal:用于将指定类型的值序列化为 KisRowArr。

KisFlow会提供一个默认的Serialize给每个FaaS函数,开发者也可以自定义自己的Serialize来对FaaS传递的形参进行自定义的数据序列化动作。

11.2.2 KisFlow默认的Serialize序列化

KisFlow提供一个默认的Serialize序列化实例,主要以Json格式为主,在kis-flow/下创建serialize/文件夹,在kis-flow/serialize/下创建serialize_default.go文件,实现的序列化和反序列化代码如下:

kis-flow/serialize/serialize_default.go

package serialize

import (
	"encoding/json"
	"fmt"
	"kis-flow/common"
	"reflect"
)

type DefaultSerialize struct{}

// UnMarshal 用于将 KisRowArr 反序列化为指定类型的值。
func (f *DefaultSerialize) UnMarshal(arr common.KisRowArr, r reflect.Type) (reflect.Value, error) {
	// 确保传入的类型是一个切片
	if r.Kind() != reflect.Slice {
		return reflect.Value{}, fmt.Errorf("r must be a slice")
	}

	slice := reflect.MakeSlice(r, 0, len(arr))

	// 遍历每个元素并尝试反序列化
	for _, row := range arr {
		var elem reflect.Value
		var err error

		// 尝试断言为结构体或指针
		elem, err = unMarshalStruct(row, r.Elem())
		if err == nil {
			slice = reflect.Append(slice, elem)
			continue
		}

		// 尝试直接反序列化字符串
		elem, err = unMarshalJsonString(row, r.Elem())
		if err == nil {
			slice = reflect.Append(slice, elem)
			continue
		}

		// 尝试先序列化为 JSON 再反序列化
		elem, err = unMarshalJsonStruct(row, r.Elem())
		if err == nil {
			slice = reflect.Append(slice, elem)
		} else {
			return reflect.Value{}, fmt.Errorf("failed to decode row: %v", err)
		}
	}

	return slice, nil
}

// Marshal 用于将指定类型的值序列化为 KisRowArr(json 序列化)。
func (f *DefaultSerialize) Marshal(i interface{}) (common.KisRowArr, error) {
	var arr common.KisRowArr

	switch reflect.TypeOf(i).Kind() {
	case reflect.Slice, reflect.Array:
		slice := reflect.ValueOf(i)
		for i := 0; i < slice.Len(); i++ {
			// 序列化每个元素为 JSON 字符串,并将其添加到切片中。
			jsonBytes, err := json.Marshal(slice.Index(i).Interface())
			if err != nil {
				return nil, fmt.Errorf("failed to marshal element to JSON: %v  ", err)
			}
			arr = append(arr, string(jsonBytes))
		}
	default:
		// 如果不是切片或数组类型,则直接序列化整个结构体为 JSON 字符串。
		jsonBytes, err := json.Marshal(i)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal element to JSON: %v  ", err)
		}
		arr = append(arr, string(jsonBytes))
	}

	return arr, nil
}

其中一些函数定义如下:

kis-flow/serialize/serialize_default.go

// 尝试断言为结构体或指针
func unMarshalStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
	// 检查 row 是否为结构体或结构体指针类型
	rowType := reflect.TypeOf(row)
	if rowType == nil {
		return reflect.Value{}, fmt.Errorf("row is nil pointer")
	}
	if rowType.Kind() != reflect.Struct && rowType.Kind() != reflect.Ptr {
		return reflect.Value{}, fmt.Errorf("row must be a struct or struct pointer type")
	}

	// 如果 row 是指针类型,则获取它指向的类型
	if rowType.Kind() == reflect.Ptr {
		// 空指针
		if reflect.ValueOf(row).IsNil() {
			return reflect.Value{}, fmt.Errorf("row is nil pointer")
		}

		// 解引用
		row = reflect.ValueOf(row).Elem().Interface()

		// 拿到解引用后的类型
		rowType = reflect.TypeOf(row)
	}

	// 检查是否可以将 row 断言为 elemType(目标类型)
	if !rowType.AssignableTo(elemType) {
		return reflect.Value{}, fmt.Errorf("row type cannot be asserted to elemType")
	}

	// 将 row 转换为 reflect.Value 并返回
	return reflect.ValueOf(row), nil
}

// 尝试直接反序列化字符串(将Json字符串 反序列化为 结构体)
func unMarshalJsonString(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
	// 判断源数据是否可以断言成string
	str, ok := row.(string)
	if !ok {
		return reflect.Value{}, fmt.Errorf("not a string")
	}

	// 创建一个新的结构体实例,用于存储反序列化后的值
	elem := reflect.New(elemType).Elem()

	// 尝试将json字符串反序列化为结构体。
	if err := json.Unmarshal([]byte(str), elem.Addr().Interface()); err != nil {
		return reflect.Value{}, fmt.Errorf("failed to unmarshal string to struct: %v", err)
	}

	return elem, nil
}

// 尝试先序列化为 JSON 再反序列化(将结构体转换成Json字符串,再将Json字符串 反序列化为 结构体)
func unMarshalJsonStruct(row common.KisRow, elemType reflect.Type) (reflect.Value, error) {
	// 将 row 序列化为 JSON 字符串
	jsonBytes, err := json.Marshal(row)
	if err != nil {
		return reflect.Value{}, fmt.Errorf("failed to marshal row to JSON: %v  ", err)
	}

	// 创建一个新的结构体实例,用于存储反序列化后的值
	elem := reflect.New(elemType).Interface()

	// 将 JSON 字符串反序列化为结构体
	if err := json.Unmarshal(jsonBytes, elem); err != nil {
		return reflect.Value{}, fmt.Errorf("failed to unmarshal JSON to element: %v  ", err)
	}

	return reflect.ValueOf(elem).Elem(), nil
}

  • UnMarshal(): 的实现 首先判断形参是否是一个Slice,如果是的话,那么切片中的每个元素的数据进行序列化,优先尝试unMarshalStruct()结构体反序列化,其次尝试json字符串的反序列化unMarshalJsonString(),最后再尝试具备相同属性的结构体但是名称不同的反序列化unMarshalJsonStruct()
  • Marshal(): 则是将任意类型序列化为json二进制字符串存储在KisRowArr中。

注意:KisFlow目前的默认序列化只实现了json格式的序列化,开发者可以参考DefaultSerialize{} 来实现自己其他格式的数据序列化和反序列化动作。

11.2.3 默认的默认的Serialize实例

在serialize的接口定义中,定义一个全局默认的序列化实例,defaultSerialize。

kis-flow/kis/serialize.go

// defaultSerialize KisFlow提供的默认序列化实现(开发者可以自定义)
var defaultSerialize = &serialize.DefaultSerialize{}

同时提供一个判断一个数据类型是否实现了抽象接口Serialize的校验方法,如下:

kis-flow/kis/serialize.go

// isSerialize 判断传递进来的 paramType 是否实现了 Serialize 接口
func isSerialize(paramType reflect.Type) bool {
	return paramType.Implements(reflect.TypeOf((*Serialize)(nil)).Elem())
}

11.2.4 FaaSDesc实现Serialize序列化接口

现在将FaaSDesc去继承且实现Serialize接口,在调度FaaSDesc的时候将传递的输入参数进行序列化得到相对应的具体类型形参,定义如下:

kis-flow/kis/faas.go

// FaaSDesc FaaS 回调计算业务函数 描述
type FaaSDesc struct {
    // +++++++
	Serialize                // 当前Function的数据输入输出序列化实现
    // +++++++
	FnName    string         // Function名称
	f         interface{}    // FaaS 函数
	fName     string         // 函数名称
	ArgsType  []reflect.Type // 函数参数类型(集合)
	ArgNum    int            // 函数参数个数
	FuncType  reflect.Type   // 函数类型
	FuncValue reflect.Value  // 函数值(函数地址)
}

然后,在构造方法NewFaaSDesc()加上对自定义形参的判断,判断传递的自定义形参是否实现了Serialize的两个序列化接口,如果实现了,则使用自定义的序列化接口,如果没有实现,则使用默认的DefaultSerialize{}实例。

kis-flow/kis/faas.go

// NewFaaSDesc 根据用户注册的FnName 和FaaS 回调函数,创建 FaaSDesc 描述实例
func NewFaaSDesc(fnName string, f FaaS) (*FaaSDesc, error) {

    // ++++++++++
	// 输入输出序列化实例
	var serializeImpl Serialize
    // ++++++++++

	// ... ...
    // ... ...
    
	// 遍历FaaS的形参类型
	for i := 0; i < funcType.NumIn(); i++ {

		// 取出第i个形式参数类型
		paramType := funcType.In(i)

		if isFlowType(paramType) {
			// 判断是否包含kis.Flow类型的形参
			containsKisFlow = true

		} else if isContextType(paramType) {
			// 判断是否包含context.Context类型的形参
			containsCtx = true

		} else if isSliceType(paramType) {

			// 获取当前参数Slice的元素类型
			itemType := paramType.Elem()

			// 如果当前参数是一个指针类型,则获取指针指向的结构体类型
			if itemType.Kind() == reflect.Ptr {
				itemType = itemType.Elem() // 获取指针指向的结构体类型
			}


            // +++++++++++++++++++++++++++++

			// Check if f implements Serialize interface
			// (检测传递的FaaS函数是否实现了Serialize接口)
			if isSerialize(itemType) {
				// 如果当前形参实现了Serialize接口,则使用当前形参的序列化实现
				serializeImpl = reflect.New(itemType).Interface().(Serialize)

			} else {
				// 如果当前形参没有实现Serialize接口,则使用默认的序列化实现
				serializeImpl = defaultSerialize // Use global default implementation
			}
            // +++++++++++++++++++++++++++++++
            
		} else {
			// Other types are not supported
		}

		// 将当前形参类型追加到argsType集合中
		argsType[i] = paramType
	}

	// ... ...
    // ... ...

	// 返回FaaSDesc描述实例
	return &FaaSDesc{
		Serialize: serializeImpl,
		FnName:    fnName,
		f:         f,
		fName:     fullName,
		ArgsType:  argsType,
		ArgNum:    len(argsType),
		FuncType:  funcType,
		FuncValue: funcValue,
	}, nil
}

11.2.5 完成调度FaaS数据序列化

最后在调度FaaSDesc的时候,解析形参的时候,如果是自定义的Slice参数,则对齐进行反序列化操作,将flow.Input()的原数据反序列化成为开发者需要的结构体数据,进行调度FaaS,实现如下:

kis-flow/kis/pool.go

// CallFunction 调度 Function
func (pool *kisPool) CallFunction(ctx context.Context, fnName string, flow Flow) error {

	if funcDesc, ok := pool.fnRouter[fnName]; ok {

		// 被调度Function的形参列表
		params := make([]reflect.Value, 0, funcDesc.ArgNum)

		for _, argType := range funcDesc.ArgsType {

			// 如果是Flow类型形参,则将 flow的值传入
			if isFlowType(argType) {
				params = append(params, reflect.ValueOf(flow))
				continue
			}

			// 如果是Context类型形参,则将 ctx的值传入
			if isContextType(argType) {
				params = append(params, reflect.ValueOf(ctx))
				continue
			}

			// 如果是Slice类型形参,则将 flow.Input()的值传入
			if isSliceType(argType) {

                // +++++++++++++++++++
				// 将flow.Input()中的原始数据,反序列化为argType类型的数据
				value, err := funcDesc.Serialize.UnMarshal(flow.Input(), argType)
				if err != nil {
					log.Logger().ErrorFX(ctx, "funcDesc.Serialize.DecodeParam err=%v", err)
				} else {
					params = append(params, value)
					continue
				}
                // +++++++++++++++++++
			}

			// 传递的参数,既不是Flow类型,也不是Context类型,也不是Slice类型,则默认给到零值
			params = append(params, reflect.Zero(argType))
		}

		// 调用当前Function 的计算逻辑
		retValues := funcDesc.FuncValue.Call(params)

		// 取出第一个返回值,如果是nil,则返回nil
		ret := retValues[0].Interface()
		if ret == nil {
			return nil
		}

		// 如果返回值是error类型,则返回error
		return retValues[0].Interface().(error)

	}

	log.Logger().ErrorFX(ctx, "FuncName: %s Can not find in KisPool, Not Added.\n", fnName)

	return errors.New("FuncName: " + fnName + " Can not find in NsPool, Not Added.")
}


这样我们就将数据序列化的动作和FaaSDesc模块结合起来了,接下来,我们写一个单元测试来测试这部分的能力。

11.3 自定义形参序列化单元测试

11.3.1 Flow与Function的配置文件定义

单元测试,我们新建两个Function配置如下:

kis-flow/test/load_conf/func/func-avgStuScore.yml

kistype: func
fname: AvgStuScore
fmode: Calculate
source:
    name: 学生平均分
    must:
        - stu_id

kis-flow/test/load_conf/func/func-PrintStuAvgScore.yml

kistype: func
fname: PrintStuAvgScore
fmode: Expand
source:
    name: 学生平均分
    must:
        - stu_id

然后我们来定义一个Flow将上述的两个Function链接起来

kis-flow/test/load_conf/flow/flow-StuAvg.yml

kistype: flow
status: 1
flow_name: StuAvg
flows:
    - fname: AvgStuScore
    - fname: PrintStuAvgScore

11.3.2 自定义基础数据proto定义

kis-flow/test/下创建proto/文件夹,创建一个自定义的基础数据proto,为了今后数据协议的复用,如下:

kis-flow/test/proto/stu_score.go

package proto

// 学生学习分数
type StuScores struct {
	StuId  int `json:"stu_id"`
	Score1 int `json:"score_1"`
	Score2 int `json:"score_2"`
	Score3 int `json:"score_3"`
}

// 学生的平均分
type StuAvgScore struct {
	StuId    int     `json:"stu_id"`
	AvgScore float64 `json:"avg_score"`
}

11.3.3 定义两个FaaS计算回调函数

定义两个FaaS计算函数,一个为计算一个Student的平均分,一个打印Student的平均分,如下:

kis-flow/test/faas/faas_stu_score_avg.go

package faas

import (
	"context"
	"kis-flow/kis"
	"kis-flow/serialize"
	"kis-flow/test/proto"
)

type AvgStuScoreIn struct {
	serialize.DefaultSerialize
	proto.StuScores
}

type AvgStuScoreOut struct {
	serialize.DefaultSerialize
	proto.StuAvgScore
}

// AvgStuScore(FaaS) 计算学生平均分
func AvgStuScore(ctx context.Context, flow kis.Flow, rows []*AvgStuScoreIn) error {
	for _, row := range rows {
		avgScore := proto.StuAvgScore{
			StuId:    row.StuId,
			AvgScore: float64(row.Score1+row.Score2+row.Score3) / 3,
		}
		// 提交结果数据
		_ = flow.CommitRow(AvgStuScoreOut{StuAvgScore: avgScore})
	}

	return nil
}

AvgStuScore()方法为我们改进之后的FaaS函数,其中第三个形参rows []*AvgStuScoreIn为我们自定义序列化的形参,之前我们通过flow.Input()来拿到原始的数据,然后进行遍历,其实现在依然可以这么处理,但是每次可能需要开发者在FaaS中自行断言判断,对开发的效率有些成本,那么现在开发者完全可以通过AvgStuScoreIn来描述一个形参的数据,然后在AvgStuScore的业务中,通过遍历rows得到已经序列化好的结构体,增加的代码的可读性也降低的写业务的开发成本,提高了效率。 打印平均分的FaaS实现如下:

kis-flow/test/faas/faas_stu_score_avg_print.go

package faas

import (
	"context"
	"fmt"
	"kis-flow/kis"
	"kis-flow/serialize"
	"kis-flow/test/proto"
)

type PrintStuAvgScoreIn struct {
	serialize.DefaultSerialize
	proto.StuAvgScore
}

type PrintStuAvgScoreOut struct {
	serialize.DefaultSerialize
}

func PrintStuAvgScore(ctx context.Context, flow kis.Flow, rows []*PrintStuAvgScoreIn) error {

	for _, row := range rows {
		fmt.Printf("stuid: [%+v], avg score: [%+v]\n", row.StuId, row.AvgScore)
	}

	return nil
}

与上述函数一样,我们依然采用自定义的输入形参来进行逻辑开发。

11.3.4 单元测试用例

接下来我们来编写上面Flow的测试用例单元测试,代码如下:

kis-flow/test/kis_auto_inject_param_test.go

package test

import (
	"context"
	"kis-flow/common"
	"kis-flow/config"
	"kis-flow/file"
	"kis-flow/flow"
	"kis-flow/kis"
	"kis-flow/test/faas"
	"kis-flow/test/proto"
	"testing"
)

func TestAutoInjectParamWithConfig(t *testing.T) {
	ctx := context.Background()

	kis.Pool().FaaS("AvgStuScore", faas.AvgStuScore)
	kis.Pool().FaaS("PrintStuAvgScore", faas.PrintStuAvgScore)

	// 1. 加载配置文件并构建Flow
	if err := file.ConfigImportYaml("load_conf/"); err != nil {
		panic(err)
	}

	// 2. 获取Flow
	flow1 := kis.Pool().GetFlow("StuAvg")
	if flow1 == nil {
		panic("flow1 is nil")
	}

	// 3. 提交原始数据
	_ = flow1.CommitRow(&faas.AvgStuScoreIn{
		StuScores: proto.StuScores{
			StuId:  100,
			Score1: 1,
			Score2: 2,
			Score3: 3,
		},
	})
	_ = flow1.CommitRow(faas.AvgStuScoreIn{
		StuScores: proto.StuScores{
			StuId:  100,
			Score1: 1,
			Score2: 2,
			Score3: 3,
		},
	})

	// 提交原始数据(json字符串)
	_ = flow1.CommitRow(`{"stu_id":101}`)

	// 4. 执行flow1
	if err := flow1.Run(ctx); err != nil {
		panic(err)
	}
}


在提交原始数据的时候,我们这里面采用的是使用默认的序列化方式,支持json的反序列化支持,在CommitRow()的时候,一共提交的3条数据,前两条是提交的结构体数据,最后一次是提交的json字符串,目前都可以支持。

cd 到kis-flow/test/下,执行:

$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig

得到结果如下:

$ go test -test.v -test.paniconexit0 -test.run TestAutoInjectParamWithConfig
...
...
Add KisPool FuncName=AvgStuScore
Add KisPool FuncName=PrintStuAvgScore
...
...
Add FlowRouter FlowName=StuAvg
context.Background
====> After CommitSrcData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
 map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]]

KisFunctionC, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023af80 ThisFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd PrevFunctionId:FunctionIdFirstVirtual funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}]] inPut:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

context.Background
 ====> After commitCurData, flow_name = StuAvg, flow_id = flow-1265702bc905400da1788c0354080ded
All Level Data =
 map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]]

KisFunctionE, flow = &{Id:flow-1265702bc905400da1788c0354080ded Name:StuAvg Conf:0xc000286100 Funcs:map[AvgStuScore:0xc00023af80 PrintStuAvgScore:0xc00023b000] FlowHead:0xc00023af80 FlowTail:0xc00023b000 flock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} ThisFunction:0xc00023b000 ThisFunctionId:func-7f308d00f4fa49488760ff1dfb85dc46 PrevFunctionId:func-12a05e62a12a45fdade8477a3bddd2fd funcParams:map[func-12a05e62a12a45fdade8477a3bddd2fd:map[] func-7f308d00f4fa49488760ff1dfb85dc46:map[]] fplock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0} buffer:[] data:map[FunctionIdFirstVirtual:[0xc0002bab40 {DefaultSerialize:{} StuScores:{StuId:100 Score1:1 Score2:2 Score3:3}} {"stu_id":101}] func-12a05e62a12a45fdade8477a3bddd2fd:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}]] inPut:[{DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:100 AvgScore:2}} {DefaultSerialize:{} StuAvgScore:{StuId:101 AvgScore:0}}] abort:false action:{DataReuse:false ForceEntryNext:false JumpFunc: Abort:false} cache:0xc000210b88 metaData:map[] mLock:{w:{state:0 sema:0} writerSem:0 readerSem:0 readerCount:0 readerWait:0}}

stuid: [100], avg score: [2]
stuid: [100], avg score: [2]
stuid: [101], avg score: [0]
--- PASS: TestAutoInjectParamWithConfig (0.01s)
PASS
ok      kis-flow/test   0.030s

11.4 【V1.0】 源代码

https://github.com/aceld/kis-flow/releases/tag/v1.0


作者:刘丹冰Aceld github: https://github.com/aceld
KisFlow开源项目地址:https://github.com/aceld/kis-flow

Golang框架实战-KisFlow流式计算框架专栏

Golang框架实战-KisFlow流式计算框架(1)-概述

Golang框架实战-KisFlow流式计算框架(2)-项目构建/基础模块-(上)

Golang框架实战-KisFlow流式计算框架(3)-项目构建/基础模块-(下)

Golang框架实战-KisFlow流式计算框架(4)-数据流

Golang框架实战-KisFlow流式计算框架(5)-Function调度

Golang框架实战-KisFlow流式计算框架(6)-Connector

Golang框架实战-KisFlow流式计算框架(7)-配置导入与导出

Golang框架实战-KisFlow流式计算框架(8)-KisFlow Action

Golang框架实战-KisFlow流式计算框架(10)-Flow多副本