A Comprehensive Flink Tutorial (Personal Summary)

1.1 Environment

Create an execution environment, which represents the context of the current program. If the program is invoked standalone, this method returns a local execution environment; if the program is invoked from a command-line client to be submitted to a cluster, it returns the execution environment of that cluster. In other words, getExecutionEnvironment decides which environment to return based on how the program is run, and it is the most common way to create an execution environment.


// batch (DataSet API) execution environment
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

// streaming (DataStream API) execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

If no parallelism is set explicitly, the value configured in flink-conf.yaml is used; the default is 1.
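
A minimal sketch (the parallelism values below are chosen only for illustration): parallelism can also be set explicitly, either for the whole job on the environment or per operator.

import org.apache.flink.streaming.api.scala._

object ParallelismDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // job-wide default parallelism
    env.setParallelism(4)

    env.fromElements("hadoop spark", "hive hbase")
      .flatMap(_.split(" "))
      // per-operator parallelism overrides the job-wide default
      .map((_, 1)).setParallelism(2)
      .print()

    env.execute("ParallelismDemo")
  }
}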

1.2 Source

Generate a collection in local memory and use it as the source for Flink.
The batch (offline) code is as follows:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object ListSource {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive hbase"))
    listDataSet.print()
  }
}

The streaming (real-time) code is as follows:

import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object ListSourceStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val listDataStream: DataStream[String] = env.fromCollection(List("hadoop spark","hive hbase"))
    listDataStream.print()
    env.execute("ListSourceStream is runned")
  }
}

Read a local text file as the data source.
The batch code is as follows:

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object FileSource {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val fileDataSet = env.readTextFile("C:\\Users\\thinkpad\\Desktop\\words.txt")
    fileDataSet.print()
  }
}

The streaming code is as follows:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FileSourceStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val fileDataStream: DataStream[String] = env.readTextFile("C:\\Users\\thinkpad\\Desktop\\words.txt")
    fileDataStream.print()
    env.execute("FileSourceStream is runned")
  }
}

Read a file from HDFS as the data source.
The batch code is as follows:

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

object hdfsSource {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val hdfsDataSet: DataSet[String] = env.readTextFile("hdfs://linux01:9000/a.txt")
    hdfsDataSet.print()
  }
}

The streaming code is as follows:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
object hdfsSourceStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val hdfsDataStream: DataStream[String] = env.readTextFile("hdfs://linux01:9000/a.txt")
    hdfsDataStream.print()
    env.execute("hdfsSourceStream is runned")
  }
}

Use Kafka as the data source. The streaming code is as follows:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.api.scala._

object kafkaSourceStream {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties()
    props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
    props.setProperty("group.id", "consumer-group")
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.setProperty("auto.offset.reset", "latest")

    val kafkaDataStream: DataStream[String] =
env.addSource(new FlinkKafkaConsumer010[String]("test",new SimpleStringSchema(),props))
    kafkaDataStream.print()
    env.execute("kafkaSourceStream is runned")
  }
}

Besides the sources above, we can also define a custom source; we only need to implement SourceFunction.
The custom source code is as follows:

import org.apache.flink.streaming.api.functions.source.SourceFunction

class MySource extends SourceFunction[String] {

  var running = true

  override def cancel(): Unit = {
    running = false
  }

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    val data: Range.Inclusive = 1.to(10)
    // keep emitting 1..10 until cancel() flips the flag
    while (running) {
      data.foreach(t => {
        sourceContext.collect(t.toString)
      })
      // pause between rounds so the demo output stays readable
      Thread.sleep(1000)
    }
  }
}

The code that uses the custom source is as follows:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object DefineSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val defineSource: DataStream[String] = env.addSource(new MySource())
    defineSource.print()
    env.execute("DefineSource is runned")
  }
}

1.3 Sink

A sink is where Flink writes its data once the job has finished processing it.

1.3.1 Collection-based sink
Write the final result into an in-memory collection.
Example code:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object listSink {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark","hive"))
    val list: Seq[String] = listDataSet.collect()
    list.foreach(println(_))
  }
}

1.3.2 Local file-based sink
Write the result to the local file system.
Example code:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object fileSink {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val fileDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark"))
    fileDataSet.writeAsText("C:\\Users\\thinkpad\\Desktop\\print.txt")
    env.execute("fileSink is runned")
  }
}

1.3.3 HDFS-based sink
Write the result to HDFS.
Example code:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object hdfsSink {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val hdfsDataSet: DataSet[String] = env.fromCollection(List("hadoop","spark"))
    hdfsDataSet.writeAsText("hdfs://linux01:9000/hdfsSink")
    env.execute("hdfsSink is runned")
  }
}

1.3.4 Kafka-based sink
Write the result to Kafka, with Flink acting as a Kafka producer.
Example code:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010

object kafkaSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // producer configuration: only the broker list is needed here
    val props = new Properties()
    props.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092")
    val listDataStream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
    // write every element to the "test" topic
    listDataStream.addSink(new FlinkKafkaProducer010[String]("test", new SimpleStringSchema(), props))
    env.execute("kafkaSink")
  }
}

1.3.5 MySQL-based sink
Write the computed result to a relational database such as MySQL.
Add the dependency:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

Implement a MyJdbcSink class that extends RichSinkFunction; it defines the statement used to save data to MySQL.

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

class MyJdbcSink extends RichSinkFunction[String] {

  var conn: Connection = _
  var prepare: PreparedStatement = _

  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata",
      "root", "root")
    prepare= conn.prepareStatement("INSERT INTO infoTest VALUES (?, ?)")
  }

  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    // demo: the infoTest table has two string columns; both receive the same value here
    prepare.setString(1, value)
    prepare.setString(2, value)
    prepare.execute()
  }

  override def close(): Unit = {
    prepare.close()
    conn.close()
  }
}

Write the result to MySQL by using the custom sink class; the code is as follows:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

object mysqlSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val listDataStream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
    listDataStream.addSink(new MyJdbcSink())
    env.execute("mysqlSink")
  }
}

1.3.6 Redis-based sink
Write the computed result to Redis.
Add the flink-redis dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.0</version>
</dependency>

Define a Redis mapper class that extends RedisMapper; it specifies the command used when writing to Redis. The code is as follows:

import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

class MyRedisMapper extends RedisMapper[String]{

  override def getCommandDescription: RedisCommandDescription = {
    new RedisCommandDescription(RedisCommand.HSET,"redis")
  }

  override def getKeyFromData(t: String): String = {
    t.hashCode.toString
  }

  override def getValueFromData(t: String): String = {
    t
  }
}

The code that writes the result to Redis is as follows:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig

object RedisSinkDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val listDataStream: DataStream[String] = env.fromCollection(List("hadoop","spark"))
    val conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).setDatabase(0).build()
    listDataStream.addSink(new RedisSink[String](conf, new MyRedisMapper))
    env.execute("RedisSinkDemo")
  }
}

1.4 Transform

Flink has a class of transformation operators similar to Spark's. In a Flink program, after obtaining the data source we apply a series of transformations and finally write the result to a sink so that the data lands somewhere.

The commonly used transformation operators are as follows:

  • map — transforms each element of a DataSet into another element
  • flatMap — transforms each element of a DataSet into 0…n elements
  • mapPartition — transforms the elements of one partition at a time
  • filter — keeps only the elements that satisfy a condition
  • reduce — aggregates a DataSet or a group into a single element
  • reduceGroup — aggregates a DataSet or a group into one or more elements
  • aggregate — aggregates with a built-in function such as SUM/MIN/MAX…
  • distinct — removes duplicates
  • join — joins two DataSets on a condition into a new DataSet
  • union — takes the union of two DataSets (duplicates are not removed automatically)
  • keyBy — logically partitions a stream into disjoint partitions, each containing the elements with the same key; implemented internally with hash partitioning
  • split — splits one DataStream into two or more streams according to some criteria
  • select — obtains one or more DataStreams from a SplitStream
  • connect — connects two streams while keeping their types; the two streams are merely placed inside the same stream, keep their own data and form, and remain independent of each other
  • coMap, coFlatMap — like map and flatMap, but applied to ConnectedStreams
  • rebalance — redistributes data evenly across partitions to avoid data skew
  • partitionByHash — hash-partitions by the specified key
  • sortPartition — sorts the data within each partition by the specified field

map transforms each element of the DataSet into an element of another form.
Example code:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[Int] = env.fromCollection(List(1,2,3))
    val result: DataSet[Int] = listDataSet.map(_*2)
    result.print()
  }
}

flatMap iterates over the elements as well, but splits each element by a given delimiter into multiple elements, for example splitting every element of the collection by spaces.

Example code:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive mysql","hbase kafka"))
    val result: DataSet[String] = listDataSet.flatMap(_.split(" "))
    result.print()
  }
}

mapPartition: the function runs once per partition.

map: the function runs once per element.

mapPartition processes the data partition by partition: the function receives an iterator over the partition's elements and transforms them. The result is the same as with map, but when the function has to access external storage (for example opening a MySQL connection), map is inefficient because every element pays that cost, whereas mapPartition can open one connection per partition and reuse it, which greatly reduces the number of connections and improves efficiency.

Example code:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive mysql","hbase kafka"))
    val result: DataSet[String] = listDataSet.mapPartition(iter => {
      iter.flatMap(_.split(" "))
    })
    result.print()
  }
}
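
The connection-reuse point above can be illustrated with a minimal sketch (the JDBC URL, table and query here are placeholders, not taken from the original article): one connection is opened per partition and reused for every element in it.

import java.sql.DriverManager

import org.apache.flink.api.scala._

object MapPartitionJdbcDemo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ids: DataSet[Int] = env.fromCollection(List(1, 2, 3))
    val enriched: DataSet[String] = ids.mapPartition(iter => {
      // one connection per partition instead of one per element
      val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root")
      val stmt = conn.prepareStatement("SELECT name FROM infoTest WHERE id = ?")
      // materialize the results before closing the connection
      val result = iter.map(id => {
        stmt.setInt(1, id)
        val rs = stmt.executeQuery()
        if (rs.next()) rs.getString(1) else "unknown"
      }).toList
      stmt.close()
      conn.close()
      result
    })
    enriched.print()
  }
}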

filter iterates over every element of the DataSet, keeping the elements for which the predicate is true and discarding the rest.

Example code:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[String] = env.fromCollection(List("hadoop spark","hive","hbase kafka"))
    val result: DataSet[String] = listDataSet.filter(_.length>=5)
    result.print()
  }
}

reduce aggregates a DataSet or a group, combining elements according to the given expression until a single element remains.

Example code:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

object Transform {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val listDataSet: DataSet[Int] = env.fromCollection(List(1,2,3,4))
    val result = listDataSet.reduce(_+_)
    result.print()
  }
}
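
The remaining DataSet operators from the table above follow the same pattern; a minimal sketch of distinct, union and join (the sample data is made up for illustration):

import org.apache.flink.api.scala._

object MoreTransforms {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val left: DataSet[(Int, String)] = env.fromCollection(List((1, "hadoop"), (2, "spark"), (2, "spark")))
    val right: DataSet[(Int, String)] = env.fromCollection(List((1, "batch"), (2, "streaming")))

    // distinct: remove duplicate elements
    left.distinct().print()

    // union: concatenate two DataSets of the same type (no deduplication)
    left.union(right).print()

    // join: connect the two DataSets on the first tuple field
    left.join(right).where(0).equalTo(0).print()
  }
}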

2.1 Window Overview

Streaming computation is a data-processing engine designed for infinite data sets, i.e. data sets that grow continuously and are essentially unbounded; a window is a means of cutting the infinite data into finite blocks for processing.
Windows are at the heart of processing unbounded streams: a window splits an infinite stream into finite-size "buckets" on which computations can be performed.

2.2 Window Types

Windows fall into two categories: CountWindow, which forms a window from a specified number of elements and is independent of time; and TimeWindow, which forms windows based on time.

A CountWindow fires based on the number of elements that share the same key; when it fires, only the keys whose element count has reached the window size are evaluated.

Note: the window_size of a CountWindow is the number of elements with the same key, not the total number of input elements.

The default CountWindow is a tumbling window: only the window size needs to be given, and the window fires as soon as the number of elements with the same key reaches that size.

import org.apache.flink.streaming.api.scala._

object Windows {
    def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val file: DataStream[String] = env.socketTextStream("node01",9999)
        val countStream: DataStream[(String, Int)] = file.flatMap(_.split(" "))
            .map((_, 1))
            .keyBy(0)
            .countWindow(2)
            .sum(1)
        countStream.print()
        env.execute("Windows is runned")
    }
}

The sliding count window uses exactly the same function name as the tumbling one; the only difference is that two parameters are passed: a window_size and a sliding_size.

In the code below sliding_size is set to 2, i.e. a computation is triggered for every two elements with the same key, and each computation covers a window of 5 elements.

import org.apache.flink.streaming.api.scala._

object Windows {
    def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        val file: DataStream[String] = env.socketTextStream("node01",9999)
        val countStream: DataStream[(String, Int)] = file.flatMap(_.split(" "))
            .map((_, 1))
            .keyBy(0)
            .countWindow(5,2)
            .sum(1)
        countStream.print()
        env.execute("Windows is runned")
    }
}

According to how windows are assigned, TimeWindow comes in three types (a usage sketch follows this list):

  • Tumbling Window

Slices the data into windows of a fixed length.
Characteristics: time-aligned, fixed window length, no overlap; every element falls into exactly one window.
The tumbling window assigner assigns each element to a window of the specified size; tumbling windows have a fixed size and do not overlap. For example, with a 5-minute tumbling window, a new window starts every 5 minutes and each element belongs to exactly one of them.

Typical use: BI-style statistics (aggregations per time period).

  • Sliding Window

The sliding window is a more general form of the fixed window; it is defined by a fixed window length plus a slide interval: the window length is how much time one computation covers, and the slide interval is how far the window moves for each computation.
Characteristics: time-aligned, fixed window length, overlaps possible; one element can be counted several times, and the window length is a multiple of the slide interval.
The sliding window assigner assigns elements to windows of fixed length. As with tumbling windows, the window size is set by a size parameter; an additional slide parameter controls how frequently a new window starts. If the slide is smaller than the window size, windows overlap and an element is assigned to several windows.
For example, with a 10-minute window and a 5-minute slide, a new window is produced every 5 minutes and it contains the data of the last 10 minutes.

  • Session Window

E-commerce site: after logging in, the session expires if the user is inactive for some period of time.

Mobile banking: after logging in, the session expires after a period of inactivity and the user must log in again.

A session window is made up of a series of events followed by a timeout gap of a specified length, similar to a session in a web application: a period without new data closes the current window.

Characteristics: no time alignment; how long the inactive period lasts is driven by the data, not fixed in advance.

The session window assigner groups elements by session activity. Compared with tumbling and sliding windows, session windows do not overlap and have no fixed start and end time: when no elements arrive for a fixed period, i.e. a gap of inactivity occurs, the window closes. A session window is configured with a session gap that defines the length of the inactive period; when such a gap occurs, the current session closes and subsequent elements are assigned to a new session window.
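
A minimal sketch of the three time-window types on a keyed word-count stream (host, port and the window sizes are placeholders; processing time is used to keep the example short):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

object TimeWindows {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val words = env.socketTextStream("node01", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)

    // tumbling time window: fixed 5-second windows, no overlap
    val tumbling = words.timeWindow(Time.seconds(5)).sum(1)

    // sliding time window: 10-second windows, sliding every 5 seconds
    val sliding = words.timeWindow(Time.seconds(10), Time.seconds(5)).sum(1)

    // session window: closes after 5 seconds without new data for a key
    val session = words.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).sum(1)

    tumbling.print("tumbling")
    sliding.print("sliding")
    session.print("session")

    env.execute("TimeWindows")
  }
}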

2.3 Window Function

A window function defines the computation to perform on the data collected in a window. There are two main kinds:

Incremental aggregation functions: the computation is carried out as each element arrives, keeping only a small piece of state. Typical examples are ReduceFunction and AggregateFunction.

Full window functions: all the data of the window is collected first and iterated over when it is time to compute. ProcessWindowFunction is such a full window function.
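
A minimal sketch contrasting the two kinds (host and port are placeholders): the reduce is evaluated incrementally per element, while the ProcessWindowFunction receives all elements of the window when it fires.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object WindowFunctions {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val keyed = env.socketTextStream("node01", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)

    // incremental aggregation: the state is just the running sum per key
    keyed.timeWindow(Time.seconds(5))
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print("reduce")

    // full window function: the window's elements are buffered and iterated at fire time
    keyed.timeWindow(Time.seconds(5))
      .process(new ProcessWindowFunction[(String, Int), String, String, TimeWindow] {
        override def process(key: String, context: Context,
                             elements: Iterable[(String, Int)], out: Collector[String]): Unit = {
          out.collect(s"key=$key count=${elements.size} window=[${context.window.getStart}, ${context.window.getEnd})")
        }
      })
      .print("process")

    env.execute("WindowFunctions")
  }
}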

2.4 Other Window Operations

  • trigger() — a trigger defines when a window closes, fires and emits its result
  • evictor() — an evictor defines logic for removing certain elements from the window
  • allowedLateness() — allows late data to still be processed
  • sideOutputLateData() — puts data that is too late into a side output stream
  • getSideOutput() — retrieves that side output stream
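
A minimal sketch wiring these operations together on an event-time window (the input format, lateness bounds and names are placeholders):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WindowLateness {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // records later than window end + allowed lateness go to this side output
    val late = OutputTag[(String, Int)]("late-data")

    // input lines of the form "word,timestampMillis"
    val counts = env.socketTextStream("node01", 9999)
      .map(line => {
        val fields = line.split(",")
        (fields(0), 1, fields(1).toLong)
      })
      .assignAscendingTimestamps(_._3)
      .map(t => (t._1, t._2))
      .keyBy(_._1)
      .timeWindow(Time.seconds(5))
      .allowedLateness(Time.seconds(2))   // keep updating the result for 2 s of lateness
      .sideOutputLateData(late)           // anything later than that goes to the side output
      // .trigger(...) and .evictor(...) could also be chained here to customize firing and eviction
      .sum(1)

    counts.print("on time")
    counts.getSideOutput(late).print("late")  // retrieve the side output stream

    env.execute("WindowLateness")
  }
}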

3.1 Overview

The Table API is a relational API shared by stream and batch processing: a Table API query can run on streaming or batch input without any modification. The Table API is a superset of SQL designed specifically for Apache Flink, and it is a language-integrated API for Scala and Java. Unlike regular SQL, where queries are specified as strings, Table API queries are defined in an embedded style in Java or Scala, with IDE support such as auto-completion and syntax checking, and they compose relational operators such as select, filter and join in a very intuitive way. Flink's SQL support is based on Apache Calcite, which implements the SQL standard. Whether the input is batch (DataSet) or streaming (DataStream), a query specified in either API has the same semantics and produces the same result.

3.2 Table API


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <version>1.9.1</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>1.9.1</version>
</dependency>

TableEnvironment is the core concept of the Table API and SQL integration. It is responsible for:

  • registering tables in the internal catalog
  • registering external catalogs
  • executing SQL queries
  • registering user-defined functions
  • converting a DataStream or DataSet into a Table
  • holding a reference to an ExecutionEnvironment or StreamExecutionEnvironment

A Table is always bound to a specific TableEnvironment. Tables from different TableEnvironments cannot be combined in the same query (for example with union or join).

Creating a TableEnvironment:


import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}

// streaming Table environment
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
val sTableEnv = StreamTableEnvironment.create(sEnv)

// batch Table environment
val bEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val bTableEnv: BatchTableEnvironment = BatchTableEnvironment.create(bEnv)

Data is usually loaded in one of two ways: from a DataStream/DataSet, or from a TableSource; the latter was deprecated in Flink 1.11, so it is not recommended.

import org.apache.flink.api.scala._
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row

case class Student(id: Int, name: String, age: Int, gender: String, course: String, score: Int)

object FlinkBatchTableOps {
    def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val bTEnv = BatchTableEnvironment.create(env)
        // the csv file has a header line and uses '|' as the field delimiter
        val dataSets: DataSet[Student] = env.readCsvFile[Student]("E:\\data\\student.csv",
            ignoreFirstLine = true,
            fieldDelimiter = "|")

        val table: Table = bTEnv.fromDataSet(dataSets)

        // Table API query: name and age of the 25-year-old students
        val result: Table = table.select("name,age").where("age=25")

        bTEnv.toDataSet[Row](result).print()
    }
}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.types.Row

case class Goods(id: Int, brand: String, category: String)

object FlinkStreamTableOps {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val sTEnv = StreamTableEnvironment.create(env)
        val dataStream: DataStream[Goods] = env.fromElements(
            "001|mi|mobile",
            "002|mi|mobile",
            "003|mi|mobile",
            "004|mi|mobile",
            "005|huawei|mobile",
            "006|huawei|mobile",
            "007|huawei|mobile",
            "008|Oppo|mobile",
            "009|Oppo|mobile",
            "010|uniqlo|clothing",
            "011|uniqlo|clothing",
            "012|uniqlo|clothing",
            "013|uniqlo|clothing",
            "014|uniqlo|clothing",
            "015|selected|clothing",
            "016|selected|clothing",
            "017|selected|clothing",
            "018|Armani|clothing",
            "019|lining|sports",
            "020|nike|sports",
            "021|adidas|sports",
            "022|nike|sports",
            "023|anta|sports",
            "024|lining|sports"
        ).map(line => {
            val fields = line.split("\\|")
            // ids like "001" are parsed to Int to match the case class
            Goods(fields(0).toInt, fields(1), fields(2))
        })

        var table = sTEnv.fromDataStream(dataStream)

        table.printSchema()

        table = table.select("category").distinct()

        sTEnv.toRetractStream[Row](table).print()
        env.execute("FlinkStreamTableOps")
    }
}
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row

object FlinkSQLOps {
    def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val sTEnv = BatchTableEnvironment.create(env)
        val dataSet: DataSet[Goods] = env.fromElements(
            "001|mi|mobile",
            "002|mi|mobile",
            "003|mi|mobile",
            "004|mi|mobile",
            "005|huawei|mobile",
            "006|huawei|mobile",
            "007|huawei|mobile",
            "008|Oppo|mobile",
            "009|Oppo|mobile",
            "010|uniqlo|clothing",
            "011|uniqlo|clothing",
            "012|uniqlo|clothing",
            "013|uniqlo|clothing",
            "014|uniqlo|clothing",
            "015|selected|clothing",
            "016|selected|clothing",
            "017|selected|clothing",
            "018|Armani|clothing",
            "019|lining|sports",
            "020|nike|sports",
            "021|adidas|sports",
            "022|nike|sports",
            "023|anta|sports",
            "024|lining|sports"
        ).map(line => {
            val fields = line.split("\\|")
            Goods(fields(0).toInt, fields(1), fields(2))
        })

        sTEnv.registerTable("goods", dataStream)

        // a simple projection query
        var sql =
            """
              |select
              |   id,
              |   brand,
              |   category
              |from goods
              |""".stripMargin
        // then replaced by an aggregation: goods count per category, descending
        sql =
            """
              |select
              |   category,
              |   count(1) counts
              |from goods
              |group by category
              |order by counts desc
              |""".stripMargin
        val table = sTEnv.sqlQuery(sql)
        sTEnv.toDataSet[Row](table).print()
    }
}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.Table
import org.apache.flink.types.Row

object FlinkTumblingWindowTableOps {
    def main(args: Array[String]): Unit = {

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        val tblEnv = StreamTableEnvironment.create(env)

        val ds = env.socketTextStream("node01", 9999)
                .map(line => {
                    val fields = line.split("\t")
                    UserLogin(fields(0), fields(1), fields(2), fields(3).toInt, fields(4))
                })
                .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor[UserLogin](Time.seconds(2)) {
                        override def extractTimestamp(userLogin: UserLogin): Long = {
                            userLogin.dataUnix * 1000
                        }
                    }
                )

        import org.apache.flink.table.api.scala._
        val table: Table = tblEnv.fromDataStream[UserLogin](ds , 'platform, 'server, 'status, 'ts.rowtime)

        tblEnv.sqlQuery(
            s"""
               |select
               |platform,
               |count(1) counts
               |from ${table}
               |where status = 'LOGIN'
               |group by platform, tumble(ts,interval '2' second)
               |""".stripMargin)
            .toAppendStream[Row]
            .print("每隔2秒不同平台登录用户->")
        env.execute()
    }
}

case class UserLogin(platform: String, server: String, uid: String,  dataUnix: Int, status: String)
object FlinkTumblingWindowTableOps2 {
    def main(args: Array[String]): Unit = {

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        val tblEnv = StreamTableEnvironment.create(env)

        val ds = env.socketTextStream("node01", 9999)
                .map(line => {
                    val fields = line.split("\t")
                    UserLogin(fields(0), fields(1), fields(2), fields(3).toInt, fields(4))
                })

        import org.apache.flink.table.api.scala._
        val table: Table = tblEnv.fromDataStream[UserLogin](ds , 'platform, 'server, 'status, 'ts.proctime)

        tblEnv.sqlQuery(
            s"""
               |select
               |  platform,
               |  count(1) counts
               |from ${table}
               |where status = 'LOGIN'
               |group by platform, tumble(ts,interval '2' second)
               |""".stripMargin)
            .toAppendStream[Row]
            .print("prcotime-每隔2秒不同平台登录用户->")
        env.execute()
    }
}

3.3 Flink Table UDF

User Defined Scalar Function: one row in, one row out.

A user browses a product at some moment, together with the product's price:

{"userID": 2, "eventTime": "2020-10-01 10:02:00", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:02", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:10", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:12", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99
{"userID": 2, "eventTime": "2020-10-01 10:02:06", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:15", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
{"userID": 1, "eventTime": "2020-10-01 10:02:16", "eventType": "browse", "productID": "product_5", "productPrice": 20.99}
  • UDF for time conversion
  • A UDF extends the ScalarFunction abstract class and implements an eval method.
  • The custom UDF below converts eventTime into a timestamp.
import java.text.SimpleDateFormat

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
// JSON parsing below assumes the org.json library is on the classpath
import org.json.JSONObject

object FlinkTableUDFOps {
    def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val bTEnv = BatchTableEnvironment.create(env)
        val ds = env.fromElements(
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:00\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:02\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:10\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:12\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 2, \"eventTime\": \"2020-10-01 10:02:06\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:15\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}",
            "{\"userID\": 1, \"eventTime\": \"2020-10-01 10:02:16\", \"eventType\": \"browse\", \"productID\": \"product_5\", \"productPrice\": 20.99}"
        ).map(line => {
            val jsonObj = new JSONObject(line)
            val userID = jsonObj.getInt("userID")
            val eventTime = jsonObj.getString("eventTime")
            val eventType = jsonObj.getString("eventType")
            val productID = jsonObj.getString("productID")
            val productPrice = jsonObj.getDouble("productPrice")
            UserBrowseLog(userID, eventTime, eventType, productID, productPrice)
        })

        bTEnv.registerFunction("to_time", new TimeScalarFunction())
        bTEnv.registerFunction("myLen", new LenScalarFunction())
        val table = bTEnv.fromDataSet(ds)
        val sql =
            s"""
              |select
              |  userID,
              |  eventTime,
              |  myLen(eventTime) my_len_et,
              |  to_time(eventTime) timestamps
              |from ${table}
              |""".stripMargin
        val ret = bTEnv.sqlQuery(sql)

        bTEnv.toDataSet[Row](ret).print
    }
}
case class UserBrowseLog(
    userID: Int,
    eventTime: String,
    eventType: String,
    productID: String,
    productPrice: Double
)

class TimeScalarFunction extends ScalarFunction {

    private val df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    def eval(eventTime: String): Long = {
        df.parse(eventTime).getTime
    }
}

class LenScalarFunction extends ScalarFunction {

    def eval(str: String): Int = {
        str.length
    }
}

Original: https://blog.csdn.net/MoLeft/article/details/124613097
Author: Moleft
Title: Flink最全面教程(自己总结的)
