Flink的CEP编程之CEP案例(找到哪些用户名是恶意登录)

Flink的CEP编程之CEP案例(找到哪些用户名是恶意登录)

原创

wx62be9d88ce294博主文章分类:大数据 ©著作权

文章标签 flink scala apache 用户名 文章分类 Hadoop 大数据

©著作权归作者所有:来自51CTO博客作者wx62be9d88ce294的原创作品,请联系作者获取转载授权,否则将追究法律责任

需求:

从一堆的登录日志中,匹配一个恶意登录的模式(如果一个用户连续失败三次, 则是恶意登录),从而找到哪些用户名是恶意登录。

package cepimport org.apache.flink.cep.PatternSelectFunctionimport org.apache.flink.cep.scala.{CEP, PatternStream}import org.apache.flink.cep.scala.pattern.Patternimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.time.Timeimport java.util         case class LoginEvent(id:Long,name:String,eventType:String,eventTime:Long)object TestCEPByLogin {  def main(args: Array[String]): Unit = {    val streamEvn = StreamExecutionEnvironment.getExecutionEnvironment    streamEvn.setParallelism(1)    streamEvn.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)    import org.apache.flink.streaming.api.scala._            val stream: DataStream[LoginEvent] = streamEvn.fromCollection(List(      new LoginEvent(1, "yqq", "fail", 1577080451),      new LoginEvent(2, "yqq", "fail", 1577080452),      new LoginEvent(3, "yqq", "fail", 1577080453),      new LoginEvent(4, "zifan", "fail", 1577080459),      new LoginEvent(4, "zifan", "success", 1577080460),      new LoginEvent(5, "yqq", "fail", 1577080463)    )).assignAscendingTimestamps(_.eventTime*1000)         val pattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("start").where(_.eventType.equals("fail"))      .next("fail2").where(_.eventType.equals("fail"))      .next("fail3").where(_.eventType.equals("fail"))      .within(Time.seconds(10))         val patternStream: PatternStream[LoginEvent] = CEP.pattern(stream.keyBy(_.name), pattern)         val result: DataStream[String] = patternStream.select(new PatternSelectFunction[LoginEvent, String] {      override def select(map: util.Map[String, util.List[LoginEvent]]): String = {        val keyIter: util.Iterator[String] = map.keySet().iterator()        val e1: LoginEvent = map.get(keyIter.next()).iterator().next()        val e2: LoginEvent = map.get(keyIter.next()).iterator().next()        val e3: LoginEvent = map.get(keyIter.next()).iterator().next()        "用户名:" + e1.name + "登录时间" + ":" + e1.eventTime + ":" + e2.eventTime + ":" + e3.eventTime      }    })    result.print()    streamEvn.execute()  }}

Flink的CEP编程之CEP案例(找到哪些用户名是恶意登录)
  • 收藏
  • 评论
  • *举报

上一篇:Flink的CEP编程之Pattern API

下一篇:Flink性能优化之Flink内存优化

Original: https://blog.51cto.com/u_15704423/5434865
Author: wx62be9d88ce294
Title: Flink的CEP编程之CEP案例(找到哪些用户名是恶意登录)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/516930/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球