**
1. ## 简介
CEP(Complex Event Processing):复杂事件处理,用于在流中筛选符合某种复杂模式的事件.
2. 什么是CEP
- CEP允许在无休止的事件流中检测事件模式,让我们能获取重要的部分.
- 一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想要的信息—满足规则的复杂事件
3. Pattern API
- flink为我们支持了CEP中的Pattern API用来处理流式数据中的复杂事件处理.
- 通过Pattern类中begin方法及where方法等来实现想要的需求.
个体模式
- 包括单例模式和循环模式
- 单例模式只接受一个事件就是一个begin方法加一个where方法等后面不加其他紧挨着.下一个等事件
- 循环模式则是接受多个事件可以复杂事件.
- 量词中有times(参数).ooptical匹配0或参数个.times(1,3)一个或者二个.三个times(4,5).greedy匹配4个或者5个可以重复匹配等
模式序列中也分为严格近邻和宽松近邻和非确定性宽松近邻还可以定义不希望近邻关系a next b 近邻 a followedBy b 宽松近邻 a followedbyAny b 非确定性宽松近邻 还有notNext() notFollowedBy() 不希望出现近邻
##格式注意
- 所有模式序列都必须以.begin()开始
- 模式序列结束不能以.notFollowedBy()结束
- not 类型的模式不能被optical所修饰
- 还可以为模式指定时间约束,用来约束多长时间内匹配有效within()方法
使用
- CEP.parttern()里面放第一个参数为流,第二个就是匹配的模式就得到一个PatternStream流
- 匹配事件时间的提取方法为select方法和flatSelect方法.提取事件
- select方法需要传入一个selectFunction作为参数每个成功的事件序列都会调用它
- select方法以一个Map[String,iterable[IN]]来接收匹配到的事件序列,其中key就是每个模式的名称,而value就是所有接收到的事件的Iterable类型
- 超时事件提取当一个模式以within定义窗口时间,就会有遗漏的数据CEP支持我们通过侧输出流的方式处理超时事件的处理
代码实现
val loginFailPattern = Pattern
.begin[LoginEvent]("fail").where(_.eventType == "fail").times(3).consecutive()
.within(Time.seconds(5))
//2.将模式应用到数据流上,得到一个PattenStream
val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)
//3.检出符合模式的数据流,需要调用select
val loginFailWarningStream = patternStream.select(new LoginFailEventMatch())
//实现自定义PatternSelectFunction
class LoginFailEventMatch() extends PatternSelectFunction[LoginEvent,LoginFailWarning]{
override def select(pattern: util.Map[String, util.List[LoginEvent]]): LoginFailWarning = {
//当前匹配到的事件序列,就保存在Map里
// val firstFailEvent = pattern.get("firstFail").iterator().next()
// val secondFailEvent = pattern.get("secondFail").get(0)
val iter = pattern.get("fail").iterator()
val firstFailEvent = iter.next()
val secondFailEvent = iter.next()
val thirdFailEvent = iter.next()
LoginFailWarning(firstFailEvent.userId,firstFailEvent.timestamp,thirdFailEvent.timestamp,"login fail")
}
}
版权声明:本文为weixin_49197104原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。