CEP - Complex Event Processing复杂事件处理。
订单下单后超过一定时间还未进行支付确认。
打车订单生成后超过一定时间没有确认上车。
外卖超过预定送达时间一定时限还没有确认送达。
Apache FlinkCEP API
CEPTimeoutEventJob
DataStream 一般由相同类型事件或元素组成,一个DataStream可以通过一系列的转换操作如Filter、Map等转换为另一个DataStream。
PatternStream 是对CEP模式匹配的流的抽象,把DataStream和Pattern组合在一块,然后对外提供select和flatSelect等方法。PatternStream并不是DataStream,它提供方法把匹配的模式序列和与其相关联的事件组成的映射(就是Map<模式名称,List<事件>>)发出去,发到SingleOutputStreamOperator里面,SingleOutputStreamOperator是DataStream。
CEPOperatorUtils工具类里的方法和变量使用了「PatternStream」来命名,比如:
SingleOutputStreamOperator
PatternStream的构造方法:
Pattern是模式定义的Base Class,Builder模式,定义好的模式会被NFACompiler用来生成NFA。
如果想要自己实现类似next和followedBy这种方法,比如timeEnd,对Pattern进行扩展重写应该是可行的。
Quantifier是用来描述具体模式行为的,主要有三大类:
Single-单一匹配、Looping-循环匹配、Times-一定次数或者次数范围内都能匹配到。
每一个模式Pattern可以是optional可选的(单一匹配或循环匹配),并可以设置ConsumingStrategy。
循环和次数也有一个额外的内部ConsumingStrategy,用在模式中接收的事件之间。
EventComparator,自定义事件比较器,实现EventComparator接口。
NFACompiler提供将Pattern编译成NFA或者NFAFactory的方法,使用NFAFactory可以创建多个NFA。
NFA:Non-deterministic finite automaton - 非确定的有限(状态)自动机。
更多内容参见https://zh.wikipedia.org/wiki/非确定有限状态自动机
当一个包含被匹配到的事件的映射能够通过模式名称访问到的时候,PatternSelectFunction的select()方法会被调用。模式名称是由Pattern定义的时候指定的。select()方法恰好返回一个结果,如果需要返回多个结果,则可以实现PatternFlatSelectFunction。
PatternFlatSelectFunction,不是返回一个OUT,而是使用Collector
SelectTimeoutCepOperator是在CEPOperatorUtils中调用createTimeoutPatternStream()方法时创建出来。
SelectTimeoutCepOperator中会被算子迭代调用的方法是processMatchedSequences()和processTimedOutSequences()。
模板方法...对应到抽象类AbstractKeyedCEPPatternOperator中processEvent()方法和advanceTime()方法。
还有FlatSelectTimeoutCepOperator和对应的PatternFlatTimeoutFunction。
CEP是创建PatternStream的工具类,PatternStream只是DataStream和Pattern的组合。
CEPOperatorUtils是在PatternStream的select()方法和flatSelect()方法被调用的时候,去创建SingleOutputStreamOperator(DataStream)。
IN: DataSource -> DataStream -> Transformations -> DataStream
Pattern: Pattern.begin.where.next.where...times...
PatternStream: CEP.pattern(DataStream, Pattern)
DataStream: PatternStream.select(PatternSelectFunction) PatternStream.flatSelect(PatternSelectFunction)
OUT: DataStream -> Transformations -> DataStream -> DataSink
TimeoutCEP的流需要keyBy,即KeyedStream,如果inputStream不是KeyedStream,会new一个0字节的Key(上面CEPOperatorUtils源码里有提到)。
Pattern最后调用within设置窗口时间。 如果是对主键进行分组,一个时间窗口内最多只会匹配出一个超时事件,使用PatternStream.select(...)就可以了。
IN: DataSource -> DataStream -> Transformations -> DataStream -> keyBy -> KeyedStream
Pattern: Pattern.begin.where.next.where...within(Time windowTime)
PatternStream: CEP.pattern(KeyedStream, Pattern)
OutputTag: new OutputTag(...)
SingleOutputStreamOperator: PatternStream.flatSelect(OutputTag, PatternFlatTimeoutFunction, PatternFlatSelectFunction)
DataStream: SingleOutputStreamOperator.getSideOutput(OutputTag)
OUT: DataStream -> Transformations -> DataStream -> DataSink
和Flink窗口聚合类似,如果使用事件时间和依赖事件生成的水印向前推进,需要后续的事件到达,才会触发窗口进行计算和输出结果。
测试结果(followedBy):
End
在看