Flink 1.14 Study Notes: Writing Data to Hive & HDFS (Part 2)



Receiving Kafka data and writing it to Hive (approach 1)

Notes

Message structure (JSON format):
{"name":"Fznjui","age":16,"gender":"女"}
Kafka table definition

The Kafka table is configured with the json format, and its schema mirrors the message structure, so each received message can be converted into columns directly.
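For reference, the same source table could also be declared with SQL DDL instead of the TableDescriptor used in the full program below. This is only an illustrative sketch, assuming the default dialect and reusing the topic, brokers, and group id from the example:

tableEnv.executeSql(
  """
    |create temporary table kafkaTable (
    |  name string,
    |  age int,
    |  gender string
    |) with (
    |  'connector' = 'kafka',
    |  'topic' = 'ly_test',
    |  'properties.bootstrap.servers' = '192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092',
    |  'properties.group.id' = 'KafkaToHiveTest1',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'json'
    |)
    |""".stripMargin)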

Custom UDF

Hive does not recognize Chinese characters in partition directory names, so a small conversion function is used to map the Chinese values to ASCII partition values.
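The full program below calls the function inline through the Table API. As a sketch of the alternative, the same ScalarFunction could instead be registered under a name (the name genderConvert here is made up for illustration) and called from SQL:

// register the scalar UDF defined at the bottom of the full example
tableEnv.createTemporarySystemFunction("genderConvert", classOf[GenderConvert])
// then call it from SQL, e.g. to preview the converted partition value
tableEnv.executeSql("select name, age, gender, genderConvert(gender) as sex from kafkaTable").print()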

Complete test code

import cn.hutool.core.io.resource.ResourceUtil
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.{$, currentTimestamp, dateFormat}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{ApiExpression, DataTypes, FieldExpression, Schema, SqlDialect, TableDescriptor, call}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.functions.ScalarFunction

import java.util.concurrent.TimeUnit

object KafkaToHiveTest1 {

  private val sourceTopic = "ly_test"
  private val kafkaServers = "192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092"

  def main(args: Array[String]): Unit = {

    val parameterTool = ParameterTool.fromArgs(args)

    val hiveConfigDir = parameterTool.get("hiveCfgDir", ResourceUtil.getResource("hive/conf").getPath)

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Checkpointing is required: the streaming Hive/filesystem sink only commits files and partitions on completed checkpoints.
    env.enableCheckpointing(TimeUnit.SECONDS.toMillis(1), CheckpointingMode.EXACTLY_ONCE)

    val tableEnv = StreamTableEnvironment.create(env)

    // Register a HiveCatalog pointing at the ly_test database and make it the current catalog.
    val catalogName = "lyTest"
    val hiveCatalog = new HiveCatalog(catalogName, "ly_test", hiveConfigDir)
    tableEnv.registerCatalog(catalogName, hiveCatalog)

    tableEnv.useCatalog(catalogName)

    // Kafka source table: json format, with a schema identical to the message structure.
    val sourceTable = "kafkaTable"
    tableEnv.createTemporaryTable(sourceTable, TableDescriptor
      .forConnector("kafka")
      .schema(Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .column("gender", DataTypes.STRING())
        .build())
      .option("topic", sourceTopic)
      .option("properties.bootstrap.servers", kafkaServers)
      .option("properties.group.id", "KafkaToHiveTest1")
      .option("scan.startup.mode", "latest-offset")
      .option("format", "json")
      .build())

    // Switch to the Hive dialect so the DDL below is parsed as HiveQL.
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

    val kafka_sink_hive = "kafka_sink_hive"

    // Hive sink table. '$$' escapes '$' in the interpolated string, so Hive receives the
    // partition-time-extractor pattern '$ymd 00:00:00'; committed partitions are added to the metastore.
    tableEnv.executeSql(
      s"""
         |create table if not exists $kafka_sink_hive (
         | name string,
         | age int,
         | gender string,
         | sink_date_time timestamp(9)
         |) partitioned by (ymd string,sex string)
         |  stored as parquet
         |  tblproperties(
         |    'partition.time-extractor.timestamp-pattern' = '$$ymd 00:00:00',
         |    'sink.partition-commit.policy.kind' = 'metastore'
         |  )
         |""".stripMargin)

    // Back to the default dialect for the Table API pipeline below.
    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    // Add the current timestamp, derive the ymd partition value from it,
    // and map the Chinese gender value to an ASCII partition value via the UDF.
    tableEnv.from(sourceTable)
      .addColumns(currentTimestamp() as "now")
      .select($"name", $"age", $"gender", $"now",
        dateFormat($("now"), "yyyy-MM-dd") as "ymd",
        call(classOf[GenderConvert], $("gender")).asInstanceOf[ApiExpression] as "sex")
      .executeInsert(kafka_sink_hive)
  }

  // Scalar UDF: maps the Chinese gender value to an ASCII value usable as a partition directory name.
  class GenderConvert extends ScalarFunction {

    def eval(gender: String): String = {
      gender match {
        case "男" => "boy"
        case "女" => "girl"
        case _ => "other"
      }
    }
  }

}

Run results


View the partitions


View the corresponding HDFS directory


Receiving Kafka data and writing it to Hive (approach 2)

The exercise was essentially finished in the previous step; this part mainly tests a custom UDTF (table-valued function). The goal is to parse the raw message with a custom UDTF and then write the result to Hive. (How many ways are there to write the character 茴, after all?)
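For comparison, such a UDTF could also be registered and invoked from SQL with a lateral join instead of the Table API joinLateral used below; this is only a sketch, and the registration name parseJson is made up for illustration:

// ParseJsonConvert is the table function defined at the bottom of the full example
tableEnv.createTemporarySystemFunction("parseJson", classOf[ParseJsonConvert])
tableEnv.executeSql(
  """
    |select t.name, t.age, t.gender, t.`now`, t.sex
    |from kafkaTable, lateral table(parseJson(message)) as t(name, age, gender, `now`, sex)
    |""".stripMargin).print()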

Complete test code

import cn.hutool.core.date.DateUtil
import cn.hutool.core.io.resource.ResourceUtil
import cn.hutool.json.JSONUtil
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.api.Expressions.{$, dateFormat}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, FieldExpression, Schema, SqlDialect, TableDescriptor, call}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

import java.util.Date
import java.util.concurrent.TimeUnit

object KafkaToHiveTest2 {

  private val sourceTopic = "ly_test"
  private val kafkaServers = "192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092"

  def main(args: Array[String]): Unit = {

    val parameterTool = ParameterTool.fromArgs(args)

    val hiveConfigDir = parameterTool.get("hiveCfgDir", ResourceUtil.getResource("hive/conf").getPath)

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(TimeUnit.SECONDS.toMillis(1), CheckpointingMode.EXACTLY_ONCE)

    val tableEnv = StreamTableEnvironment.create(env)

    val catalogName = "lyTest"
    val hiveCatalog = new HiveCatalog(catalogName, "ly_test", hiveConfigDir)
    tableEnv.registerCatalog(catalogName, hiveCatalog)

    tableEnv.useCatalog(catalogName)

    // Kafka source table: the whole record is read as one raw string column and parsed later by the UDTF.
    val sourceTable = "kafkaTable"
    tableEnv.createTemporaryTable(sourceTable, TableDescriptor
      .forConnector("kafka")
      .schema(Schema.newBuilder()
        .column("message", DataTypes.STRING())
        .build())
      .option("topic", sourceTopic)
      .option("properties.bootstrap.servers", kafkaServers)
      .option("properties.group.id", "KafkaToHiveTest2")
      .option("scan.startup.mode", "latest-offset")
      .option("format", "raw")
      .build())

    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)

    val kafka_sink_hive = "kafka_sink_hive"

    tableEnv.executeSql(
      s"""
         |create table if not exists $kafka_sink_hive (
         | name string,
         | age int,
         | gender string,
         | sink_date_time timestamp(9)
         |) partitioned by (ymd string,sex string)
         |  stored as parquet
         |  tblproperties(
         |    'partition.time-extractor.timestamp-pattern' = '$$ymd 00:00:00',
         |    'sink.partition-commit.policy.kind' = 'metastore'
         |  )
         |""".stripMargin)

    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    // Lateral-join each raw message with the UDTF output, then derive the ymd partition value.
    tableEnv.from(sourceTable)
      .joinLateral(call(classOf[ParseJsonConvert], $("message")))
      .select($"name", $"age", $"gender", $"now",
        dateFormat($("now"), "yyyy-MM-dd") as "ymd", $"sex")
      .executeInsert(kafka_sink_hive)
  }

  // UDTF: the output row type is declared explicitly so the lateral join exposes named, typed columns.
  @FunctionHint(output = new DataTypeHint("ROW<name STRING, age INT, gender STRING, `now` TIMESTAMP(9), sex STRING>"))
  class ParseJsonConvert extends TableFunction[Row] {

    def eval(message: String): Unit = {
      val json = JSONUtil.parse(message)
      val name = json.getByPath("name", classOf[String])
      val age = json.getByPath("age", classOf[java.lang.Integer])
      val gender = json.getByPath("gender", classOf[String])
      val sex = gender match {
        case "男" => "boy"
        case "女" => "girl"
        case _ => "other"
      }

      // current time shifted forward by one day, converted to LocalDateTime
      val localDateTime = DateUtil.offsetDay(new Date(), 1).toLocalDateTime
      collect(Row.of(name, age, gender, localDateTime, sex))
    }
  }

}

Run results


View the partitions


View the corresponding HDFS directory


Original: https://blog.csdn.net/baidu_32377671/article/details/125815077
Author: lyanjun
Title: Flink1.14学习测试:将数据写入到Hive&Hdfs(二)
