flink - join操作&迟到数据处理
flink针对迟到数据的策略
丢弃
重新触发计算
1、已经计算窗口是否有销毁?
2、计算逻辑,来一条迟到数据计算一次?
3、计算结果重复往下游发送问题如何解决?
旁路输出
其中,重新计算最为复杂,涉及到窗口销毁以及结果重新发射至下游问题。重新触发计算核心的方法是allowedLateness(), 该方法会设置一个时间参数,代表迟到数据允许迟到的时间,避免窗口持续存在,增加资源的损耗。
window = 10
EVENT | timestamp | maxTimestamp | Watermark(max - 6) | |
---|---|---|---|---|
(1, 2) | 2 | 2 | -4 | |
(1, 6) | 6 | 6 | 0 | |
(1, 8) | 8 | 8 | 2 | |
(1, 3) | 3 | 8 | 2 | |
(1, 10) | 10 | 10 | 4 | |
(1, 15) | 15 | 15 | 9 | Case 1: 9 >= 9 触发计算 |
(1, 7) | 7 | 15 | 9 | Case 2: 迟到数据 |
(1, 9) | 9 | 15 | 9 | case 3: 迟到数据 |
(1, 21) | 21 | 21 | 15 | |
(1, 5) | 5 | 21 | 15 | Case4: 测试延迟时间是以什么标准判断,测试事件时间 |
(1, 25) | 25 | 25 | 19 | |
(1, 3) | 3 | 25 | 19 | Case5: 测试水位线 |
Case 1:
1、watermark >= window。getEnd - 1时触发计算
Case 2:
获取一条迟到数据后,窗口重新计算
case 3:
Case 4:
不是根据事件时间,因为第一个窗口[0, 10),目前最大事件时间是21,差值大于10,按时仍然可以接受迟到数据
Case5:
当前水位线为19,超过第一个窗口结束时间9,差值为10,(1, 3)并未触发重新计算,表示延迟时间基于水位线判断
For example, with an event-time-based windowing strategy that creates non-overlapping (or tumbling) windows every 5 minutes and has an allowed lateness of 1 min, Flink will create a new window for the interval between
12:00
and12:05
when the first element with a timestamp that falls into this interval arrives, and it will remove it when the watermark passes the12:06
timestamp.
综上,可以得出结论:
1、窗口计算操作触发标准是watermark >= windowEnd - 1;
2、窗口结束时间戳是第一个不属于该窗口的元素,flink窗口都是左闭右开的
3、延迟时间基于水位线进行判断
4、在延迟时间内窗口不会被销毁
join
是否可以a窗口设置时间 + b窗口设置时间
数据撤回问题 - 针对迟到数据处理逻辑
是否可以多窗口join
不能多窗口合并
Join
flink join操作用于多流合并且并不限制流的数据类型,与window配合使用,因为不管是无界流还是有界流,在数据量比较大的情况下,如果不采用window加以限制,使用join操作,时间和空间上的消耗都是巨大的。flink join操作类似于sql中的inner join,取多个数据流的交集。
1、基本语法
source1.join(source2) .where(Keyselector) .equalTo(keySelector) .window() .process() .addsink(); 复制代码
2、适用场景
支持Tumbling、Sliding、session窗口
支持processing、event时间语义
DataStream以及其子类支持join()操作, WindowedStream无法进行join()操作,表示窗口内无法再进行流join操作
3、事件时间场景下join
合并操作触发时机
如何处理迟到数据,迟到数据处理结束时机
3.1、理论支撑
1、构建join操作图
stream.join(stream2) .where(tuple -> tuple.f0) .equalTo(tuple -> tuple.f0) .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) .allowedLateness(Time.milliseconds(30)) .apply(new JoinWindowResult()) .addSink(new CommonPrintSink<>("流join")); 复制代码
2、转换成coGroup操作,实现apply方法
public <T> DataStream<T> apply( JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) { // clean the closure function = input1.getExecutionEnvironment().clean(function); // 将join操作转化为coGroup操作 coGroupedWindowedStream = input1.coGroup(input2) .where(keySelector1) .equalTo(keySelector2) .window(windowAssigner) .trigger(trigger) .evictor(evictor) .allowedLateness(allowedLateness); return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType); } @Override public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception { for (T1 val1 : first) { for (T2 val2 : second) { // 调用实现的join方法 out.collect(wrappedFunction.join(val1, val2)); } } } 复制代码
3、将coGroup转化成union操作
DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2); // we explicitly create the keyed stream to manually pass the key type information in windowedStream = new KeyedStream<TaggedUnion<T1, T2>, KEY>( unionStream, unionKeySelector, keyType) .window(windowAssigner); if (trigger != null) { windowedStream.trigger(trigger); } if (evictor != null) { windowedStream.evictor(evictor); } if (allowedLateness != null) { windowedStream.allowedLateness(allowedLateness); } return windowedStream.apply( new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType); ================================================================== public static class TaggedUnion<T1, T2> { private final T1 one; private final T2 two; =============================================================== private static class CoGroupWindowFunction<T1, T2, T, KEY, W extends Window> extends WrappingFunction<CoGroupFunction<T1, T2, T>> implements WindowFunction<TaggedUnion<T1, T2>, T, KEY, W> { private static final long serialVersionUID = 1L; public CoGroupWindowFunction(CoGroupFunction<T1, T2, T> userFunction) { super(userFunction); } @Override public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out) throws Exception { List<T1> oneValues = new ArrayList<>(); List<T2> twoValues = new ArrayList<>(); for (TaggedUnion<T1, T2> val : values) { if (val.isOne()) { oneValues.add(val.getOne()); } else { twoValues.add(val.getTwo()); } } wrappedFunction.coGroup(oneValues, twoValues, out); } } 复制代码
由源码可知,join操作本质上是一次union操作。首先将A,B流数据统一成TaggedUnion类型,然后执行union操作,得到DataStream流,后面就和普通的单流窗口操作一致了,包括对迟到数据的处理。唯一的区别是union后的流的watermark来自于上游两个数据源。
statusWaterValue.inputWatermark()
窗口大小10ms
stream 0 | stream 1 | channelStatus 0 (保证递增) | channelStatus 1 | newMinWatermark(0,1流最小值) | lastOutputWatermark | |
---|---|---|---|---|---|---|
(1,4) | -2 | -1000 | -1000 | -1000 | ||
(1,6) | -2 | -5 | -5 | -5 | ||
(1,7) | 1 | -5 | -5 | -5 | ||
(1,15) | 1 | 4 | 1 | 1 | ||
(1,15) | 9 | 4 | 4 | 4 | ||
(1,29) | 9 | 18 | 9 | 9 | 9>=9(窗口最大时间戳)触发操作 |
3.2、基于事件时间的两流join
窗口大小 10ms, 默认watermark -100
A Event | A timestamp | A maxTimestamp | A watermark(max - 6) | watermark | B EVENT | B timestamp | B maxTimestamp | B watermark(max - 11) | watermark |
---|---|---|---|---|---|---|---|---|---|
(1,4) | 4 | 4 | -2 | (1,3) | 3 | 3 | -8 | ||
(1,7) | 7 | 7 | 2 | (1,6) | 6 | 6 | -5 | ||
(1,6) | 6 | 7 | 2 | (1,12) | 12 | 12 | 1 | ||
(1,15) | 15 | 15 | 9 | (9, 1) -> 1 < 9未触发join操作 | (1,15) | 15 | 15 | 4 | |
(1,30) | 30 | 30 | 19 | (9, 19) -> 9 >= 9触发join操作 |
coGroupedWindowedStream = input1.coGroup(input2) .where(keySelector1) .equalTo(keySelector2) .window(windowAssigner) .trigger(trigger) .evictor(evictor) .allowedLateness(allowedLateness); return coGroupedWindowedStream.apply(new JoinCoGroupFunction<>(function), resultType); void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception; 复制代码
3.3、迟到数据处理
stream.join(stream2) .where(tuple -> tuple.f0) .equalTo(tuple -> tuple.f0) .window(TumblingEventTimeWindows.of(Time.milliseconds(10))) .allowedLateness(Time.milliseconds(30)) .apply(new JoinWindowResult()) .addSink(new CommonPrintSink<>("流join")); 复制代码
窗口A: 10ms. 窗口B:10ms 延迟时间 30ms
A Event | A timestamp | A maxTimestamp | A watermark(max - 6) | B EVENT | B timestamp | B maxTimestamp | B watermark(max - 11) | ||
---|---|---|---|---|---|---|---|---|---|
(1,4) | 4 | 4 | -2 | (1,3) | 3 | 3 | -8 | ||
(1,6) | 6 | 7 | 2 | (1,12) | 12 | 12 | 1 | ||
(1,15) | 15 | 15 | 9 | 触发窗口操作(9, 1) -> 1 | (1,22) | 22 | 22 | 11 | (9,11) -> 9 触发窗口操作 |
(1,2) | 3 | 15 | 9 | 迟到数据1 | (1,7) | 7 | 22 | 11 | 迟到数据2 |
(1,45) | 45 | 45 | 39 | (39,11) -> 11 | (1,1) | 1 | 22 | 11 | 迟到数据3 |
(1,50) | 50 | 50 | 39 | (39,39)->39 第一个窗口迟到数据处理结束 | |||||
(1,8) | 8 | 45 | 39 | 未触发迟到数据处理 |
窗口操作
迟到数据1
迟到数据2
迟到数据3
![image-20211125121538870](/Users/lihui/Library/Application Support/typora-user-images/image-20211125121538870.png)
伪原创工具 SEO网站优化 https://www.237it.com/
作者:LHQWERzzz
链接:https://juejin.cn/post/7035501060101767176