Flink Windows in Detail: The Window Aggregation Function AggregateFunction
Author: wx62be9d88ce294 · Category: Big Data
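© Copyright belongs to the author. This is an original work by 51CTO blog author wx62be9d88ce294. Contact the author for permission to repost; otherwise legal action may be pursued.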
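Like ReduceFunction, AggregateFunction is an incremental aggregation function: it computes its result from intermediate state that is updated as each element arrives, rather than buffering all elements of the window. AggregateFunction is the more general of the two for window computations, because its input type, accumulator type, and output type may all differ. That flexibility makes the interface somewhat more complex to implement. AggregateFunction defines four methods to override: createAccumulator() creates the initial accumulator, add() defines how an incoming element is added to the accumulator, getResult() defines how the final result is computed from the accumulator, and merge() defines how two accumulators are combined.

The example below counts log records per base station (sid) over a sliding processing-time window of 8 seconds that advances every 5 seconds. The AggregateFunction keeps a running count, and a WindowFunction attaches the key to the final count.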
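To make the roles of createAccumulator(), add(), getResult() and merge() more concrete, here is a minimal sketch that assumes a made-up task: averaging the numeric field of (key, value) pairs. The class AverageAggregate, the demo object and the sample values are hypothetical and not from the original article; only the AggregateFunction interface and its four methods come from Flink. The function is driven by hand, roughly the way a window operator would call it for each element, so no job needs to run.

```scala
import org.apache.flink.api.common.functions.AggregateFunction

// Hypothetical example: IN = (key, value), ACC = (sum, count), OUT = average
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  // Start every window with an empty (sum, count) pair
  override def createAccumulator(): (Long, Long) = (0L, 0L)

  // Fold one element into the running sum and count
  override def add(value: (String, Long), acc: (Long, Long)): (Long, Long) =
    (acc._1 + value._2, acc._2 + 1L)

  // Turn the accumulator into the final result; an empty accumulator yields 0.0
  override def getResult(acc: (Long, Long)): Double =
    if (acc._2 == 0L) 0.0 else acc._1.toDouble / acc._2

  // Combine two partial accumulators
  override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)
}

object AverageAggregateDemo {
  def main(args: Array[String]): Unit = {
    val agg = new AverageAggregate
    // Made-up sample input for illustration only
    val events = Seq(("station_1", 10L), ("station_1", 20L), ("station_1", 60L))
    // One add() call per element, starting from a fresh accumulator
    val acc = events.foldLeft(agg.createAccumulator())((a, e) => agg.add(e, a))
    println(agg.getResult(acc)) // prints 30.0
  }
}
```

A ReduceFunction could not express this directly, because its input and output types must be the same; here the accumulator is a (sum, count) pair while the result is a Double.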
package window

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class StationLog(sid: String, callOut: String, callInput: String, callType: String, callTime: Long, duration: Long)

object AggregatFunctionTest {

  def main(args: Array[String]): Unit = {
    val environment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._

    // Parse comma-separated lines read from a socket into StationLog records
    val stream: DataStream[StationLog] = environment.socketTextStream("node1", 8888)
      .map(line => {
        val arr: Array[String] = line.split(",")
        new StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
      })

    // Count records per station in an 8-second window that slides every 5 seconds
    stream.map(log => (log.sid, 1))
      .keyBy(_._1)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(8), Time.seconds(5)))
      .aggregate(new MyAggregateFuntion, new MyWindowFunction)
      .print()

    environment.execute()
  }

  // Receives the single pre-aggregated count for the window and attaches the key
  class MyWindowFunction extends WindowFunction[Long, (String, Long), String, TimeWindow] {
    override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[(String, Long)]): Unit = {
      out.collect((key, input.iterator.next()))
    }
  }

  // IN = (sid, 1), ACC = running count, OUT = final count
  class MyAggregateFuntion extends AggregateFunction[(String, Int), Long, Long] {
    override def createAccumulator(): Long = 0
    override def add(value: (String, Int), accumulator: Long): Long = accumulator + value._2
    override def getResult(accumulator: Long): Long = accumulator
    override def merge(a: Long, b: Long): Long = a + b
  }
}
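Because the window size (8 seconds) is larger than the slide (5 seconds), consecutive windows overlap and one element can be counted in more than one window. The sketch below works through that arithmetic, assuming an offset of 0 and non-negative timestamps in milliseconds. It is a dependency-free illustration of the assignment arithmetic, not Flink's own code; windowStarts is a hypothetical helper and the timestamps are made up.

```scala
object SlidingWindowSketch {
  // Start times of all windows [start, start + size) that contain `timestamp`,
  // assuming offset 0 and a non-negative timestamp (all values in milliseconds).
  def windowStarts(timestamp: Long, size: Long, slide: Long): List[Long] = {
    // Latest window start that is not after the timestamp
    val lastStart = timestamp - (timestamp % slide)
    // Step back one slide at a time while the window still covers the timestamp
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > timestamp - size).toList
  }

  def main(args: Array[String]): Unit = {
    val size = 8000L  // window size used in the article's job
    val slide = 5000L // slide used in the article's job
    println(windowStarts(12000L, size, slide)) // List(10000, 5000): two windows
    println(windowStarts(14000L, size, slide)) // List(10000): one window
  }
}
```

An element whose timestamp falls in an overlap is therefore added to two accumulators, so the per-window counts printed by the job do not simply sum to the total number of records.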
Original: https://blog.51cto.com/u_15704423/5434859
Author: wx62be9d88ce294