阅读 212

Apache FlinkCEP 实现超时状态监控的步骤详解

这篇文章主要介绍了Apache FlinkCEP 实现超时状态监控的步骤,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下

 

CEP - Complex Event Processing复杂事件处理。

订单下单后超过一定时间还未进行支付确认。

打车订单生成后超过一定时间没有确认上车。

外卖超过预定送达时间一定时限还没有确认送达。

Apache FlinkCEP API

CEPTimeoutEventJob

FlinkCEP源码简析

DataStream和PatternStream

DataStream 一般由相同类型事件或元素组成,一个DataStream可以通过一系列的转换操作如Filter、Map等转换为另一个DataStream。

PatternStream 是对CEP模式匹配的流的抽象,把DataStream和Pattern组合在一块,然后对外提供select和flatSelect等方法。PatternStream并不是DataStream,它提供方法把匹配的模式序列和与其相关联的事件组成的映射(就是Map<模式名称,List<事件>>)发出去,发到SingleOutputStreamOperator里面,SingleOutputStreamOperator是DataStream。

CEPOperatorUtils工具类里的方法和变量使用了「PatternStream」来命名,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public
  
static
 <IN, OUT>
SingleOutputStreamOperator
<OUT> createPatternStream(...){...}
public
 
static
 <IN, OUT1, OUT2>
SingleOutputStreamOperator
<OUT1> createTimeoutPatternStream(...){...}
 
final
  
SingleOutputStreamOperator
<OUT> patternStream;

SingleOutputStreamOperator

1
2
3
4
5
6
7
8
9
10
11
12
@Public
 
public
  
class
  
SingleOutputStreamOperator
<T>
extends
  
DataStream
<T> {...}

PatternStream的构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
PatternStream
(
final
  
DataStream
<T> inputStream,
final
  
Pattern
<T, ?> pattern) {
 
   
this
.inputStream = inputStream;
 
   
this
.pattern = pattern;
 
   
this
.comparator =
null
;
 
}
 
 
 
PatternStream
(
final
  
DataStream
<T> inputStream,
final
  
Pattern
<T, ?> pattern,
final
  
EventComparator
<T> comparator) {
 
   
this
.inputStream = inputStream;
 
   
this
.pattern = pattern;
 
   
this
.comparator = comparator;
 
}

Pattern、Quantifier和EventComparator

Pattern是模式定义的Base Class,Builder模式,定义好的模式会被NFACompiler用来生成NFA。

如果想要自己实现类似next和followedBy这种方法,比如timeEnd,对Pattern进行扩展重写应该是可行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public
class
Pattern
<T, F
extends
 T> {
/** 模式名称 */
private
final
String
 name;
/** 前面一个模式 */
private
final
Pattern
<T, ?
extends
 T> previous;
/** 一个事件如果要被当前模式匹配到,必须满足的约束条件 */
private
IterativeCondition
<F> condition;
/** 时间窗口长度,在时间长度内进行模式匹配 */
private
Time
 windowTime;
/** 模式量词,意思是一个模式匹配几个事件等 默认是匹配到一个 */
private
Quantifier
 quantifier =
Quantifier
.one(
ConsumingStrategy
.STRICT);
/** 停止将事件收集到循环状态时,事件必须满足的条件 */
private
IterativeCondition
<F> untilCondition;
/**
   * 适用于{@code times}模式,用来维护模式里事件可以连续发生的次数
   */
private
Times
 times;
// 匹配到事件之后的跳过策略
private
final
AfterMatchSkipStrategy
 afterMatchSkipStrategy;
  ...
}

Quantifier是用来描述具体模式行为的,主要有三大类:

Single-单一匹配、Looping-循环匹配、Times-一定次数或者次数范围内都能匹配到。

每一个模式Pattern可以是optional可选的(单一匹配或循环匹配),并可以设置ConsumingStrategy。

循环和次数也有一个额外的内部ConsumingStrategy,用在模式中接收的事件之间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
public
class
Quantifier
 {
  ...
/**
   * 5个属性,可以组合,但并非所有的组合都是有效的
   */
public
enum
QuantifierProperty
 {
    SINGLE,
    LOOPING,
    TIMES,
    OPTIONAL,
    GREEDY
  }
/**
   * 描述在此模式中匹配哪些事件的策略
   */
public
enum
ConsumingStrategy
 {
    STRICT,
    SKIP_TILL_NEXT,
    SKIP_TILL_ANY,
    NOT_FOLLOW,
    NOT_NEXT
  }
/**
   * 描述当前模式里事件可以连续发生的次数;举个例子,模式条件无非就是boolean,满足true条件的事件连续出现times次,或者一个次数范围,比如2~4次,2次,3次,4次都会被当前模式匹配出来,因此同一个事件会被重复匹配到
   */
public
static
class
Times
 {
private
final
int
 from;
private
final
int
 to;
private
Times
(
int
 from,
int
 to) {
Preconditions
.checkArgument(from >
0
,
"The from should be a positive number greater than 0."
);
Preconditions
.checkArgument(to >= from,
"The to should be a number greater than or equal to from: "
 + from +
"."
);
this
.from = from;
this
.to = to;
    }
public
int
 getFrom() {
return
 from;
    }
public
int
 getTo() {
return
 to;
    }
// 次数范围
public
static
Times
 of(
int
 from,
int
 to) {
return
new
Times
(from, to);
    }
// 指定具体次数
public
static
Times
 of(
int
 times) {
return
new
Times
(times, times);
    }
@Override
public
boolean
 equals(
Object
 o) {
if
 (
this
 == o) {
return
true
;
      }
if
 (o ==
null
 || getClass() != o.getClass()) {
return
false
;
      }
Times
 times = (
Times
) o;
return
 from == times.from &&
        to == times.to;
    }
@Override
public
int
 hashCode() {
return
Objects
.hash(from, to);
    }
  }
  ...
}

EventComparator,自定义事件比较器,实现EventComparator接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public
  
interface
  
EventComparator
<T>
extends
  
Comparator
<T>,
Serializable
 {
long
 serialVersionUID =
1L
;
}

NFACompiler和NFA

NFACompiler提供将Pattern编译成NFA或者NFAFactory的方法,使用NFAFactory可以创建多个NFA。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public
class
NFACompiler
 {
  ...
/**
   * NFAFactory 创建NFA的接口
   *
   * @param <T> Type of the input events which are processed by the NFA
   */
public
interface
NFAFactory
<T>
extends
Serializable
 {
    NFA<T> createNFA();
  }
   
/**
   * NFAFactory的具体实现NFAFactoryImpl
   *
   * <p>The implementation takes the input type serializer, the window time and the set of
   * states and their transitions to be able to create an NFA from them.
   *
   * @param <T> Type of the input events which are processed by the NFA
   */
private
static
class
NFAFactoryImpl
<T>
implements
NFAFactory
<T> {
     
private
static
final
long
 serialVersionUID =
8939783698296714379L
;
     
private
final
long
 windowTime;
private
final
Collection
<
State
<T>> states;
private
final
boolean
 timeoutHandling;
     
private
NFAFactoryImpl
(
long
 windowTime,
Collection
<
State
<T>> states,
boolean
 timeoutHandling) {
       
this
.windowTime = windowTime;
this
.states = states;
this
.timeoutHandling = timeoutHandling;
    }
     
@Override
public
 NFA<T> createNFA() {
// 一个NFA由状态集合、时间窗口的长度和是否处理超时组成
return
new
 NFA<>(states, windowTime, timeoutHandling);
    }
  }
}

NFA:Non-deterministic finite automaton - 非确定的有限(状态)自动机。

更多内容参见

https://zh.wikipedia.org/wiki/非确定有限状态自动机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public
class
 NFA<T> {
/**
   * NFACompiler返回的所有有效的NFA状态集合
   * These are directly derived from the user-specified pattern.
   */
private
final
Map
<
String
,
State
<T>> states;
   
/**
   * Pattern.within(Time)指定的时间窗口长度
   */
private
final
long
 windowTime;
   
/**
   * 一个超时匹配的标记
   */
private
final
boolean
 handleTimeout;
  ...
}

 

PatternSelectFunction和PatternFlatSelectFunction

当一个包含被匹配到的事件的映射能够通过模式名称访问到的时候,PatternSelectFunction的select()方法会被调用。模式名称是由Pattern定义的时候指定的。select()方法恰好返回一个结果,如果需要返回多个结果,则可以实现PatternFlatSelectFunction。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public
  
interface
  
PatternSelectFunction
<IN, OUT>
extends
  
Function
,
Serializable
 {
 
 
 
   
/**
 
   * 从给到的事件映射中生成一个结果。这些事件使用他们关联的模式名称作为唯一标识
 
   */
 
  OUT select(
Map
<
String
,
List
<IN>> pattern)
throws
  
Exception
;
 
}

 

PatternFlatSelectFunction,不是返回一个OUT,而是使用Collector 把匹配到的事件收集起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public
interface
PatternFlatSelectFunction
<IN, OUT>
extends
Function
,
Serializable
 {
   
/**
   * 生成一个或多个结果
   */
void
 flatSelect(
Map
<
String
,
List
<IN>> pattern,
Collector
<OUT> out)
throws
Exception
;
}

SelectTimeoutCepOperator、PatternTimeoutFunction

SelectTimeoutCepOperator是在CEPOperatorUtils中调用createTimeoutPatternStream()方法时创建出来。

SelectTimeoutCepOperator中会被算子迭代调用的方法是processMatchedSequences()和processTimedOutSequences()。

模板方法...对应到抽象类AbstractKeyedCEPPatternOperator中processEvent()方法和advanceTime()方法。

还有FlatSelectTimeoutCepOperator和对应的PatternFlatTimeoutFunction。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
public
class
SelectTimeoutCepOperator
<IN, OUT1, OUT2, KEY>
extends
AbstractKeyedCEPPatternOperator
<IN, KEY, OUT1,
SelectTimeoutCepOperator
.
SelectWrapper
<IN, OUT1, OUT2>> {
private
OutputTag
<OUT2> timedOutOutputTag;
public
SelectTimeoutCepOperator
(
TypeSerializer
<IN> inputSerializer,
boolean
 isProcessingTime,
NFACompiler
.
NFAFactory
<IN> nfaFactory,
final
EventComparator
<IN> comparator,
AfterMatchSkipStrategy
 skipStrategy,
// 参数命名混淆了flat...包括SelectWrapper类中的成员命名...
PatternSelectFunction
<IN, OUT1> flatSelectFunction,
PatternTimeoutFunction
<IN, OUT2> flatTimeoutFunction,
OutputTag
<OUT2> outputTag,
OutputTag
<IN> lateDataOutputTag) {
super
(
      inputSerializer,
      isProcessingTime,
      nfaFactory,
      comparator,
      skipStrategy,
new
SelectWrapper
<>(flatSelectFunction, flatTimeoutFunction),
      lateDataOutputTag);
this
.timedOutOutputTag = outputTag;
  }
  ...
}
public
interface
PatternTimeoutFunction
<IN, OUT>
extends
Function
,
Serializable
 {
  OUT timeout(
Map
<
String
,
List
<IN>> pattern,
long
 timeoutTimestamp)
throws
Exception
;
}
public
interface
PatternFlatTimeoutFunction
<IN, OUT>
extends
Function
,
Serializable
 {
void
 timeout(
Map
<
String
,
List
<IN>> pattern,
long
 timeoutTimestamp,
Collector
<OUT> out)
throws
Exception
;
}

 

CEP和CEPOperatorUtils

CEP是创建PatternStream的工具类,PatternStream只是DataStream和Pattern的组合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public
class
 CEP {
   
public
static
 <T>
PatternStream
<T> pattern(
DataStream
<T> input,
Pattern
<T, ?> pattern) {
return
new
PatternStream
<>(input, pattern);
  }
   
public
static
 <T>
PatternStream
<T> pattern(
DataStream
<T> input,
Pattern
<T, ?> pattern,
EventComparator
<T> comparator) {
return
new
PatternStream
<>(input, pattern, comparator);
  }
}

 

CEPOperatorUtils是在PatternStream的select()方法和flatSelect()方法被调用的时候,去创建SingleOutputStreamOperator(DataStream)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public
class
CEPOperatorUtils
 {
  ...
private
static
 <IN, OUT, K>
SingleOutputStreamOperator
<OUT> createPatternStream(
final
DataStream
<IN> inputStream,
final
Pattern
<IN, ?> pattern,
final
TypeInformation
<OUT> outTypeInfo,
final
boolean
 timeoutHandling,
final
EventComparator
<IN> comparator,
final
OperatorBuilder
<IN, OUT> operatorBuilder) {
final
TypeSerializer
<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
     
// check whether we use processing time
final
boolean
 isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() ==
TimeCharacteristic
.
ProcessingTime
;
     
// compile our pattern into a NFAFactory to instantiate NFAs later on
final
NFACompiler
.
NFAFactory
<IN> nfaFactory =
NFACompiler
.compileFactory(pattern, timeoutHandling);
     
final
SingleOutputStreamOperator
<OUT> patternStream;
     
if
 (inputStream
instanceof
KeyedStream
) {
KeyedStream
<IN, K> keyedStream = (
KeyedStream
<IN, K>) inputStream;
      patternStream = keyedStream.transform(
        operatorBuilder.getKeyedOperatorName(),
        outTypeInfo,
        operatorBuilder.build(
          inputSerializer,
          isProcessingTime,
          nfaFactory,
          comparator,
          pattern.getAfterMatchSkipStrategy()));
    }
else
 {
KeySelector
<IN,
Byte
> keySelector =
new
NullByteKeySelector
<>();
      patternStream = inputStream.keyBy(keySelector).transform(
        operatorBuilder.getOperatorName(),
        outTypeInfo,
        operatorBuilder.build(
          inputSerializer,
          isProcessingTime,
          nfaFactory,
          comparator,
          pattern.getAfterMatchSkipStrategy()
        )).forceNonParallel();
    }
     
return
 patternStream;
  }
  ...
}

FlinkCEP实现步骤

  1. IN: DataSource -> DataStream -> Transformations -> DataStream

  2. Pattern: Pattern.begin.where.next.where...times...

  3. PatternStream: CEP.pattern(DataStream, Pattern)

  4. DataStream: PatternStream.select(PatternSelectFunction) PatternStream.flatSelect(PatternSelectFunction)

  5. OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP匹配超时实现步骤

TimeoutCEP的流需要keyBy,即KeyedStream,如果inputStream不是KeyedStream,会new一个0字节的Key(上面CEPOperatorUtils源码里有提到)。

1
2
3
4
5
6
7
8
KeySelector
<IN,
Byte
> keySelector =
new
  
NullByteKeySelector
<>();


Pattern最后调用within设置窗口时间。 如果是对主键进行分组,一个时间窗口内最多只会匹配出一个超时事件,使用PatternStream.select(...)就可以了。

  1. IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream

  2. Pattern: Pattern.begin.where.next.where...within(Time windowTime)

  3. PatternStream: CEP.pattern(KeyedStream, Pattern)

  4. OutputTag: new OutputTag(...)

  5. SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)

  6. DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)

  7. OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP超时不足

和Flink窗口聚合类似,如果使用事件时间和依赖事件生成的水印向前推进,需要后续的事件到达,才会触发窗口进行计算和输出结果。

FlinkCEP超时完整demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
public
class
CEPTimeoutEventJob
 {
private
static
final
String
 LOCAL_KAFKA_BROKER =
"localhost:9092"
;
private
static
final
String
 GROUP_ID =
CEPTimeoutEventJob
.
class
.getSimpleName();
private
static
final
String
 GROUP_TOPIC = GROUP_ID;
   
public
static
void
 main(
String
[] args)
throws
Exception
 {
// 参数
ParameterTool
 params =
ParameterTool
.fromArgs(args);
     
StreamExecutionEnvironment
 env =
StreamExecutionEnvironment
.getExecutionEnvironment();
// 使用事件时间
    env.setStreamTimeCharacteristic(
TimeCharacteristic
.
EventTime
);
    env.enableCheckpointing(
5000
);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig
.
ExternalizedCheckpointCleanup
.RETAIN_ON_CANCELLATION);
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(
RestartStrategies
.fixedDelayRestart(
5
,
10000
));
     
// 不使用POJO的时间
final
AssignerWithPeriodicWatermarks
 extractor =
new
IngestionTimeExtractor
<POJO>();
     
// 与Kafka Topic的Partition保持一致
    env.setParallelism(
3
);
     
Properties
 kafkaProps =
new
Properties
();
    kafkaProps.setProperty(
"bootstrap.servers"
, LOCAL_KAFKA_BROKER);
    kafkaProps.setProperty(
"group.id"
, GROUP_ID);
     
// 接入Kafka的消息
FlinkKafkaConsumer011
<POJO> consumer =
new
FlinkKafkaConsumer011
<>(GROUP_TOPIC,
new
POJOSchema
(), kafkaProps);
DataStream
<POJO> pojoDataStream = env.addSource(consumer)
        .assignTimestampsAndWatermarks(extractor);
    pojoDataStream.print();
     
// 根据主键aid分组 即对每一个POJO事件进行匹配检测【不同类型的POJO,可以采用不同的within时间】
// 1.
DataStream
<POJO> keyedPojos = pojoDataStream
        .keyBy(
"aid"
);
     
// 从初始化到终态-一个完整的POJO事件序列
// 2.
Pattern
<POJO, POJO> completedPojo =
Pattern
.<POJO>begin(
"init"
)
            .where(
new
SimpleCondition
<POJO>() {
private
static
final
long
 serialVersionUID = -
6847788055093903603L
;
               
@Override
public
boolean
 filter(POJO pojo)
throws
Exception
 {
return
"02"
.equals(pojo.getAstatus());
              }
            })
            .followedBy(
"end"
)
//            .next("end")
            .where(
new
SimpleCondition
<POJO>() {
private
static
final
long
 serialVersionUID = -
2655089736460847552L
;
               
@Override
public
boolean
 filter(POJO pojo)
throws
Exception
 {
return
"00"
.equals(pojo.getAstatus()) ||
"01"
.equals(pojo.getAstatus());
              }
            });
     
// 找出1分钟内【便于测试】都没有到终态的事件aid
// 如果针对不同类型有不同within时间,比如有的是超时1分钟,有的可能是超时1个小时 则生成多个PatternStream
// 3.
PatternStream
<POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within(
Time
.minutes(
1
)));
     
// 定义侧面输出timedout
// 4.
OutputTag
<POJO> timedout =
new
OutputTag
<POJO>(
"timedout"
) {
private
static
final
long
 serialVersionUID =
773503794597666247L
;
    };
     
// OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction
// 5.
SingleOutputStreamOperator
<POJO> timeoutPojos = patternStream.flatSelect(
        timedout,
new
POJOTimedOut
(),
new
FlatSelectNothing
()
    );
     
// 打印输出超时的POJO
// 6.7.
    timeoutPojos.getSideOutput(timedout).print();
    timeoutPojos.print();
    env.execute(
CEPTimeoutEventJob
.
class
.getSimpleName());
  }
   
/**
   * 把超时的事件收集起来
   */
public
static
class
POJOTimedOut
implements
PatternFlatTimeoutFunction
<POJO, POJO> {
private
static
final
long
 serialVersionUID = -
4214641891396057732L
;
     
@Override
public
void
 timeout(
Map
<
String
,
List
<POJO>> map,
long
 l,
Collector
<POJO> collector)
throws
Exception
 {
if
 (
null
 != map.get(
"init"
)) {
for
 (POJO pojoInit : map.get(
"init"
)) {
System
.out.println(
"timeout init:"
 + pojoInit.getAid());
          collector.collect(pojoInit);
        }
      }
// 因为end超时了,还没收到end,所以这里是拿不到end的
System
.out.println(
"timeout end: "
 + map.get(
"end"
));
    }
  }
   
/**
   * 通常什么都不做,但也可以把所有匹配到的事件发往下游;如果是宽松临近,被忽略或穿透的事件就没办法选中发往下游了
   * 一分钟时间内走完init和end的数据
   *
   * @param <T>
   */
public
static
class
FlatSelectNothing
<T>
implements
PatternFlatSelectFunction
<T, T> {
private
static
final
long
 serialVersionUID = -
3029589950677623844L
;
     
@Override
public
void
 flatSelect(
Map
<
String
,
List
<T>> pattern,
Collector
<T> collector) {
System
.out.println(
"flatSelect: "
 + pattern);
    }
  }
}

测试结果(followedBy):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
3
> POJO{aid=
'ID000-0'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419728242
, energy=
529.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-1'
, astyle=
'STYLE000-2'
, aname=
'NAME-1'
, logTime=
1563419728783
, energy=
348.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-0'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419749259
, energy=
492.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'00'
, createTime=
null
, updateTime=
null
}
flatSelect: {init=[POJO{aid=
'ID000-0'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419728242
, energy=
529.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}],
end
=[POJO{aid=
'ID000-0'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419749259
, energy=
492.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'00'
, createTime=
null
, updateTime=
null
}]}
timeout init:ID000-
1
3
> POJO{aid=
'ID000-1'
, astyle=
'STYLE000-2'
, aname=
'NAME-1'
, logTime=
1563419728783
, energy=
348.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
timeout
end
:
null
3
> POJO{aid=
'ID000-2'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419829639
, energy=
467.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'03'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-2'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419841394
, energy=
107.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'00'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-3'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419967721
, energy=
431.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-3'
, astyle=
'STYLE000-2'
, aname=
'NAME-0'
, logTime=
1563419979567
, energy=
32.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'03'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-3'
, astyle=
'STYLE000-2'
, aname=
'NAME-0'
, logTime=
1563419993612
, energy=
542.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'01'
, createTime=
null
, updateTime=
null
}
flatSelect: {init=[POJO{aid=
'ID000-3'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563419967721
, energy=
431.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}],
end
=[POJO{aid=
'ID000-3'
, astyle=
'STYLE000-2'
, aname=
'NAME-0'
, logTime=
1563419993612
, energy=
542.00
, age=
26
, tt=
2019
-
07
-
18
, astatus=
'01'
, createTime=
null
, updateTime=
null
}]}
3
> POJO{aid=
'ID000-4'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563420063760
, energy=
122.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
3
> POJO{aid=
'ID000-4'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563420078008
, energy=
275.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'03'
, createTime=
null
, updateTime=
null
}
timeout init:ID000-
4
3
> POJO{aid=
'ID000-4'
, astyle=
'STYLE000-0'
, aname=
'NAME-0'
, logTime=
1563420063760
, energy=
122.00
, age=
0
, tt=
2019
-
07
-
18
, astatus=
'02'
, createTime=
null
, updateTime=
null
}
timeout
end
:
null


总结

以上所述是小编给大家介绍的Apache FlinkCEP 实现超时状态监控的步骤,希望对大家有所帮助



文章分类
后端
版权声明:本站是系统测试站点,无实际运营。本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 XXXXXXo@163.com 举报,一经查实,本站将立刻删除。
相关推荐