Apache FlinkCEP 实现超时状态监控的步骤详解
这篇文章主要介绍了Apache FlinkCEP 实现超时状态监控的步骤,本文给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
CEP - Complex Event Processing复杂事件处理。
订单下单后超过一定时间还未进行支付确认。
打车订单生成后超过一定时间没有确认上车。
外卖超过预定送达时间一定时限还没有确认送达。
Apache FlinkCEP API
CEPTimeoutEventJob
FlinkCEP源码简析
DataStream和PatternStream
DataStream 一般由相同类型事件或元素组成,一个DataStream可以通过一系列的转换操作如Filter、Map等转换为另一个DataStream。
PatternStream 是对CEP模式匹配的流的抽象,把DataStream和Pattern组合在一块,然后对外提供select和flatSelect等方法。PatternStream并不是DataStream,它提供方法把匹配的模式序列和与其相关联的事件组成的映射(就是Map<模式名称,List<事件>>)发出去,发到SingleOutputStreamOperator里面,SingleOutputStreamOperator是DataStream。
CEPOperatorUtils工具类里的方法和变量使用了「PatternStream」来命名,比如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | public static <IN, OUT> SingleOutputStreamOperator <OUT> createPatternStream(...){...} public static <IN, OUT1, OUT2> SingleOutputStreamOperator <OUT1> createTimeoutPatternStream(...){...} final SingleOutputStreamOperator <OUT> patternStream; |
SingleOutputStreamOperator
1 2 3 4 5 6 7 8 9 10 11 12 | @Public public class SingleOutputStreamOperator <T> extends DataStream <T> {...} |
PatternStream的构造方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | PatternStream ( final DataStream <T> inputStream, final Pattern <T, ?> pattern) { this .inputStream = inputStream; this .pattern = pattern; this .comparator = null ; } PatternStream ( final DataStream <T> inputStream, final Pattern <T, ?> pattern, final EventComparator <T> comparator) { this .inputStream = inputStream; this .pattern = pattern; this .comparator = comparator; } |
Pattern、Quantifier和EventComparator
Pattern是模式定义的Base Class,Builder模式,定义好的模式会被NFACompiler用来生成NFA。
如果想要自己实现类似next和followedBy这种方法,比如timeEnd,对Pattern进行扩展重写应该是可行的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | public class Pattern <T, F extends T> { /** 模式名称 */ private final String name; /** 前面一个模式 */ private final Pattern <T, ? extends T> previous; /** 一个事件如果要被当前模式匹配到,必须满足的约束条件 */ private IterativeCondition <F> condition; /** 时间窗口长度,在时间长度内进行模式匹配 */ private Time windowTime; /** 模式量词,意思是一个模式匹配几个事件等 默认是匹配到一个 */ private Quantifier quantifier = Quantifier .one( ConsumingStrategy .STRICT); /** 停止将事件收集到循环状态时,事件必须满足的条件 */ private IterativeCondition <F> untilCondition; /** * 适用于{@code times}模式,用来维护模式里事件可以连续发生的次数 */ private Times times; // 匹配到事件之后的跳过策略 private final AfterMatchSkipStrategy afterMatchSkipStrategy; ... } |
Quantifier是用来描述具体模式行为的,主要有三大类:
Single-单一匹配、Looping-循环匹配、Times-一定次数或者次数范围内都能匹配到。
每一个模式Pattern可以是optional可选的(单一匹配或循环匹配),并可以设置ConsumingStrategy。
循环和次数也有一个额外的内部ConsumingStrategy,用在模式中接收的事件之间。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | public class Quantifier { ... /** * 5个属性,可以组合,但并非所有的组合都是有效的 */ public enum QuantifierProperty { SINGLE, LOOPING, TIMES, OPTIONAL, GREEDY } /** * 描述在此模式中匹配哪些事件的策略 */ public enum ConsumingStrategy { STRICT, SKIP_TILL_NEXT, SKIP_TILL_ANY, NOT_FOLLOW, NOT_NEXT } /** * 描述当前模式里事件可以连续发生的次数;举个例子,模式条件无非就是boolean,满足true条件的事件连续出现times次,或者一个次数范围,比如2~4次,2次,3次,4次都会被当前模式匹配出来,因此同一个事件会被重复匹配到 */ public static class Times { private final int from; private final int to; private Times ( int from, int to) { Preconditions .checkArgument(from > 0 , "The from should be a positive number greater than 0." ); Preconditions .checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + "." ); this .from = from; this .to = to; } public int getFrom() { return from; } public int getTo() { return to; } // 次数范围 public static Times of( int from, int to) { return new Times (from, to); } // 指定具体次数 public static Times of( int times) { return new Times (times, times); } @Override public boolean equals( Object o) { if ( this == o) { return true ; } if (o == null || getClass() != o.getClass()) { return false ; } Times times = ( Times ) o; return from == times.from && to == times.to; } @Override public int hashCode() { return Objects .hash(from, to); } } ... } |
EventComparator,自定义事件比较器,实现EventComparator接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | public interface EventComparator <T> extends Comparator <T>, Serializable { long serialVersionUID = 1L ; } |
NFACompiler和NFA
NFACompiler提供将Pattern编译成NFA或者NFAFactory的方法,使用NFAFactory可以创建多个NFA。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | public class NFACompiler { ... /** * NFAFactory 创建NFA的接口 * * @param <T> Type of the input events which are processed by the NFA */ public interface NFAFactory <T> extends Serializable { NFA<T> createNFA(); } /** * NFAFactory的具体实现NFAFactoryImpl * * <p>The implementation takes the input type serializer, the window time and the set of * states and their transitions to be able to create an NFA from them. * * @param <T> Type of the input events which are processed by the NFA */ private static class NFAFactoryImpl <T> implements NFAFactory <T> { private static final long serialVersionUID = 8939783698296714379L ; private final long windowTime; private final Collection < State <T>> states; private final boolean timeoutHandling; private NFAFactoryImpl ( long windowTime, Collection < State <T>> states, boolean timeoutHandling) { this .windowTime = windowTime; this .states = states; this .timeoutHandling = timeoutHandling; } @Override public NFA<T> createNFA() { // 一个NFA由状态集合、时间窗口的长度和是否处理超时组成 return new NFA<>(states, windowTime, timeoutHandling); } } } |
NFA:Non-deterministic finite automaton - 非确定的有限(状态)自动机。
更多内容参见
https://zh.wikipedia.org/wiki/非确定有限状态自动机
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | public class NFA<T> { /** * NFACompiler返回的所有有效的NFA状态集合 * These are directly derived from the user-specified pattern. */ private final Map < String , State <T>> states; /** * Pattern.within(Time)指定的时间窗口长度 */ private final long windowTime; /** * 一个超时匹配的标记 */ private final boolean handleTimeout; ... } |
PatternSelectFunction和PatternFlatSelectFunction
当一个包含被匹配到的事件的映射能够通过模式名称访问到的时候,PatternSelectFunction的select()方法会被调用。模式名称是由Pattern定义的时候指定的。select()方法恰好返回一个结果,如果需要返回多个结果,则可以实现PatternFlatSelectFunction。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 | public interface PatternSelectFunction <IN, OUT> extends Function , Serializable { /** * 从给到的事件映射中生成一个结果。这些事件使用他们关联的模式名称作为唯一标识 */ OUT select( Map < String , List <IN>> pattern) throws Exception ; } |
PatternFlatSelectFunction,不是返回一个OUT,而是使用Collector 把匹配到的事件收集起来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 | public interface PatternFlatSelectFunction <IN, OUT> extends Function , Serializable { /** * 生成一个或多个结果 */ void flatSelect( Map < String , List <IN>> pattern, Collector <OUT> out) throws Exception ; } |
SelectTimeoutCepOperator、PatternTimeoutFunction
SelectTimeoutCepOperator是在CEPOperatorUtils中调用createTimeoutPatternStream()方法时创建出来。
SelectTimeoutCepOperator中会被算子迭代调用的方法是processMatchedSequences()和processTimedOutSequences()。
模板方法...对应到抽象类AbstractKeyedCEPPatternOperator中processEvent()方法和advanceTime()方法。
还有FlatSelectTimeoutCepOperator和对应的PatternFlatTimeoutFunction。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 | public class SelectTimeoutCepOperator <IN, OUT1, OUT2, KEY> extends AbstractKeyedCEPPatternOperator <IN, KEY, OUT1, SelectTimeoutCepOperator . SelectWrapper <IN, OUT1, OUT2>> { private OutputTag <OUT2> timedOutOutputTag; public SelectTimeoutCepOperator ( TypeSerializer <IN> inputSerializer, boolean isProcessingTime, NFACompiler . NFAFactory <IN> nfaFactory, final EventComparator <IN> comparator, AfterMatchSkipStrategy skipStrategy, // 参数命名混淆了flat...包括SelectWrapper类中的成员命名... PatternSelectFunction <IN, OUT1> flatSelectFunction, PatternTimeoutFunction <IN, OUT2> flatTimeoutFunction, OutputTag <OUT2> outputTag, OutputTag <IN> lateDataOutputTag) { super ( inputSerializer, isProcessingTime, nfaFactory, comparator, skipStrategy, new SelectWrapper <>(flatSelectFunction, flatTimeoutFunction), lateDataOutputTag); this .timedOutOutputTag = outputTag; } ... } public interface PatternTimeoutFunction <IN, OUT> extends Function , Serializable { OUT timeout( Map < String , List <IN>> pattern, long timeoutTimestamp) throws Exception ; } public interface PatternFlatTimeoutFunction <IN, OUT> extends Function , Serializable { void timeout( Map < String , List <IN>> pattern, long timeoutTimestamp, Collector <OUT> out) throws Exception ; } |
CEP和CEPOperatorUtils
CEP是创建PatternStream的工具类,PatternStream只是DataStream和Pattern的组合。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | public class CEP { public static <T> PatternStream <T> pattern( DataStream <T> input, Pattern <T, ?> pattern) { return new PatternStream <>(input, pattern); } public static <T> PatternStream <T> pattern( DataStream <T> input, Pattern <T, ?> pattern, EventComparator <T> comparator) { return new PatternStream <>(input, pattern, comparator); } } |
CEPOperatorUtils是在PatternStream的select()方法和flatSelect()方法被调用的时候,去创建SingleOutputStreamOperator(DataStream)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 | public class CEPOperatorUtils { ... private static <IN, OUT, K> SingleOutputStreamOperator <OUT> createPatternStream( final DataStream <IN> inputStream, final Pattern <IN, ?> pattern, final TypeInformation <OUT> outTypeInfo, final boolean timeoutHandling, final EventComparator <IN> comparator, final OperatorBuilder <IN, OUT> operatorBuilder) { final TypeSerializer <IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig()); // check whether we use processing time final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic . ProcessingTime ; // compile our pattern into a NFAFactory to instantiate NFAs later on final NFACompiler . NFAFactory <IN> nfaFactory = NFACompiler .compileFactory(pattern, timeoutHandling); final SingleOutputStreamOperator <OUT> patternStream; if (inputStream instanceof KeyedStream ) { KeyedStream <IN, K> keyedStream = ( KeyedStream <IN, K>) inputStream; patternStream = keyedStream.transform( operatorBuilder.getKeyedOperatorName(), outTypeInfo, operatorBuilder.build( inputSerializer, isProcessingTime, nfaFactory, comparator, pattern.getAfterMatchSkipStrategy())); } else { KeySelector <IN, Byte > keySelector = new NullByteKeySelector <>(); patternStream = inputStream.keyBy(keySelector).transform( operatorBuilder.getOperatorName(), outTypeInfo, operatorBuilder.build( inputSerializer, isProcessingTime, nfaFactory, comparator, pattern.getAfterMatchSkipStrategy() )).forceNonParallel(); } return patternStream; } ... } |
FlinkCEP实现步骤
IN: DataSource -> DataStream -> Transformations -> DataStream
Pattern: Pattern.begin.where.next.where...times...
PatternStream: CEP.pattern(DataStream, Pattern)
DataStream: PatternStream.select(PatternSelectFunction) PatternStream.flatSelect(PatternSelectFunction)
OUT: DataStream -> Transformations -> DataStream -> DataSink
FlinkCEP匹配超时实现步骤
TimeoutCEP的流需要keyBy,即KeyedStream,如果inputStream不是KeyedStream,会new一个0字节的Key(上面CEPOperatorUtils源码里有提到)。
1 2 3 4 5 6 7 8 | KeySelector <IN, Byte > keySelector = new NullByteKeySelector <>(); |
Pattern最后调用within设置窗口时间。 如果是对主键进行分组,一个时间窗口内最多只会匹配出一个超时事件,使用PatternStream.select(...)就可以了。
IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
Pattern: Pattern.begin.where.next.where...within(Time windowTime)
PatternStream: CEP.pattern(KeyedStream, Pattern)
OutputTag: new OutputTag(...)
SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
OUT: DataStream -> Transformations -> DataStream -> DataSink
FlinkCEP超时不足
和Flink窗口聚合类似,如果使用事件时间和依赖事件生成的水印向前推进,需要后续的事件到达,才会触发窗口进行计算和输出结果。
FlinkCEP超时完整demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 | public class CEPTimeoutEventJob { private static final String LOCAL_KAFKA_BROKER = "localhost:9092" ; private static final String GROUP_ID = CEPTimeoutEventJob . class .getSimpleName(); private static final String GROUP_TOPIC = GROUP_ID; public static void main( String [] args) throws Exception { // 参数 ParameterTool params = ParameterTool .fromArgs(args); StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment(); // 使用事件时间 env.setStreamTimeCharacteristic( TimeCharacteristic . EventTime ); env.enableCheckpointing( 5000 ); env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig . ExternalizedCheckpointCleanup .RETAIN_ON_CANCELLATION); env.getConfig().disableSysoutLogging(); env.getConfig().setRestartStrategy( RestartStrategies .fixedDelayRestart( 5 , 10000 )); // 不使用POJO的时间 final AssignerWithPeriodicWatermarks extractor = new IngestionTimeExtractor <POJO>(); // 与Kafka Topic的Partition保持一致 env.setParallelism( 3 ); Properties kafkaProps = new Properties (); kafkaProps.setProperty( "bootstrap.servers" , LOCAL_KAFKA_BROKER); kafkaProps.setProperty( "group.id" , GROUP_ID); // 接入Kafka的消息 FlinkKafkaConsumer011 <POJO> consumer = new FlinkKafkaConsumer011 <>(GROUP_TOPIC, new POJOSchema (), kafkaProps); DataStream <POJO> pojoDataStream = env.addSource(consumer) .assignTimestampsAndWatermarks(extractor); pojoDataStream.print(); // 根据主键aid分组 即对每一个POJO事件进行匹配检测【不同类型的POJO,可以采用不同的within时间】 // 1. DataStream <POJO> keyedPojos = pojoDataStream .keyBy( "aid" ); // 从初始化到终态-一个完整的POJO事件序列 // 2. Pattern <POJO, POJO> completedPojo = Pattern .<POJO>begin( "init" ) .where( new SimpleCondition <POJO>() { private static final long serialVersionUID = - 6847788055093903603L ; @Override public boolean filter(POJO pojo) throws Exception { return "02" .equals(pojo.getAstatus()); } }) .followedBy( "end" ) // .next("end") .where( new SimpleCondition <POJO>() { private static final long serialVersionUID = - 2655089736460847552L ; @Override public boolean filter(POJO pojo) throws Exception { return "00" .equals(pojo.getAstatus()) || "01" .equals(pojo.getAstatus()); } }); // 找出1分钟内【便于测试】都没有到终态的事件aid // 如果针对不同类型有不同within时间,比如有的是超时1分钟,有的可能是超时1个小时 则生成多个PatternStream // 3. PatternStream <POJO> patternStream = CEP.pattern(keyedPojos, completedPojo.within( Time .minutes( 1 ))); // 定义侧面输出timedout // 4. OutputTag <POJO> timedout = new OutputTag <POJO>( "timedout" ) { private static final long serialVersionUID = 773503794597666247L ; }; // OutputTag<L> timeoutOutputTag, PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, PatternFlatSelectFunction<T, R> patternFlatSelectFunction // 5. SingleOutputStreamOperator <POJO> timeoutPojos = patternStream.flatSelect( timedout, new POJOTimedOut (), new FlatSelectNothing () ); // 打印输出超时的POJO // 6.7. timeoutPojos.getSideOutput(timedout).print(); timeoutPojos.print(); env.execute( CEPTimeoutEventJob . class .getSimpleName()); } /** * 把超时的事件收集起来 */ public static class POJOTimedOut implements PatternFlatTimeoutFunction <POJO, POJO> { private static final long serialVersionUID = - 4214641891396057732L ; @Override public void timeout( Map < String , List <POJO>> map, long l, Collector <POJO> collector) throws Exception { if ( null != map.get( "init" )) { for (POJO pojoInit : map.get( "init" )) { System .out.println( "timeout init:" + pojoInit.getAid()); collector.collect(pojoInit); } } // 因为end超时了,还没收到end,所以这里是拿不到end的 System .out.println( "timeout end: " + map.get( "end" )); } } /** * 通常什么都不做,但也可以把所有匹配到的事件发往下游;如果是宽松临近,被忽略或穿透的事件就没办法选中发往下游了 * 一分钟时间内走完init和end的数据 * * @param <T> */ public static class FlatSelectNothing <T> implements PatternFlatSelectFunction <T, T> { private static final long serialVersionUID = - 3029589950677623844L ; @Override public void flatSelect( Map < String , List <T>> pattern, Collector <T> collector) { System .out.println( "flatSelect: " + pattern); } } } |
测试结果(followedBy):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 | 3 > POJO{aid= 'ID000-0' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419728242 , energy= 529.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-1' , astyle= 'STYLE000-2' , aname= 'NAME-1' , logTime= 1563419728783 , energy= 348.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-0' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419749259 , energy= 492.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '00' , createTime= null , updateTime= null } flatSelect: {init=[POJO{aid= 'ID000-0' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419728242 , energy= 529.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null }], end =[POJO{aid= 'ID000-0' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419749259 , energy= 492.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '00' , createTime= null , updateTime= null }]} timeout init:ID000- 1 3 > POJO{aid= 'ID000-1' , astyle= 'STYLE000-2' , aname= 'NAME-1' , logTime= 1563419728783 , energy= 348.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null } timeout end : null 3 > POJO{aid= 'ID000-2' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419829639 , energy= 467.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '03' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-2' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419841394 , energy= 107.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '00' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-3' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419967721 , energy= 431.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-3' , astyle= 'STYLE000-2' , aname= 'NAME-0' , logTime= 1563419979567 , energy= 32.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '03' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-3' , astyle= 'STYLE000-2' , aname= 'NAME-0' , logTime= 1563419993612 , energy= 542.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '01' , createTime= null , updateTime= null } flatSelect: {init=[POJO{aid= 'ID000-3' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563419967721 , energy= 431.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null }], end =[POJO{aid= 'ID000-3' , astyle= 'STYLE000-2' , aname= 'NAME-0' , logTime= 1563419993612 , energy= 542.00 , age= 26 , tt= 2019 - 07 - 18 , astatus= '01' , createTime= null , updateTime= null }]} 3 > POJO{aid= 'ID000-4' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563420063760 , energy= 122.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null } 3 > POJO{aid= 'ID000-4' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563420078008 , energy= 275.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '03' , createTime= null , updateTime= null } timeout init:ID000- 4 3 > POJO{aid= 'ID000-4' , astyle= 'STYLE000-0' , aname= 'NAME-0' , logTime= 1563420063760 , energy= 122.00 , age= 0 , tt= 2019 - 07 - 18 , astatus= '02' , createTime= null , updateTime= null } timeout end : null |
总结
以上所述是小编给大家介绍的Apache FlinkCEP 实现超时状态监控的步骤,希望对大家有所帮助