转载

Apache FlinkCEP 实现超时状态监控

Apache FlinkCEP 实现超时状态监控

CEP - Complex Event Processing复杂事件处理。

订单下单后超过一定时间还未进行支付确认。

打车订单生成后超过一定时间没有确认上车。

外卖超过预定送达时间一定时限还没有确认送达。

Apache FlinkCEP API

CEPTimeoutEventJob

FlinkCEP源码简析

DataStream和PatternStream

DataStream 一般由相同类型事件或元素组成,一个DataStream可以通过一系列的转换操作如Filter、Map等转换为另一个DataStream。

PatternStream 是对CEP模式匹配的流的抽象,把DataStream和Pattern组合在一块,然后对外提供select和flatSelect等方法。PatternStream并不是DataStream,它提供方法把匹配的模式序列和与其相关联的事件组成的映射(就是Map<模式名称,List<事件>>)发出去,发到SingleOutputStreamOperator里面,SingleOutputStreamOperator是DataStream。

CEPOperatorUtils工具类里的方法和变量使用了「PatternStream」来命名,比如:

SingleOutputStreamOperator

PatternStream的构造方法:

Pattern、Quantifier和EventComparator

Pattern是模式定义的Base Class,Builder模式,定义好的模式会被NFACompiler用来生成NFA。

如果想要自己实现类似next和followedBy这种方法,比如timeEnd,对Pattern进行扩展重写应该是可行的。

Quantifier是用来描述具体模式行为的,主要有三大类:

Single-单一匹配、Looping-循环匹配、Times-一定次数或者次数范围内都能匹配到。

每一个模式Pattern可以是optional可选的(单一匹配或循环匹配),并可以设置ConsumingStrategy。

循环和次数也有一个额外的内部ConsumingStrategy,用在模式中接收的事件之间。

EventComparator,自定义事件比较器,实现EventComparator接口。

NFACompiler和NFA

NFACompiler提供将Pattern编译成NFA或者NFAFactory的方法,使用NFAFactory可以创建多个NFA。

NFA:Non-deterministic finite automaton - 非确定的有限(状态)自动机。

更多内容参见

https://zh.wikipedia.org/wiki/非确定有限状态自动机

PatternSelectFunction和PatternFlatSelectFunction

当一个包含被匹配到的事件的映射能够通过模式名称访问到的时候,PatternSelectFunction的select()方法会被调用。模式名称是由Pattern定义的时候指定的。select()方法恰好返回一个结果,如果需要返回多个结果,则可以实现PatternFlatSelectFunction。

PatternFlatSelectFunction,不是返回一个OUT,而是使用Collector 把匹配到的事件收集起来。

SelectTimeoutCepOperator、PatternTimeoutFunction

SelectTimeoutCepOperator是在CEPOperatorUtils中调用createTimeoutPatternStream()方法时创建出来。

SelectTimeoutCepOperator中会被算子迭代调用的方法是processMatchedSequences()和processTimedOutSequences()。

模板方法...对应到抽象类AbstractKeyedCEPPatternOperator中processEvent()方法和advanceTime()方法。

还有FlatSelectTimeoutCepOperator和对应的PatternFlatTimeoutFunction。

CEP和CEPOperatorUtils

CEP是创建PatternStream的工具类,PatternStream只是DataStream和Pattern的组合。

CEPOperatorUtils是在PatternStream的select()方法和flatSelect()方法被调用的时候,去创建SingleOutputStreamOperator(DataStream)。

FlinkCEP实现步骤

  1. IN: DataSource -> DataStream -> Transformations -> DataStream

  2. Pattern: Pattern.begin.where.next.where...times...

  3. PatternStream: CEP.pattern(DataStream, Pattern)

  4. DataStream: PatternStream.select(PatternSelectFunction) PatternStream.flatSelect(PatternSelectFunction)

  5. OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP匹配超时实现步骤

TimeoutCEP的流需要keyBy,即KeyedStream,如果inputStream不是KeyedStream,会new一个0字节的Key(上面CEPOperatorUtils源码里有提到)。

Pattern最后调用within设置窗口时间。 如果是对主键进行分组,一个时间窗口内最多只会匹配出一个超时事件,使用PatternStream.select(...)就可以了。

  1. IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream

  2. Pattern: Pattern.begin.where.next.where...within(Time windowTime)

  3. PatternStream: CEP.pattern(KeyedStream, Pattern)

  4. OutputTag: new OutputTag(...)

  5. SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)

  6. DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)

  7. OUT: DataStream -> Transformations -> DataStream -> DataSink

FlinkCEP超时不足

和Flink窗口聚合类似,如果使用事件时间和依赖事件生成的水印向前推进,需要后续的事件到达,才会触发窗口进行计算和输出结果。

FlinkCEP超时完整demo

测试结果(followedBy):

Apache FlinkCEP 实现超时状态监控

Apache FlinkCEP 实现超时状态监控

End

在看 Apache FlinkCEP 实现超时状态监控 Apache FlinkCEP 实现超时状态监控

原文  https://mp.weixin.qq.com/s/tKQDoLi_ZUHhjZKp5bCxHQ
正文到此结束
Loading...