掘金 后端 ( ) • 2024-06-27 10:15

一、背景

序列化是指将数据从内存中的对象序列化为字节流,以便在网络中传输或持久化存储。序列化在Apache Flink中非常重要,因为它涉及到数据传输和状态管理等关键部分。Apache Flink以其独特的方式来处理数据类型以及序列化,这种方式包括它自身的类型描述符、泛型类型提取以及类型序列化框架。本文将简单介绍它们背后的概念和基本原理,侧重分享在DataStream、Flink SQL自定义函数开发中对数据类型和序列的应用,以提升任务的运行效率。

二、简单理论阐述(基于Flink 1.13)

主要参考Apache Flink 1.13

支持的数据类型

  • Java Tuples and Scala Case Classes
  • Java POJOs
  • Primitive Types
  • Regular Classes
  • Values
  • Hadoop Writables
  • Special Types

具体的数据类型定义在此就不详细介绍了,具体描述可以前往Flink官网查看。

TypeInformation

Apache Flink量身定制了一套序列化框架,好处就是选择自己定制的序列化框架,对类型信息了解越多,可以在早期完成类型检查,更好地选取序列化方式,进行数据布局,节省数据的存储空间,甚至直接操作二进制数据。

TypeInformation类是Apache Flink所有类型描述符的基类,通过阅读源码,我们可以大概分成以下几种数据类型。

  • Basic types:所有的Java类型以及包装类:void,String,Date,BigDecimal,and BigInteger等。
  • Primitive arrays以及Object arrays
  • Composite types
  • Flink Java Tuples(Flink Java API的一部分):最多25个字段,不支持空字段
  • Scala case classes(包括Scala Tuples):不支持null字段
  • Row:具有任意数量字段并支持空字段的Tuples
  • POJO 类:JavaBeans
  • Auxiliary types (Option,Either,Lists,Maps,…)
  • Generic types:Flink内部未维护的类型,这种类型通常是由Kryo序列化。

我们简单看下该类的方法,核心是createSerializer,获取org.apache.flink.api.common.typeutils.TypeSerializer,执行序列化以及反序列化方法,主要是:

  • org.apache.flink.api.common.typeutils.TypeSerializer#serialize
  • org.apache.flink.api.common.typeutils.TypeSerializer#deserialize(org.apache.flink.core.memory.DataInputView)

何时需要数据类型获取

在Apache Flink中,算子间的数据类型传递是通过流处理的数据流来实现的。数据流可以在算子之间流动,每个算子对数据流进行处理并产生输出。当数据流从一个算子流向另一个算子时,数据的类型也会随之传递。Apache Flink使用自动类型推断机制来确定数据流中的数据类型。在算子之间传递数据时,Apache Flink会根据上下文自动推断数据的类型,并在运行时保证数据的类型一致性。

举个例子:新增一个kafka source,这个时候我们需要指定数据输出类型。

@Experimental
public <OUT> DataStreamSource<OUT> fromSource(
        Source<OUT, ?, ?> source,
        WatermarkStrategy<OUT> timestampsAndWatermarks,
        String sourceName,
        TypeInformation<OUT> typeInfo) {

    final TypeInformation<OUT> resolvedTypeInfo =
            getTypeInfo(source, sourceName, Source.class, typeInfo);

    return new DataStreamSource<>(
            this,
            checkNotNull(source, "source"),
            checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"),
            checkNotNull(resolvedTypeInfo),
            checkNotNull(sourceName));
}

那输入类型怎么不需要指定呢?可以简单看下OneInputTransformation(单输入算子的基类)类的getInputType()方法,就是以输入算子的输出类型为输入类型的。

/** Returns the {@code TypeInformation} for the elements of the input. */
public TypeInformation<IN> getInputType() {
    return input.getOutputType();
}

这样source的输出类型会变成下一个算子的输入。整个DAG的数据类型都会传递下去。Apache Flink获取到数据类型后,就可以获取对应的序列化方法。

还有一种情况就是与状态后端交互的时候需要获取数据类型,特别是非JVM堆存储的后端,需要频繁的序列化以及反序列化,例如RocksDBStateBackend

举个例子,当我们使用ValueState时需要调用以下API:

org.apache.flink.streaming.api.operators.StreamingRuntimeContext#getState

@Override
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
    KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
    stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
    return keyedStateStore.getState(stateProperties);
}

public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
    if (serializerAtomicReference.get() == null) {
        checkState(typeInfo != null, "no serializer and no type info");
        // try to instantiate and set the serializer
        TypeSerializer<T> serializer = typeInfo.createSerializer(executionConfig);
        // use cas to assure the singleton
        if (!serializerAtomicReference.compareAndSet(null, serializer)) {
            LOG.debug("Someone else beat us at initializing the serializer.");
        }
    }
}

可以从org.apache.flink.api.common.state.StateDescriptor#initializeSerializerUnlessSet方法看出,需要通过传入的数据类型来获取具体的序列化器。来执行具体的序列化和反序列化逻辑,完成数据的交互。

数据类型的自动推断

乍一看很复杂,各个环节都需要指定数据类型。其实大部分应用场景下,我们不用关注数据的类型以及序列化方式。Flink会尝试推断有关分布式计算期间交换和存储的数据类型的信息。

这里简单介绍Flink类型自动推断的核心类:

org.apache.flink.api.java.typeutils.TypeExtractor

在数据流操作中,Flink使用了泛型来指定输入和输出的类型。例如,DataStream表示一个具有类型T的数据流。在代码中使用的泛型类型参数T会被TypeExtractor类解析和推断。在运行时,Apache Flink会通过调用TypeExtractor的静态方法来分析操作的输入和输出,并将推断出的类型信息存储在运行时的环境中。

举个例子:用的最多的flatMap算子,当我们不指定返回类型的时候,Flink会调用TypeExtractor类自动去推断用户的类型。

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes((FlatMapFunction)this.clean(flatMapper), this.getType(), Utils.getCallLocationName(), true);
    return this.flatMap(flatMapper, outType);
}

一般看开源框架某个类的功能我都会先看类的注释,也看TypeExtractor的注释,大概意思这是一个对类进行反射分析的实用程序,用于确定返回的数据类型。

/**
 * A utility for reflection analysis on classes, to determine the return type of implementations of
 * transformation functions.
 *
 * <p>NOTES FOR USERS OF THIS CLASS: Automatic type extraction is a hacky business that depends on a
 * lot of variables such as generics, compiler, interfaces, etc. The type extraction fails regularly
 * with either {@link MissingTypeInfo} or hard exceptions. Whenever you use methods of this class,
 * make sure to provide a way to pass custom type information as a fallback.
 */

我们来看下其中一个核心的静态推断逻辑,org.apache.flink.api.java.typeutils.TypeExtractor#getUnaryOperatorReturnType

@PublicEvolving
public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(
        Function function,
        Class<?> baseClass,
        int inputTypeArgumentIndex,
        int outputTypeArgumentIndex,
        int[] lambdaOutputTypeArgumentIndices,
        TypeInformation<IN> inType,
        String functionName,
        boolean allowMissing) {

    Preconditions.checkArgument(
            inType == null || inputTypeArgumentIndex >= 0,
            "Input type argument index was not provided");
    Preconditions.checkArgument(
            outputTypeArgumentIndex >= 0, "Output type argument index was not provided");
    Preconditions.checkArgument(
            lambdaOutputTypeArgumentIndices != null,
            "Indices for output type arguments within lambda not provided");

    // explicit result type has highest precedence
    if (function instanceof ResultTypeQueryable) {
        return ((ResultTypeQueryable<OUT>) function).getProducedType();
    }

    // perform extraction
    try {
        final LambdaExecutable exec;
        try {
            exec = checkAndExtractLambda(function);
        } catch (TypeExtractionException e) {
            throw new InvalidTypesException("Internal error occurred.", e);
        }
        if (exec != null) {

            // parameters must be accessed from behind, since JVM can add additional parameters
            // e.g. when using local variables inside lambda function
            // paramLen is the total number of parameters of the provided lambda, it includes
            // parameters added through closure
            final int paramLen = exec.getParameterTypes().length;

            final Method sam = TypeExtractionUtils.getSingleAbstractMethod(baseClass);

            // number of parameters the SAM of implemented interface has; the parameter indexing
            // applies to this range
            final int baseParametersLen = sam.getParameterTypes().length;

            final Type output;
            if (lambdaOutputTypeArgumentIndices.length > 0) {
                output =
                        TypeExtractionUtils.extractTypeFromLambda(
                                baseClass,
                                exec,
                                lambdaOutputTypeArgumentIndices,
                                paramLen,
                                baseParametersLen);
            } else {
                output = exec.getReturnType();
                TypeExtractionUtils.validateLambdaType(baseClass, output);
            }

            return new TypeExtractor().privateCreateTypeInfo(output, inType, null);
        } else {
            if (inType != null) {
                validateInputType(
                        baseClass, function.getClass(), inputTypeArgumentIndex, inType);
            }
            return new TypeExtractor()
                    .privateCreateTypeInfo(
                            baseClass,
                            function.getClass(),
                            outputTypeArgumentIndex,
                            inType,
                            null);
        }
    } catch (InvalidTypesException e) {
        if (allowMissing) {
            return (TypeInformation<OUT>)
                    new MissingTypeInfo(
                            functionName != null ? functionName : function.toString(), e);
        } else {
            throw e;
        }
    }
}
  • 首先判断该算子是否实现了ResultTypeQueryable接口,本质上就是用户是否显式指定了数据类型,例如我们熟悉的Kafka source就实现了该方法,当使用了JSONKeyValueDeserializationSchema,就显式指定了类型,用户自定义Schema就需要自己指定。
public class KafkaSource<OUT>
        implements Source<OUT, KafkaPartitionSplit, KafkaSourceEnumState>,
                ResultTypeQueryable<OUT>
//deserializationSchema 是需要用户自己定义的。
@Override
public TypeInformation<OUT> getProducedType() {
    return deserializationSchema.getProducedType();
}                
//JSONKeyValueDeserializationSchema
@Override
public TypeInformation<ObjectNode> getProducedType() {
    return getForClass(ObjectNode.class);
}
  • 未实现ResultTypeQueryable接口,就会通过反射的方法获取ReturnType,判断逻辑大概是从是否是Java 8 lambda方法开始判断的。获取到返回类型后再通过new TypeExtractor()).privateCreateTypeInfo(output,inType,(TypeInformation)null)封装成Flink内部能识别的数据类型;大致分为2类,泛型类型变量TypeVariable以及非泛型类型变量。这个封装的过程也是非常重要的,推断的数据类型是Flink内部封装好的类型,序列化基本都很高效,如果不是, 就会推断为GenericTypeInfo走Kryo等序列化方式。如感兴趣,可以看下这块的源码,在此不再赘述。

通过以上的代码逻辑的阅读,我们大概能总结出以下结论:Flink内部维护了很多高效的序列化方式,通常只有数据类型被推断为org.apache.flink.api.java.typeutils.GenericTypeInfo时我们才需要自定义序列化类型,否则性能就是灾难;或者无法推断类型的时候,例如Flink SQL复杂类型有时候是无法自动推断类型的,当然某些特殊的对象Kryo也无法序列化,比如之前遇到过TreeMap无法Kryo序列化 (也可能是自己姿势不对),建议在开发Apache Flink作业时可以养成显式指定数据类型的好习惯。

三、开发实践

Flink代码作业

如何显式指定数据类型

这个简单了,几乎所有的source、Keyby、算子等都暴露了指定TypeInformation typeInfo的构造方法,以下简单列举几个:

  • source
@Experimental
public <OUT> DataStreamSource<OUT> fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> timestampsAndWatermarks, String sourceName, TypeInformation<OUT> typeInfo) {
    TypeInformation<OUT> resolvedTypeInfo = this.getTypeInfo(source, sourceName, Source.class, typeInfo);
    return new DataStreamSource(this, (Source)Preconditions.checkNotNull(source, "source"), (WatermarkStrategy)Preconditions.checkNotNull(timestampsAndWatermarks, "timestampsAndWatermarks"), (TypeInformation)Preconditions.checkNotNull(resolvedTypeInfo), (String)Preconditions.checkNotNull(sourceName));
}
  • map
public <R> SingleOutputStreamOperator<R> map(
        MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
  • 自定义Operator
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    return this.doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
  • keyBy
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
    Preconditions.checkNotNull(key);
    Preconditions.checkNotNull(keyType);
    return new KeyedStream(this, (KeySelector)this.clean(key), keyType);
}
  • 状态后端
public ValueStateDescriptor(String name, TypeInformation<T> typeInfo) {
    super(name, typeInfo, (Object)null);
}

自定义数据类型&自定义序列化器

当遇到复杂数据类型,或者需要优化任务性能时,需要自定义数据类型,以下分享几种场景以及实现代码:

  • POJO类

例如大家最常用的POJO类,何为POJO类大家可以自行查询,Flink对POJO类做了大量的优化,大家使用Java对象最好满足POJO的规范。

举个例子,这是一个典型的POJO类:

@Data
public class BroadcastConfig implements Serializable {
    public String config_type;
    public String date;
    public String media_id;
    public String account_id;
    public String label_id;
    public long start_time;
    public long end_time;
    public int interval;
    public String msg;

    public BroadcastConfig() {
    }
    }

我们可以这样指定其数据类型,返回的数据类就是一个TypeInformation

HashMap<String, TypeInformation<?>> pojoFieldName = new HashMap<>();
pojoFieldName.put("config_type", Types.STRING);
pojoFieldName.put("date", Types.STRING);
pojoFieldName.put("media_id", Types.STRING);
pojoFieldName.put("account_id", Types.STRING);
pojoFieldName.put("label_id", Types.STRING);
pojoFieldName.put("start_time", Types.LONG);
pojoFieldName.put("end_time", Types.LONG);
pojoFieldName.put("interval", Types.INT);
pojoFieldName.put("msg", Types.STRING);

return Types.POJO(
        BroadcastConfig.class,
        pojoFieldName
);

如感兴趣,可以看下org.apache.flink.api.java.typeutils.runtime.PojoSerializer,看Flink本身对其做了哪些优化。

  • 自定义TypeInformation

某些特殊场景可能还需要复杂的对象,例如,需要极致的性能优化,在Flink Table Api中数据对象传输,大部分都是BinaryRowdata,效率非常高。我们在Flink Datastram代码作业中也想使用,怎么操作呢?这里分享一种实现方式——自定义TypeInformation,当然还有更优的实现方式,这里就不介绍了。

代码实现:本质上就是继承TypeInformation,实现对应的方法。核心逻辑是createSerializer()方法,这里我们直接使用Table Api中已经实现的BinaryRowDataSerializer,就可以达到同Flink SQL相同的性能优化。

public  class BinaryRowDataTypeInfo extends TypeInformation<BinaryRowData> {

    private static final long serialVersionUID = 4786289562505208256L;
    private final int numFields;
    private final Class<BinaryRowData> clazz;
    private final TypeSerializer<BinaryRowData> serializer;

    public BinaryRowDataTypeInfo(int numFields) {
        this.numFields=numFields;
        this.clazz=BinaryRowData.class;
        serializer= new BinaryRowDataSerializer(numFields);
    }

    @Override
    public boolean isBasicType() {
        return false;
    }

    @Override
    public boolean isTupleType() {
        return false;
    }

    @Override
    public int getArity() {
        return numFields;
    }

    @Override
    public int getTotalFields() {
        return numFields;
    }

    @Override
    public Class<BinaryRowData> getTypeClass() {
        return this.clazz;
    }

    @Override
    public boolean isKeyType() {
        return false;
    }

    @Override
    public TypeSerializer<BinaryRowData> createSerializer(ExecutionConfig config) {
        return serializer;
    }

    @Override
    public String toString() {
        return "BinaryRowDataTypeInfo<" + clazz.getCanonicalName() + ">";
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof BinaryRowDataTypeInfo) {

            BinaryRowDataTypeInfo that = (BinaryRowDataTypeInfo) obj;

            return that.canEqual(this)
                    && this.numFields==that.numFields;
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return Objects.hash(this.clazz,serializer.hashCode());
    }

    @Override
    public boolean canEqual(Object obj) {
        return obj instanceof BinaryRowDataTypeInfo;
    }
}

所以这里建议Apache Flink代码作业开发可以尽可能使用已经优化好的数据类型,例如BinaryRowdata,可以用于高性能的数据处理场景,例如在内存中进行批处理或流式处理。由于数据以二进制形式存储,可以更有效地使用内存和进行数据序列化。同时,BinaryRowData还提供了一组方法,用于访问和操作二进制数据。

  • 自定义TypeSerializer

上面的例子只是自定义了TypeInformation,当然还会遇到自定义TypeSerializer的场景,例如Apache Flink本身没有封装的数据类型。

代码实现:这里以位图存储Roaring64Bitmap为例,在某些特殊场景可以使用bitmap精准去重,减少存储空间。

我们需要继承TypeSerializer,实现其核心逻辑也是serialize() 、deserialize() 方法,可以使用Roaring64Bitmap自带的序列化、反序列化方法。如果你使用的复杂对象没有提供序列化方法,你也可以自己实现或者找开源的序列化器。有了自定义的TypeSerializer就可以在你自定义的TypeInformation中调用。

public class Roaring64BitmapTypeSerializer extends TypeSerializer<Roaring64Bitmap> {
    /**
     * Sharable instance of the Roaring64BitmapTypeSerializer.
     */
    public static final Roaring64BitmapTypeSerializer INSTANCE = new Roaring64BitmapTypeSerializer();
    private static final long serialVersionUID = -8544079063839253971L;

    @Override
    public boolean isImmutableType() {
        return false;
    }

    @Override
    public TypeSerializer<Roaring64Bitmap> duplicate() {
        return this;
    }

    @Override
    public Roaring64Bitmap createInstance() {
        return new Roaring64Bitmap();
    }

    @Override
    public Roaring64Bitmap copy(Roaring64Bitmap from) {
        Roaring64Bitmap copiedMap = new Roaring64Bitmap();
        from.forEach(copiedMap::addLong);
        return copiedMap;
    }

    @Override
    public Roaring64Bitmap copy(Roaring64Bitmap from, Roaring64Bitmap reuse) {
        from.forEach(reuse::addLong);
        return reuse;
    }

    @Override
    public int getLength() {
        return -1;
    }

    @Override
    public void serialize(Roaring64Bitmap record, DataOutputView target) throws IOException {
        record.serialize(target);
    }

    @Override
    public Roaring64Bitmap deserialize(DataInputView source) throws IOException {
        Roaring64Bitmap navigableMap = new Roaring64Bitmap();
        navigableMap.deserialize(source);
        return navigableMap;
    }

    @Override
    public Roaring64Bitmap deserialize(Roaring64Bitmap reuse, DataInputView source) throws IOException {
        reuse.deserialize(source);
        return reuse;
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        Roaring64Bitmap deserialize = this.deserialize(source);
        copy(deserialize);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        } else if (obj != null && obj.getClass() == Roaring64BitmapTypeSerializer.class) {
            return true;
        } else {
            return false;
        }
    }

    @Override
    public int hashCode() {
        return this.getClass().hashCode();
    }

    @Override
    public TypeSerializerSnapshot<Roaring64Bitmap> snapshotConfiguration() {
        return new Roaring64BitmapTypeSerializer.Roaring64BitmapSerializerSnapshot();
    }

    public static final class Roaring64BitmapSerializerSnapshot
            extends SimpleTypeSerializerSnapshot<Roaring64Bitmap> {

        public Roaring64BitmapSerializerSnapshot() {
            super(() -> Roaring64BitmapTypeSerializer.INSTANCE);
        }
    }
}

Flink SQL自定义函数

如何显式指定数据类型

这里简单分享下,在自定义Function开发下遇到复杂数据类型无法在accumulator 或者input、output中使用的问题,这里我们只介绍使用复杂数据对象如何指定数据类型的场景。

我们可以先看下FunctionDefinitionConvertRule,这是Apache Flink中的一个规则(Rule),用于将用户自定义的函数定义转换为对应的实现。其中通过getTypeInference()方法返回用于执行对此函数定义的调用的类型推理的逻辑。

@Override
public Optional<RexNode> convert(CallExpression call, ConvertContext context) {
    FunctionDefinition functionDefinition = call.getFunctionDefinition();

    // built-in functions without implementation are handled separately
    if (functionDefinition instanceof BuiltInFunctionDefinition) {
        final BuiltInFunctionDefinition builtInFunction =
                (BuiltInFunctionDefinition) functionDefinition;
        if (!builtInFunction.getRuntimeClass().isPresent()) {
            return Optional.empty();
        }
    }

    TypeInference typeInference =
            functionDefinition.getTypeInference(context.getDataTypeFactory());
    if (typeInference.getOutputTypeStrategy() == TypeStrategies.MISSING) {
        return Optional.empty();
    }

    switch (functionDefinition.getKind()) {
        case SCALAR:
        case TABLE:
            List<RexNode> args =
                    call.getChildren().stream()
                            .map(context::toRexNode)
                            .collect(Collectors.toList());

            final BridgingSqlFunction sqlFunction =
                    BridgingSqlFunction.of(
                            context.getDataTypeFactory(),
                            context.getTypeFactory(),
                            SqlKind.OTHER_FUNCTION,
                            call.getFunctionIdentifier().orElse(null),
                            functionDefinition,
                            typeInference);

            return Optional.of(context.getRelBuilder().call(sqlFunction, args));
        default:
            return Optional.empty();
    }
}

那我们指定复杂类型也会从通过该方法实现,不多说了,直接上代码实现。

  • 指定accumulatorType

这是之前写的AbstractLastValueWithRetractAggFunction功能主要是为了实现具有local-global的逻辑的LastValue,提升作业性能。

accumulator对象:LastValueWithRetractAccumulator,可以看到该对象是一个非常复杂的对象,包含5个属性,还有List 复杂嵌套,以及MapView等可以操作状态后端的对象,甚至有Object这种通用的对象。

public static class LastValueWithRetractAccumulator {
    public Object lastValue = null;
    public Long lastOrder = null;
    public List<Tuple2<Object, Long>> retractList = new ArrayList<>();
    public MapView<Object, List<Long>> valueToOrderMap = new MapView<>();
    public MapView<Long, List<Object>> orderToValueMap = new MapView<>();

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof LastValueWithRetractAccumulator)) {
            return false;
        }
        LastValueWithRetractAccumulator that = (LastValueWithRetractAccumulator) o;
        return Objects.equals(lastValue, that.lastValue)
                && Objects.equals(lastOrder, that.lastOrder)
                && Objects.equals(retractList, that.retractList)
                && valueToOrderMap.equals(that.valueToOrderMap)
                && orderToValueMap.equals(that.orderToValueMap)
                ;
    }

    @Override
    public int hashCode() {
        return Objects.hash(lastValue, lastOrder, valueToOrderMap, orderToValueMap, retractList);
    }

}

getTypeInference() 是FunctionDefinition接口的方法,而所有的用户自定义函数都实现了该接口,我们只需要重新实现下该方法就可以,以下是代码实现。

这里我们还需要用到工具类TypeInference,这是Flink中的一个模块,用于进行类型推断和类型推理。

可以看出我们在accumulatorTypeStrategy方法中传入了一个构建好的TypeStrategy;这里我们将LastValueWithRetractAccumulator定义为了一个STRUCTURED,不同的属性定义为具体的数据类型,DataTypes工具类提供了很多丰富的对象形式,还有万能的RAW类型。

public TypeInference getTypeInference(DataTypeFactory typeFactory) {

    return TypeInference.newBuilder()
            .accumulatorTypeStrategy(callContext -> {

                List<DataType> dataTypes = callContext.getArgumentDataTypes();

                DataType argDataType;
                if (dataTypes.get(0)
                        .getLogicalType()
                        .getTypeRoot()
                        .getFamilies()
                        .contains(LogicalTypeFamily.CHARACTER_STRING)) {
                    argDataType = DataTypes.STRING();
                } else
                    argDataType = DataTypeUtils.toInternalDataType(dataTypes.get(0));


                DataType accDataType = DataTypes.STRUCTURED(
                        LastValueWithRetractAccumulator.class,
                        DataTypes.FIELD("lastValue", argDataType.nullable()),
                        DataTypes.FIELD("lastOrder", DataTypes.BIGINT()),
                        DataTypes.FIELD("retractList", DataTypes.ARRAY(
                                DataTypes.STRUCTURED(
                                        Tuple2.class,
                                        DataTypes.FIELD("f0", argDataType.nullable()),
                                        DataTypes.FIELD("f1", DataTypes.BIGINT())
                                )).bridgedTo(List.class)),
                        DataTypes.FIELD(
                                "valueToOrderMap",
                                MapView.newMapViewDataType(
                                        argDataType.nullable(),
                                        DataTypes.ARRAY(DataTypes.BIGINT()).bridgedTo(List.class))),
                        //todo:blink 使用SortedMapView 优化性能,开源使用MapView key天然字典升序,倒序遍历性能可能不佳
                        DataTypes.FIELD(
                                "orderToValueMap",
                                MapView.newMapViewDataType(
                                        DataTypes.BIGINT(),
                                        DataTypes.ARRAY(argDataType.nullable()).bridgedTo(List.class)))
                );

                return Optional.of(accDataType);
            })
            .build()
            ;
}
  • 指定outputType

这个也很简单,直接上代码实现,主要就是outputTypeStrategy中传入需要输出的数据类型即可。

@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {

    return TypeInference.newBuilder()
            .outputTypeStrategy(callContext -> {

                List<DataType> dataTypes = callContext.getArgumentDataTypes();

                DataType argDataType;

                if (dataTypes.get(0)
                        .getLogicalType()
                        .getTypeRoot()
                        .getFamilies()
                        .contains(LogicalTypeFamily.CHARACTER_STRING)) {
                    argDataType = DataTypes.STRING();
                } else
                    argDataType = DataTypeUtils.toInternalDataType(dataTypes.get(0));

                return Optional.of(argDataType);
            })
            .build()
            ;
}
  • 指定intputType

在此就不做介绍了,同以上类似,在inputTypeStrategy方法传入定义好的TypeStrategy就好。

  • 根据inputType动态调整outType或者accumulatorType

在某些场景下,我们需要让函数功能性更强,比如当我输入是bigint类型的时候,我输出bigint类型等,类似的逻辑。

大家可以发现outputTypeStrategy或者 accumulatorTypeStrategy的入参都是 实现了 TypeStrategy接口的对象,并且需要实现inferType方法。在Flink框架调用该方法的时候会传入一个上下文对象CallContext,提供了获取函数入参类型的api getArgumentDataTypes();

代码实现:这里的逻辑是将获取到的第一个入参对象的类型指定为输出对象的类型。

.outputTypeStrategy(callContext -> {

    List<DataType> dataTypes = callContext.getArgumentDataTypes();

    DataType argDataType;

    if (dataTypes.get(0)
            .getLogicalType()
            .getTypeRoot()
            .getFamilies()
            .contains(LogicalTypeFamily.CHARACTER_STRING)) {
        argDataType = DataTypes.STRING();
    } else
        argDataType = DataTypeUtils.toInternalDataType(dataTypes.get(0));

    return Optional.of(argDataType);
}

自定义DataType

可以发现以上分享几乎都是使用的DataTypes封装好的类型,比如DataTypes.STRING()、DataTypes.Long()等。那如果我们需要封装一些其他对象如何操作呢?上文提到DataTypes提供了一个自定义任意类型的方法。

/**
 * Data type of an arbitrary serialized type. This type is a black box within the table
 * ecosystem and is only deserialized at the edges.
 *
 * <p>The raw type is an extension to the SQL standard.
 *
 * <p>This method assumes that a {@link TypeSerializer} instance is present. Use {@link
 * #RAW(Class)} for automatically generating a serializer.
 *
 * @param clazz originating value class
 * @param serializer type serializer
 * @see RawType
 */
public static <T> DataType RAW(Class<T> clazz, TypeSerializer<T> serializer) {
    return new AtomicDataType(new RawType<>(clazz, serializer));
}

我们有这样的一个场景,需要在自定义的函数中使用bitmap计算UV值,需要定义Roaring64Bitmap为accumulatorType,直接上代码实现。

这里的Roaring64BitmapTypeSerializer已经在《自定义TypeSerializer》小段中实现,有兴趣的同学可以往上翻翻。

public TypeInference getTypeInference(DataTypeFactory typeFactory) {

    return TypeInference.newBuilder()
            .accumulatorTypeStrategy(callContext -> {
                DataType type = DataTypes.RAW(
                        Roaring64Bitmap.class,
                        Roaring64BitmapTypeSerializer.INSTANCE
                );
                return Optional.of(type);
            })
            .outputTypeStrategy(callContext -> Optional.of(DataTypes.BIGINT()))
            .build()
            ;
}

四、结语

本文主要简单分享了一些自身对Flink类型及序列化的认识和应用实践,能力有限,不足之处欢迎指正。

引用: https://nightlies.apache.org/flink/flink-docs-release-1.13/

*文/ 木木

本文属得物技术原创,更多精彩文章请看:得物技术

未经得物技术许可严禁转载,否则依法追究法律责任!