掘金 后端 ( ) • 2024-03-04 17:14

@[TOC]

一、认识时间语义

1、官网

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/time/

2、event time与process time

event time:事件/数据真正产生的时间,这个时间一旦产生了,肯定就不变了。 (进Flink之前就已经存在了)

process time:事件/数据处理的世界,这个时间是不确定的,跟处理服务器的时间相关。 (比如说每小时处理一次,这一小时内的数据都会在整点处理) (优点:性能好,低延迟;缺点:结果是不确定的) (比如说,10::59:20产生的数据,正常是在10-11点的窗口内,但是由于网络抖动、服务器等原因在11点之后执行) 在这里插入图片描述 所以,如果不关注准确度的话,就可以使用process time处理,因为性能更高。如果准确度要求很高,就使用event time处理,但是性能会降低。

3、Windows:窗口

窗口是处理无限流的核心。窗口将流分成有限大小的“桶”,我们可以在这些桶上进行计算。

官网: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/overview/#window https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/

我们都知道Spark是一个批处理引擎,但是也能做流式处理。 就像Flink是一个流处理引擎也能做批处理一样,使用Windows窗口就可以实现批处理。

二、Window详解

1、Window的分类

(1)按照是否是KeyBy划分:Keyed Windows

// […]是可选的
stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

(2)按照是否是KeyBy划分:Non-Keyed Windows

// […]是可选的
stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

(3)按照时间或者数量划分

按照时间划分:根据时间对数据流进行切片,比如每隔30S划分一个窗口。 在这里插入图片描述

按照数量划分:按照元素个数对数据流进行切片,比如3个元素划分一个窗口。 在这里插入图片描述

(4)按照Window Assigners:窗口分配器划分

窗口分配器(WindowAssigner)定义如何将元素分配给窗口。 这是通过在窗口中指定选择的WindowAssigner:window(...) (对keyed streams) 或者 windowAll() (对non-keyed streams) 调用。

WindowAssigner负责将每个传入元素分配给一个或多个窗口。

Flink为最常见的用例提供了预定义的窗口分配器,即滚动窗口、滑动窗口、会话窗口和全局窗口(tumbling windows, sliding windows, session windows, global windows)。您还可以通过扩展WindowAssigner类来实现自定义窗口分配器。所有内置窗口分配器(除了全局窗口)都根据时间将元素分配给窗口,时间可以是event time或process time。

基于时间的窗口有一个开始时间戳(含)和一个结束时间戳(不含),它们共同描述了窗口的大小。在代码中,Flink在处理基于时间的窗口时使用TimeWindow,该窗口有查询开始和结束时间戳的方法,还有一个返回给定窗口允许的最大时间戳的附加方法maxTimestamp()。

2、Tumbling Windows:滚动窗口

将每个元素分配给指定窗口大小的窗口。滚动窗口具有固定的大小并且不重叠。例如,如果您指定一个大小为5分钟的滚动窗口,将评估当前窗口,并且每五分钟启动一个新窗口。 在这里插入图片描述

DataStream<T> input = ...;

// 根据event-time
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 根据processing-time
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 每天 根据 event-time ,并且指定时间偏移量
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

可选的根据毫秒、秒、分钟等等:Time.milliseconds(x), Time.seconds(x), Time.minutes(x)

如最后一个示例所示,翻转窗口分配器还带有一个可选的偏移参数,可用于更改窗口的对齐方式。例如,如果没有偏移,每小时滚动窗口将与epoch对齐,也就是说,您将获得1:00:00.000 - 1:59:59.999、2:00:00.000 - 2:59:59.999等窗口。如果你想改变,你可以给出一个偏移量。例如,如果偏移量为15分钟,您将得到1:15:00.000 - 2:14:59.999、2:15:00.000 - 3:14:59.999等。偏移量的一个重要用例是根据UTC-0以外的时区调整窗口。例如,在中国,您必须指定Time.hours的偏移量(-8)。

3、Sliding Windows:滑动窗口

将元素分配给固定长度的窗口。与滚动窗口分配器类似,窗口的大小由窗口大小参数配置。一个附加的窗口滑动参数控制滑动窗口的启动频率。因此,如果幻灯片小于窗口大小,滑动窗口可能会重叠。在这种情况下,元素被分配给多个窗口。

可以说,滚动窗口是滑动窗口的特例,只不过滚动窗口大小和滑动大小是一样的。

例如,可以将大小为10分钟的窗口滑动5分钟。这样,每5分钟就会看到一个窗口,其中包含最近10分钟内到达的事件,如下图所示。 在这里插入图片描述

DataStream<T> input = ...;

// 滑动的 event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 滑动的processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 滑动的 processing-time windows 偏移 -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

可选的根据毫秒、秒、分钟等等:Time.milliseconds(x), Time.seconds(x), Time.minutes(x)

如最后一个示例所示,滑动窗口分配器还带有一个可选的偏移参数,可用于更改窗口的对齐方式。例如,如果没有偏移,每小时滑动30分钟的窗口将与纪元对齐,也就是说,您将获得1:00:00.000 - 1:59:59.999、1:30:00.000 - 2:29:59.999等窗口。如果你想改变,你可以给出一个偏移量。例如,如果偏移量为15分钟,您将得到1:15:00.000 - 2:14:59.999、1:45:00.000 - 2:44:59.999等。偏移量的一个重要用例是根据UTC-0以外的时区调整窗口。例如,在中国,您必须指定Time.hours的偏移量(-8)。

4、Session Windows:会话窗口(用的少)

按活动会话对元素进行分组。与滚动窗口和滑动窗口相比,会话窗口不重叠,也没有固定的开始和结束时间。相反,当会话窗口在一定时间内没有接收到元素时,即当出现不活动间隙时,会话窗口关闭。会话窗口分配器可以配置有静态会话间隙或会话间隙提取器功能,该功能定义多长时间不活动。这段时间到期后,当前会话关闭,后续元素被分配给新的会话窗口。

在这里插入图片描述

DataStream<T> input = ...;

// event-time session windows with 静态 gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// event-time session windows with 动态 gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

// processing-time session windows with 静态  gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// processing-time session windows with 动态 gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

可选的根据毫秒、秒、分钟等等:Time.milliseconds(x), Time.seconds(x), Time.minutes(x)

动态间隙是通过实现SessionWindowTimeGapExtractor接口来指定的。

由于会话窗口没有固定的开始和结束时间,因此对它们的评估不同于滚动和滑动窗口。在内部,会话窗口操作员为每个到达的记录创建一个新窗口,如果窗口之间的距离小于定义的间隙,则将窗口合并在一起。为了能够合并,会话窗口运算符需要一个合并触发器和一个合并窗口函数,如ReduceFunction、AggregateFunction或ProcessWindowFunction

5、Global Windows:全局窗口(用的少)

具有相同关键字的所有元素分配给同一个全局窗口。仅当您还指定了自定义触发器时,此窗口模式才有用。否则,将不会执行任何计算,因为全局窗口没有我们可以处理聚合元素的自然终点。 在这里插入图片描述

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);

6、CountWindow:按元素个数划分窗口

CountWindow是最简单的窗口,指定元素个数,每几个元素划分为一个窗口。

7、代码实例

我们可以看出,是否使用keyBy的效果是不一样的。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Random;

public class WindowApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 添加数据源
        SingleOutputStreamOperator<Tuple2<String, Integer>> sources = env.addSource(new SourceFunction<String>() {
            volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {

                Random random = new Random();
                String[] datas = {"test1,1", "test2,1", "test3,1"};
                while (isRunning) {
                    // 每500毫秒造一条数据
                    ctx.collect(datas[random.nextInt(datas.length)]);
                    Thread.sleep(500);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        }).map(x -> {
            String[] splits = x.split(",");
            return Tuple2.of(splits[0].trim(), Integer.parseInt(splits[1].trim()));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));


//        countWindow(sources);
//        tumblingWindow(sources);
        slidingWindow(sources);
//        sessionWindow(sources);

        env.execute();
    }

    public static void sessionWindow(StreamExecutionEnvironment env) {
        SingleOutputStreamOperator<Integer> source = env.socketTextStream("localhost", 9527)
                .map(x -> Integer.parseInt(x.trim()));

        source.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
                .sum(0) // WindowFunction
                .print();

    }

    public static void slidingWindow(SingleOutputStreamOperator<Tuple2<String, Integer>> sources) {

        /**
         * 窗口大小是10s,滑动大小是5s
         * 0      5     10     15
         * 1:0-5
         * 2:0-10
         * 3:5-15
         *
         */
        sources.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum(0)
                .print();

    }

    public static void tumblingWindow(SingleOutputStreamOperator<Tuple2<String, Integer>> sources) {

//        SingleOutputStreamOperator<Integer> source = env.socketTextStream("localhost", 9527)
//                .map(x -> Integer.parseInt(x.trim()));
//
        // 不带key的
//        source.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒一个窗口
//        .sum(0)
//                .print();


        // 带key的
        sources.keyBy(x -> x.f0) // 先分组
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒一个窗口
                .sum(1) // WindowFunction
                .print();
    }


    /**
     * 测试countWindow
     */
    public static void countWindow(SingleOutputStreamOperator<Tuple2<String, Integer>> sources) {

//        SingleOutputStreamOperator<Integer> source = env.socketTextStream("localhost", 9527)
//                .map(x -> Integer.parseInt(x.trim()));
//
            // 输入 1 2 3 4 5,凑齐5个会求和并打印
//
//        // countWindowAll的并行度是多少? 思路:打开UI
//        source.countWindowAll(5)
//                .sum(0) // WindowFunction
//                .print();




        sources.keyBy(x -> x.f0) // 先分组
            .countWindow(5) // 5个元素一个窗口
        .sum(1) // windowFunction
                .print();
        /**
         * 执行结果:
         * 13> (test2,5)
         * 8> (test3,5)
         * 7> (test1,5)
         * 13> (test2,5)
         * 13> (test2,5)
         * 8> (test3,5)
         * 7> (test1,5)
         */

        /**
         * 注意事项:
         * 对于non-key,只要满足元素个数就会触发作业执行
         * 对于key,每个组达到一定的元素个数才会触发作业执行
         *
         * tumbling count windows
         */
    }
}

三、WindowFunction

1、概述

WindowFunction对窗口中数据的计算。WindowFunction可以是ReduceFunction、AggregateFunction或ProcessWindowFunction之一。 在这里插入图片描述

2、ReduceFunction

ReduceFunction指定如何组合输入中的两个元素以产生相同类型的输出元素。 Flink使用ReduceFunction来增量聚合窗口的元素。

DataStream<Tuple2<String, Long>> input = ...;

// 对窗口中所有元素的元组的第二个字段进行求和。
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });

3、AggregateFunction

AggregateFunction是ReduceFunction的一般化版本,它有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型是输入流中元素的类型,AggregateFunction有一个向累加器添加一个输入元素的方法。该接口还具有创建初始累加器、将两个累加器合并为一个累加器以及从累加器中提取输出(类型为OUT)的方法。

与ReduceFunction一样,Flink将在窗口的输入元素到达时对其进行增量聚合。

/**
 * 累加器用于保存累计和计数。{@code getResult}方法
 * 计算窗口中元素的第二个字段的平均值。
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());

4、ProcessWindowFunction

ProcessWindowFunction获得一个包含窗口所有元素的Iterable和一个可以访问时间和状态信息的Context对象,这使它比其他窗口函数具有更大的灵活性。这是以性能和资源消耗为代价的,因为元素不能增量聚合,而是需要在内部缓冲,直到窗口被认为准备好进行处理。

DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(t -> t.f0)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction());

/* ... */
// 对窗口中的元素进行计数的ProcessWindowFunction。此外,window函数将有关窗口的信息添加到输出中。
public class MyProcessWindowFunction 
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
  }
}

5、ProcessWindowFunction +增量聚合

ProcessWindowFunction可以与ReduceFunction或AggregateFunction结合使用,以便在元素到达窗口时对其进行增量聚合。当窗口关闭时,将向ProcessWindowFunction提供聚合结果。这允许它在访问ProcessWindowFunction的附加窗口元信息的同时递增地计算窗口。

还可以使用传统的WindowFunction代替ProcessWindowFunction进行增量窗口聚合。

(1)与ReduceFunction 聚合

//将incremental ReduceFunction与ProcessWindowFunction结合使用,以返回窗口中的最小事件以及窗口的开始时间。
DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      return r1.value() > r2.value() ? r2 : r1;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<SensorReading> minReadings,
                    Collector<Tuple2<Long, SensorReading>> out) {
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
  }
}

(2)与AggregateFunction 聚合

// 将增量AggregateFunction与ProcessWindowFunction结合起来计算平均值,并同时发出键和窗口以及平均值。
DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Function definitions

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<Double> averages,
                    Collector<Tuple2<String, Double>> out) {
      Double average = averages.iterator().next();
      out.collect(new Tuple2<>(key, average));
  }
}

6、代码实例

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.util.*;

public class WindowFunctionApp {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 添加数据源
        SingleOutputStreamOperator<Tuple2<String, Integer>> source = env.addSource(new SourceFunction<String>() {
            volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {

                Random random = new Random();
                String[] datas = {"test1,1", "test2,1", "test3,1"};
                while (isRunning) {
                    // 每500毫秒造一条数据
                    ctx.collect(datas[random.nextInt(datas.length)]);
                    Thread.sleep(500);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        }).map(x -> {
            String[] splits = x.split(",");
            return Tuple2.of(splits[0].trim(), Integer.parseInt(splits[1].trim()));
        }).returns(Types.TUPLE(Types.STRING, Types.INT));

        // 调用
        test04(source);

        env.execute();
    }

    public static void test01(SingleOutputStreamOperator<Tuple2<String, Integer>> source) {

        source
                .keyBy(x -> x.f0) // 根据key :keyBy
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒一个窗口
                //.sum(1)// 下面的逻辑,其实就是.sum(1) 根据第[1]个参数进行求和
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> x, Tuple2<String, Integer> y) throws Exception {
                        System.out.println("执行reduce操作:" + x + " , " + y);
                        return Tuple2.of(x.f0, x.f1 + y.f1); // 根据key,求和
                    }
                })
                .print();
        /**
         * 执行结果:
         * 执行reduce操作:(test2,1) , (test2,1)
         * 执行reduce操作:(test3,1) , (test3,1)
         * 执行reduce操作:(test3,2) , (test3,1)
         * 执行reduce操作:(test1,1) , (test1,1)
         * 执行reduce操作:(test2,2) , (test2,1)
         * 执行reduce操作:(test3,3) , (test3,1)
         * 执行reduce操作:(test2,3) , (test2,1)
         * 13> (test2,4)
         * 8> (test3,4)
         * 7> (test1,2)
         */
    }


    /**
     * 求平均数
     * a,100
     * a,2
     * <p>
     * 51.0
     */
    public static void test02(SingleOutputStreamOperator<Tuple2<String, Integer>> source) {

        source.keyBy(x -> x.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // 第一个类型是入参类型,第二个类型是累加器参数类型,第三个类型是结果类型
                .aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<Integer, Integer>, Double>() {

                    /**
                     * 初始累加器状态,0,0
                     */
                    @Override
                    public Tuple2<Integer, Integer> createAccumulator() {
                        return Tuple2.of(0, 0);
                    }

                    /**
                     * 把当前进来的元素添加到累加器中,并返回一个全新的累加器
                     * @param value  当前进来的元素
                     * @param accumulator  累加器
                     * @return 新的累加器
                     */
                    @Override
                    public Tuple2<Integer, Integer> add(Tuple2<String, Integer> value, Tuple2<Integer, Integer> accumulator) {

                        System.out.println("... add invoked ... " + value.f0 + "===>" + value.f1);

                        // (累加器的第一个值 + 新元素第二个值, 次数+1)
                        return Tuple2.of(accumulator.f0 + value.f1, accumulator.f1 + 1);
                    }

                    /**
                     * 获取结果
                     * 获取结果,需要取累加器的第一个值(和)/累加器第二个值(个数)
                     */
                    @Override
                    public Double getResult(Tuple2<Integer, Integer> accumulator) {
                        return Double.valueOf(accumulator.f0) / accumulator.f1;
                    }

                    /**
                     * 合并两个累加器,返回具有合并状态的累加器。 此函数可以重用任何给定的累加器作为合并的目标并返回该目标。
                     * 假设给定的累加器在传递给此函数后将不再使用。
                     * 参数: a -要合并的累加器 b -另一个累加器合并
                     * 返回: 具有合并状态的累加器
                     */
                    @Override
                    public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                        return null;
                    }
                })
                .print();

        /**
         * 需求:求平均数
         *
         * 平均数 = 总和 / 个数
         *
         * 那么为了求出平均数,我们必然是需要先算出 value的和 以及 次数
         *
         * AggregateFunction<T, ACC, R>
         */
    }


    /**
     * 使用ProcessWindowFunction完成窗口内数据的排序,并输出
     * <p>
     * 计数窗口   5条数据一个窗口
     * <p>
     * 全量
     * <p>
     * 一种是apply  一种process
     */
    public static void test03(SingleOutputStreamOperator<Tuple2<String, Integer>> source) {
        /**
         * 简单的process
         * 三个类型,第一个为key类型,第二个为输入类型,第三个为输出集合
         */
//        source
//                .keyBy(x -> x.f0)
//                .process(new KeyedProcessFunction<String, Tuple2<String, Integer>, Object>() {
//                    // 第一个参数为输入参数,第二个为Context上下文,第三个参数为输出集合
//                    @Override
//                    public void processElement(Tuple2<String, Integer> value, KeyedProcessFunction<String, Tuple2<String, Integer>, Object>.Context ctx, Collector<Object> out) throws Exception {
//                        System.out.println("处理数据:" + value.f0);
//                        out.collect(value.f1);
//                    }
//                }).print().setParallelism(1);

        /**
         * window 的process
         * 三个类型,第一个为输入类型,第二个为输出类型,第三个为key类型,第四个为Window
         */
//        source
//                .keyBy(x -> x.f0)
//                .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒一个窗口
//                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Object, String, TimeWindow>() {
//                    // 第一个参数是key,第二个参数是Context,第三个参数是窗口内元素集合,第四个参数是收集器
//                    @Override
//                    public void process(String s, ProcessWindowFunction<Tuple2<String, Integer>, Object, String, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
//                        List<Integer> list = new ArrayList<>();
//                        for (Tuple2<String, Integer> value : elements) {
//                            list.add(value.f1);
//                        }
//
//                        list.sort(new Comparator<Integer>() {
//                            @Override
//                            public int compare(Integer o1, Integer o2) {
//                                return o2 - o1;
//                            }
//                        });
//
//                        for (Integer res : list) {
//                            out.collect(res);
//                        }
//                    }
//                })
//                .print().setParallelism(1);


        /**
         * WindowAll
         * apply 第一个类型输入类型,第二个类型输出类型,第三个类型是全局Window
         */
        source
                .countWindowAll(5) // 每5个元素为一个窗口
                .apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, GlobalWindow>() {
                    // 第一个参数是Window,第二个参数是窗口内元素集合,第三个参数是收集器
                    @Override
                    public void apply(GlobalWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception {
                        List<Integer> list = new ArrayList<>();
                        for (Tuple2<String, Integer> value : values) {
                            list.add(value.f1);
                        }

                        list.sort(new Comparator<Integer>() {
                            @Override
                            public int compare(Integer o1, Integer o2) {
                                return o2 - o1;
                            }
                        });

                        for (Integer res : list) {
                            out.collect(res);
                        }
                    }

                }).print().setParallelism(1);
    }


    /**
     * 全量 配合 增量一起使用
     * <p>
     * 需求:求UV(使用增量函数去求)  输出一个统计信息(全量输出)
     */
    public static void test04(SingleOutputStreamOperator<Tuple2<String, Integer>> source) {

        source.print("----原始数据---");
        source.keyBy(x -> true) // 所有的key
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 5秒
                .aggregate(
                        // 自定义增量聚合函数
                        new AggregateFunction<Tuple2<String, Integer>, HashSet<String>, Long>() {
                            // 初始化累加器
                            @Override
                            public HashSet<String> createAccumulator() {
                                return new HashSet<String>();
                            }

                            @Override
                            public HashSet<String> add(Tuple2<String, Integer> value, HashSet<String> accumulator) {

                                System.out.println("---add invoked....---");
                                accumulator.add(value.f0);
                                return accumulator;
                            }

                            // 计算数量
                            @Override
                            public Long getResult(HashSet<String> accumulator) {
                                return (long) accumulator.size();
                            }

                            @Override
                            public HashSet<String> merge(HashSet<String> a, HashSet<String> b) {
                                return null;
                            }
                        },

                        // 自定义窗口处理函数,输出最终信息
                        new ProcessWindowFunction<Long, String, Boolean, TimeWindow>() {
                            @Override
                            public void process(Boolean aBoolean, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
                                long start = context.window().getStart();
                                long end = context.window().getEnd();

                                StringBuilder res = new StringBuilder();
                                res.append("窗口:【")
                                        .append(new Timestamp(start))
                                        .append(",")
                                        .append(new Timestamp(end))
                                        .append("】,UV是:")
                                        .append(elements.iterator().next());

                                out.collect(res.toString());
                            }
                        }
                ).print();
        /**
         * 结果
         *
         */
    }
}

四、认识Watermark

1、为什么要用Watermark

(1)数据延迟问题

举个例子,比如说我们窗口的大小是10分钟,如果使用process time(执行时间)来处理的话,正常是这样的: 在这里插入图片描述

假如说,10:09的那一条数据,因为网络抖动或者服务器的原因,晚执行了一分钟,此时处理窗口就会变成这样: 在这里插入图片描述 数据就会不准确。

(2)数据乱序问题

同理,比如说我们发送的数据顺序为1 2 3 4 5 6 7 ...,如果使用process time(执行时间)来处理的话,因为网络抖动或者服务器的原因造成了数据乱序,Flink接收到的数据是1 2 3 5 4 6 7 ...,就会造成数据乱序问题。

因此我们需要使用event time 数据产生的时间来进行数据处理,这就需要使用到Watermark(水印),这是一种衡量event进展的机制。

2、官网

https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/event-time/generating_watermarks/

关于event time 与process time:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/time/

3、认识Watermark

使用WaterMark解决乱序与时间问题,其实是event time + window + watermark共同完成的,只要watermark>=window的end时间就会触发前面的窗口执行。

同时,需要在数据中指定一个event time。

Flink底层会周期性插入一种时间戳,这就是单调递增的向前推进时间watermark,watermark可以从数据源头产生的时候就带上,也可以在operator中带上。

4、使用Watermark(代码实例)

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Date;

public class WMApp01 {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // 单并行度

        // 添加数据源   时间,数据
        SingleOutputStreamOperator<Tuple2<Long, String>> source = env.addSource(new SourceFunction<String>() {
            volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                while (isRunning) {
                    Thread.sleep(5000);
                    ctx.collect( "1000,test1");
                    Thread.sleep(100);
                    ctx.collect( "2000,test2");
                    Thread.sleep(100);
                    ctx.collect( "3000,test3");
                    Thread.sleep(100);
                    ctx.collect( "5000,test5");
                    Thread.sleep(100);
                    ctx.collect( "4000,test4");
                    Thread.sleep(100);
                    ctx.collect( "6000,test6");
                    Thread.sleep(100);
                    ctx.collect( "7000,test7");
                    Thread.sleep(100);
                    ctx.collect( "10000,test10");
                    Thread.sleep(100);
                    ctx.collect( "9000,test9");
                    Thread.sleep(100);
                    ctx.collect( "8000,test8");
                    Thread.sleep(100);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        }).map(x -> {
            String[] splits = x.split(",");
            return Tuple2.of(Long.parseLong(splits[0].trim()), splits[1].trim());
        }).returns(Types.TUPLE(Types.LONG, Types.STRING));



        /**
         * Watermark的策略:
         * WatermarkStrategy.noWatermarks(); // 不生成Watermark
         * WatermarkStrategy.forMonotonousTimestamps(); // 创建一个Watermark,是一个严格递增的
         * WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(0)); //为记录乱序的情况创建Watermark策略
         */

        WatermarkStrategy<Tuple2<Long, String>> watermarkStrategy = WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofMillis(0))
                // 需要指定数据的event time。event就是原始数据,需要指定类型,不指定类型是Object
                .withTimestampAssigner((event, timestamp) -> event.f0);

        // 往source添加Watermark
        source.assignTimestampsAndWatermarks(watermarkStrategy)
                .process(new ProcessFunction<Tuple2<Long, String>, Tuple2<Long, String>>() {
                    @Override
                    public void processElement(Tuple2<Long, String> value, Context ctx, Collector<Tuple2<Long, String>> out) throws Exception {

                        long watermark = ctx.timerService().currentWatermark();
                        System.out.println("该数据是:" + value + " , WM是:" + watermark);

                        out.collect(value);
                    }
                }).setParallelism(1).print();


        /**
         * 结果:WM不会后退,只会前进
         * 该数据是:(1000,test1) , WM是:-9223372036854775808
         * (1000,test1)
         * 该数据是:(2000,test2) , WM是:-9223372036854775808
         * (2000,test2)
         * 该数据是:(3000,test3) , WM是:1999
         * (3000,test3)
         * 该数据是:(5000,test5) , WM是:1999
         * (5000,test5)
         * 该数据是:(4000,test4) , WM是:4999
         * (4000,test4)
         * 该数据是:(6000,test6) , WM是:4999
         * (6000,test6)
         * 该数据是:(7000,test7) , WM是:5999
         * (7000,test7)
         * 该数据是:(10000,test10) , WM是:6999
         * (10000,test10)
         * 该数据是:(9000,test9) , WM是:6999
         * (9000,test9)
         * 该数据是:(8000,test8) , WM是:9999
         * (8000,test8)
         */

        env.execute("WMApp01");
    }


}

5、基于窗口解决数据延时问题

在这里插入图片描述 在这里插入图片描述

在这里插入图片描述

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * 数据延迟/乱序 三种
 * 小:容忍度
 * 中:allowedLateness
 * 大:sideOutputLateData
 *
 * 一起使用
 */
public class WMWindowApp {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        OutputTag<Tuple2<String, Integer>> outputTag = new OutputTag<Tuple2<String, Integer>>("late-data"){};


        // 事件时间,domain,traffic  开窗口  groupby  求窗口内每个domain出现的次数
        DataStreamSource<String> source = env.socketTextStream("hadoop000", 9527);

        WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                .withTimestampAssigner((event, timestamp) -> Long.parseLong(event.split(",")[0].trim()));

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.assignTimestampsAndWatermarks(watermarkStrategy)
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] splits = value.split(",");
                        return Tuple2.of(splits[1].trim(), Integer.parseInt(splits[2].trim()));
                    }
                }).keyBy(x -> x.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sideOutputLateData(outputTag)
                .sum(1);

        DataStream<Tuple2<String, Integer>> sideOutput = result.getSideOutput(outputTag);
        sideOutput.print("-----side output------");

        result.print();


        /**
         * 滑动窗口大小是6秒,每隔2秒滑动一次
         *
         * [0,2)
         * [0,4)
         * [0,6)
         */

        /**
         * [window_start, window_end)
         * [0000,5000)
         *
         * Watermark >= window_end 就会触发前面的执行
         * 4999 >= 4999
         *
         * [5000,10000)
         * 11999 >= 9999
         */


        env.execute("WMApp01");
    }

}

6、总结

通过设置WatermarkStrategy.<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofMillis(0))时间容忍度,可以设置小容忍,容忍度之外的数据会丢失。

通过设置.allowedLateness(Time.seconds(10))可设置中度容忍,容忍度之内的数据会后续再参与计算。

通过设置sideOutputLateData来设置最大容忍度,可以单独拿出来延迟数据,可以进行单独处理。