阅读 772

flink - join操作&迟到数据处理

flink针对迟到数据的策略

  • 丢弃

  • 重新触发计算

    • 1、已经计算窗口是否有销毁?

    • 2、计算逻辑,来一条迟到数据计算一次?

    • 3、计算结果重复往下游发送问题如何解决?

  • 旁路输出

其中,重新计算最为复杂,涉及到窗口销毁以及结果重新发射至下游问题。重新触发计算核心的方法是allowedLateness(), 该方法会设置一个时间参数,代表迟到数据允许迟到的时间,避免窗口持续存在,增加资源的损耗。

window = 10

EVENTtimestampmaxTimestampWatermark(max - 6)
(1, 2)22-4
(1, 6)660
(1, 8)882
(1, 3)382
(1, 10)10104
(1, 15)15159Case 1: 9 >= 9 触发计算
(1, 7)7159Case 2: 迟到数据
(1, 9)9159case 3: 迟到数据
(1, 21)212115
(1, 5)52115Case4: 测试延迟时间是以什么标准判断,测试事件时间
(1, 25)252519
(1, 3)32519Case5: 测试水位线

Case 1:

image-20211121192639472

1、watermark >= window。getEnd - 1时触发计算

Case 2:

获取一条迟到数据后,窗口重新计算

image-20211121193253081

case 3:

image-20211121193414934

Case 4:

不是根据事件时间,因为第一个窗口[0, 10),目前最大事件时间是21,差值大于10,按时仍然可以接受迟到数据

image-20211121193834587

Case5:

当前水位线为19,超过第一个窗口结束时间9,差值为10,(1, 3)并未触发重新计算,表示延迟时间基于水位线判断

For example, with an event-time-based windowing strategy that creates non-overlapping (or tumbling) windows every 5 minutes and has an allowed lateness of 1 min, Flink will create a new window for the interval between 12:00 and 12:05 when the first element with a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the 12:06 timestamp.

image-20211121194217171

综上,可以得出结论:

1、窗口计算操作触发标准是watermark >= windowEnd - 1;

2、窗口结束时间戳是第一个不属于该窗口的元素,flink窗口都是左闭右开的

3、延迟时间基于水位线进行判断

4、在延迟时间内窗口不会被销毁

join

  • 是否可以a窗口设置时间 + b窗口设置时间

  • 数据撤回问题 - 针对迟到数据处理逻辑

  • 是否可以多窗口join

    • 不能多窗口合并

Join

flink join操作用于多流合并且并不限制流的数据类型,与window配合使用,因为不管是无界流还是有界流,在数据量比较大的情况下,如果不采用window加以限制,使用join操作,时间和空间上的消耗都是巨大的。flink join操作类似于sql中的inner join,取多个数据流的交集。

1、基本语法
source1.join(source2)   .where(Keyselector)   .equalTo(keySelector)   .window()   .process()   .addsink(); 复制代码

2、适用场景
  • 支持Tumbling、Sliding、session窗口

  • 支持processing、event时间语义

  • DataStream以及其子类支持join()操作, WindowedStream无法进行join()操作,表示窗口内无法再进行流join操作

3、事件时间场景下join
  • 合并操作触发时机

  • 如何处理迟到数据,迟到数据处理结束时机

3.1、理论支撑

1、构建join操作图

stream.join(stream2)                 .where(tuple -> tuple.f0)                 .equalTo(tuple -> tuple.f0)                 .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))                 .allowedLateness(Time.milliseconds(30))                 .apply(new JoinWindowResult())                 .addSink(new CommonPrintSink<>("流join")); 复制代码

2、转换成coGroup操作,实现apply方法

public <T> DataStream<T> apply(                 JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {   // clean the closure   function = input1.getExecutionEnvironment().clean(function);   // 将join操作转化为coGroup操作   coGroupedWindowedStream =     input1.coGroup(input2)     .where(keySelector1)     .equalTo(keySelector2)     .window(windowAssigner)     .trigger(trigger)     .evictor(evictor)     .allowedLateness(allowedLateness);   return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType); }         @Override   public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out)     throws Exception {     for (T1 val1 : first) {       for (T2 val2 : second) {       // 调用实现的join方法         out.collect(wrappedFunction.join(val1, val2));       }     }   }  复制代码

3、将coGroup转化成union操作

DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);             // we explicitly create the keyed stream to manually pass the key type information in             windowedStream =                     new KeyedStream<TaggedUnion<T1, T2>, KEY>(                                     unionStream, unionKeySelector, keyType)                             .window(windowAssigner);             if (trigger != null) {                 windowedStream.trigger(trigger);             }             if (evictor != null) {                 windowedStream.evictor(evictor);             }             if (allowedLateness != null) {                 windowedStream.allowedLateness(allowedLateness);             }             return windowedStream.apply(                     new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType); ==================================================================   public static class TaggedUnion<T1, T2> {         private final T1 one;         private final T2 two;            ===============================================================       private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window>             extends WrappingFunction<CoGroupFunction<T1, T2, T>>             implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> {         private static final long serialVersionUID = 1L;         public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) {             super(userFunction);         }         @Override         public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out)                 throws Exception {             List<T1> oneValues = new ArrayList<>();             List<T2> twoValues = new ArrayList<>();             for (TaggedUnion<T1, T2> val : values) {                 if (val.isOne()) {                     oneValues.add(val.getOne());                 } else {                     twoValues.add(val.getTwo());                 }             }             wrappedFunction.coGroup(oneValues, twoValues, out);         }     } 复制代码

由源码可知,join操作本质上是一次union操作。首先将A,B流数据统一成TaggedUnion类型,然后执行union操作,得到DataStream流,后面就和普通的单流窗口操作一致了,包括对迟到数据的处理。唯一的区别是union后的流的watermark来自于上游两个数据源。

statusWaterValue.inputWatermark()

窗口大小10ms

stream 0stream 1channelStatus 0 (保证递增)channelStatus 1newMinWatermark(0,1流最小值)lastOutputWatermark
(1,4)
-2-1000-1000-1000

(1,6)-2-5-5-5
(1,7)
1-5-5-5

(1,15)1411
(1,15)
9444

(1,29)918999>=9(窗口最大时间戳)触发操作
3.2、基于事件时间的两流join

窗口大小 10ms, 默认watermark -100

A EventA timestampA maxTimestampA watermark(max - 6)watermarkB EVENTB timestampB maxTimestampB watermark(max - 11)watermark
(1,4)44-2
(1,3)33-8
(1,7)772
(1,6)66-5
(1,6)672
(1,12)12121
(1,15)15159(9, 1) -> 1 < 9未触发join操作(1,15)15154





(1,30)303019(9, 19) -> 9 >= 9触发join操作
coGroupedWindowedStream =                     input1.coGroup(input2)                             .where(keySelector1)                             .equalTo(keySelector2)                             .window(windowAssigner)                             .trigger(trigger)                             .evictor(evictor)                             .allowedLateness(allowedLateness);             return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType);                           void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception; 复制代码

3.3、迟到数据处理
stream.join(stream2)                 .where(tuple -> tuple.f0)                 .equalTo(tuple -> tuple.f0)                 .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))                 .allowedLateness(Time.milliseconds(30))                 .apply(new JoinWindowResult())                 .addSink(new CommonPrintSink<>("流join")); 复制代码

窗口A: 10ms. 窗口B:10ms 延迟时间 30ms

A EventA timestampA maxTimestampA watermark(max - 6)
B EVENTB timestampB maxTimestampB watermark(max - 11)
(1,4)44-2
(1,3)33-8
(1,6)672
(1,12)12121
(1,15)15159触发窗口操作(9, 1) -> 1(1,22)222211(9,11) -> 9 触发窗口操作
(1,2)3159迟到数据1(1,7)72211迟到数据2
(1,45)454539(39,11) -> 11(1,1)12211迟到数据3





(1,50)505039(39,39)->39 第一个窗口迟到数据处理结束
(1,8)84539未触发迟到数据处理




窗口操作

image-20211125121050781

迟到数据1

image-20211125121019731

迟到数据2

image-20211125121136081

迟到数据3

![image-20211125121538870](/Users/lihui/Library/Application Support/typora-user-images/image-20211125121538870.png)

 伪原创工具 SEO网站优化  https://www.237it.com/ 

作者:LHQWERzzz
链接:https://juejin.cn/post/7035501060101767176


文章分类
代码人生
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐