Flink Windows Explained: The Window Aggregate Function AggregateFunction


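Original article

Author: wx62be9d88ce294; category: Big Data

Tags: flink, scala, apache, ide; categories: Hadoop, Big Data

Copyright belongs to the author: this is an original work by wx62be9d88ce294 on the 51CTO blog. Contact the author for reprint authorization; otherwise legal liability will be pursued.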

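Like ReduceFunction, AggregateFunction is an incremental aggregation function that computes its result from intermediate state, but it is more general for window computations. Its interface is more flexible than ReduceFunction's and correspondingly more complex to implement: ReduceFunction requires the input and the output to have the same type, whereas AggregateFunction[IN, ACC, OUT] lets the input, the accumulator, and the output each have their own type. The interface declares four methods to override: createAccumulator() creates a new, empty accumulator; add() defines how an incoming element is added to the accumulator; getResult() defines how the result is computed from the accumulator; and merge() defines how two accumulators are combined.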
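To make the four-method contract concrete without a Flink cluster, here is a minimal plain-Scala sketch. `AggregateLike`, `AverageDuration`, and `AggregateDriver` are hypothetical names: the trait only mirrors the method shapes of Flink's `AggregateFunction` and is not the Flink interface itself. The aggregate computes an average call duration, so input, accumulator, and output have three different types, which a ReduceFunction cannot express.

```scala
// Hypothetical plain-Scala stand-in that mirrors the four methods of
// Flink's AggregateFunction[IN, ACC, OUT]; it is NOT the Flink interface.
trait AggregateLike[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, acc: ACC): ACC
  def getResult(acc: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

// Hypothetical example: average call duration.
// IN = duration (Long), ACC = (sum, count), OUT = average (Double).
object AverageDuration extends AggregateLike[Long, (Long, Long), Double] {
  def createAccumulator(): (Long, Long) = (0L, 0L)
  def add(value: Long, acc: (Long, Long)): (Long, Long) = (acc._1 + value, acc._2 + 1)
  def getResult(acc: (Long, Long)): Double =
    if (acc._2 == 0) 0.0 else acc._1.toDouble / acc._2
  def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) = (a._1 + b._1, a._2 + b._2)
}

object AggregateDriver {
  // Folds elements into the accumulator one at a time, the way a window
  // operator applies add() incrementally, then calls getResult() once.
  def run[IN, ACC, OUT](f: AggregateLike[IN, ACC, OUT], elems: Seq[IN]): OUT =
    f.getResult(elems.foldLeft(f.createAccumulator())((acc, x) => f.add(x, acc)))

  // Builds two partial accumulators and combines them with merge(),
  // as happens when merging windows (e.g. session windows) are combined.
  def runSplit[IN, ACC, OUT](f: AggregateLike[IN, ACC, OUT], left: Seq[IN], right: Seq[IN]): OUT = {
    val a = left.foldLeft(f.createAccumulator())((acc, x) => f.add(x, acc))
    val b = right.foldLeft(f.createAccumulator())((acc, x) => f.add(x, acc))
    f.getResult(f.merge(a, b))
  }
}
```

For example, `AggregateDriver.run(AverageDuration, Seq(10L, 20L, 30L))` returns `20.0`, and splitting the same input as `runSplit(AverageDuration, Seq(10L, 20L), Seq(30L))` gives the same `20.0`; producing the same result regardless of how the input was split is the property merge() is expected to preserve.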

```scala
package window

import org.apache.flink.api.common.functions.AggregateFunction
// StreamExecutionEnvironment, DataStream and the implicit TypeInformation factory
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// One call record from a base station: station id, caller, callee, call type, call time, duration
case class StationLog(sid: String, callOut: String, callInput: String, callType: String, callTime: Long, duration: Long)

object AggregateFunctionTest {
  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment

    // Each socket line has the form "sid,callOut,callInput,callType,callTime,duration"
    val stream: DataStream[StationLog] = environment.socketTextStream("node1", 8888)
      .map(line => {
        val arr: Array[String] = line.split(",")
        StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })

    // Count calls per base station in 8-second windows that slide every 5 seconds
    stream.map(log => (log.sid, 1))
      .keyBy(_._1)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(8), Time.seconds(5)))
      // Incremental count first, then the window function attaches the key
      .aggregate(new MyAggregateFunction, new MyWindowFunction)
      .print()

    environment.execute()
  }

  // Receives the single pre-aggregated count per key and window and emits (sid, count)
  class MyWindowFunction extends WindowFunction[Long, (String, Long), String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[(String, Long)]): Unit = {
      out.collect((key, input.head))
    }
  }

  // IN = (sid, 1), ACC = running count, OUT = final count
  class MyAggregateFunction extends AggregateFunction[(String, Int), Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(value: (String, Int), accumulator: Long): Long = accumulator + value._2
    override def getResult(accumulator: Long): Long = accumulator
    override def merge(a: Long, b: Long): Long = a + b
  }
}
```
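The job emits one (sid, count) per key and window, and because 8-second windows sliding every 5 seconds overlap, one record can be counted in more than one window. The plain-Scala sketch below (no Flink; `SlidingWindowSketch`, `assignWindows`, and `countPerWindow` are hypothetical names) simulates that assignment. It assumes zero offset and non-negative millisecond timestamps and is modeled on the start-alignment rule of Flink's sliding window assigners; in the real job the timestamps come from the processing-time clock, while the ones used here are made up.

```scala
// Hypothetical simulation (no Flink) of sliding-window assignment,
// assuming zero offset and non-negative timestamps in milliseconds.
object SlidingWindowSketch {
  final case class Window(start: Long, end: Long) // covers [start, end)

  def assignWindows(ts: Long, size: Long, slide: Long): List[Window] = {
    val lastStart = ts - (ts % slide) // latest slide-aligned start <= ts
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start > ts - size) // keep windows that still cover ts
      .map(start => Window(start, start + size))
      .toList
  }

  // Counts records per (sid, window), mirroring map((sid, 1)) -> keyBy ->
  // window -> aggregate from the job above, on hypothetical (sid, ts) input.
  def countPerWindow(records: Seq[(String, Long)], size: Long, slide: Long): Map[(String, Window), Long] =
    records
      .flatMap { case (sid, ts) => assignWindows(ts, size, slide).map(w => (sid, w)) }
      .groupBy(identity)
      .map { case (k, hits) => k -> hits.size.toLong }
}
```

With size 8000 and slide 5000, a record at ts = 7000 falls into [5000, 13000) and [0, 8000), while one at ts = 9000 falls only into [5000, 13000). Because the size is less than twice the slide, every record lands in either one or two windows: two when ts mod 5000 < 3000, one otherwise.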



Original: https://blog.51cto.com/u_15704423/5434859
Author: wx62be9d88ce294
