Flink的CEP编程之CEP案例(找到哪些用户名是恶意登录)
原创
wx62be9d88ce294博主文章分类:大数据 ©著作权
文章标签 flink scala apache 用户名 文章分类 Hadoop 大数据
©著作权归作者所有:来自51CTO博客作者wx62be9d88ce294的原创作品,请联系作者获取转载授权,否则将追究法律责任
需求:
从一堆的登录日志中,匹配一个恶意登录的模式(如果一个用户连续失败三次, 则是恶意登录),从而找到哪些用户名是恶意登录。
package cepimport org.apache.flink.cep.PatternSelectFunctionimport org.apache.flink.cep.scala.{CEP, PatternStream}import org.apache.flink.cep.scala.pattern.Patternimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.time.Timeimport java.util case class LoginEvent(id:Long,name:String,eventType:String,eventTime:Long)object TestCEPByLogin { def main(args: Array[String]): Unit = { val streamEvn = StreamExecutionEnvironment.getExecutionEnvironment streamEvn.setParallelism(1) streamEvn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) import org.apache.flink.streaming.api.scala._ val stream: DataStream[LoginEvent] = streamEvn.fromCollection(List( new LoginEvent(1, "yqq", "fail", 1577080451), new LoginEvent(2, "yqq", "fail", 1577080452), new LoginEvent(3, "yqq", "fail", 1577080453), new LoginEvent(4, "zifan", "fail", 1577080459), new LoginEvent(4, "zifan", "success", 1577080460), new LoginEvent(5, "yqq", "fail", 1577080463) )).assignAscendingTimestamps(_.eventTime*1000) val pattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("start").where(_.eventType.equals("fail")) .next("fail2").where(_.eventType.equals("fail")) .next("fail3").where(_.eventType.equals("fail")) .within(Time.seconds(10)) val patternStream: PatternStream[LoginEvent] = CEP.pattern(stream.keyBy(_.name), pattern) val result: DataStream[String] = patternStream.select(new PatternSelectFunction[LoginEvent, String] { override def select(map: util.Map[String, util.List[LoginEvent]]): String = { val keyIter: util.Iterator[String] = map.keySet().iterator() val e1: LoginEvent = map.get(keyIter.next()).iterator().next() val e2: LoginEvent = map.get(keyIter.next()).iterator().next() val e3: LoginEvent = map.get(keyIter.next()).iterator().next() "用户名:" + e1.name + "登录时间" + ":" + e1.eventTime + ":" + e2.eventTime + ":" + e3.eventTime } }) result.print() streamEvn.execute() }}
- 赞
- 收藏
- 评论
- *举报
Original: https://blog.51cto.com/u_15704423/5434865
Author: wx62be9d88ce294
Title: Flink的CEP编程之CEP案例(找到哪些用户名是恶意登录)
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/516930/
转载文章受原作者版权保护。转载请注明原作者出处!