Flink 1.14 Study Notes: Writing Data to Hive & HDFS (Part 2)
References
- Kafka SQL Connector: https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/kafka/
- Scalar Functions (UDF): https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%E6%A0%87%E9%87%8F%E5%87%BD%E6%95%B0
- Formats: https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/overview/
Receiving Kafka Data and Writing It to Hive (Approach 1)
Notes
Message structure (JSON format):
{"name":"Fznjui","age":16,"gender":"女"}
Kafka table definition
The Kafka table is declared with the json format, and its schema is kept identical to the message structure, so each message can be converted into a row directly as it is received.
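For comparison, the same source table could also be declared in SQL, using exactly the connector options that appear in the TableDescriptor in the Scala code below (a sketch; it assumes tableEnv is an initialized StreamTableEnvironment):

tableEnv.executeSql(
  """
    |create temporary table kafkaTable (
    |  name string,
    |  age int,
    |  gender string
    |) with (
    |  'connector' = 'kafka',
    |  'topic' = 'ly_test',
    |  'properties.bootstrap.servers' = '192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092',
    |  'properties.group.id' = 'KafkaToHiveTest1',
    |  'scan.startup.mode' = 'latest-offset',
    |  'format' = 'json'
    |)""".stripMargin)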
Custom UDF
This is needed because Hive does not recognize partition directories with Chinese names, so a conversion function maps the gender values to ASCII strings.
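In the code below the function is invoked inline via call(classOf[GenderConvert], ...). If it should also be callable from SQL, it can first be registered under a name; a sketch (the function name genderConvert is arbitrary):

// Register the scalar function so it can also be used from SQL queries
tableEnv.createTemporarySystemFunction("genderConvert", classOf[KafkaToHiveTest1.GenderConvert])
tableEnv.executeSql("select name, genderConvert(gender) as sex from kafkaTable")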
Complete test code
import cn.hutool.core.io.resource.ResourceUtil
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.{$, currentTimestamp, dateFormat}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{ApiExpression, DataTypes, FieldExpression, Schema, SqlDialect, TableDescriptor, call}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.functions.ScalarFunction

import java.util.concurrent.TimeUnit

object KafkaToHiveTest1 {

  private val sourceTopic = "ly_test"
  private val kafkaServers = "192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092"

  def main(args: Array[String]): Unit = {
    val parameterTool = ParameterTool.fromArgs(args)
    val hiveConfigDir = parameterTool.get("hiveCfgDir", ResourceUtil.getResource("hive/conf").getPath)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpointing must be enabled: Hive partitions are only committed when a checkpoint completes
    env.enableCheckpointing(TimeUnit.SECONDS.toMillis(1), CheckpointingMode.EXACTLY_ONCE)
    val tableEnv = StreamTableEnvironment.create(env)

    // Register the Hive catalog so tables are created in the ly_test Hive database
    val catalogName = "lyTest"
    val hiveCatalog = new HiveCatalog(catalogName, "ly_test", hiveConfigDir)
    tableEnv.registerCatalog(catalogName, hiveCatalog)
    tableEnv.useCatalog(catalogName)

    // Kafka source table; the schema matches the JSON message structure
    val sourceTable = "kafkaTable"
    tableEnv.createTemporaryTable(sourceTable, TableDescriptor
      .forConnector("kafka")
      .schema(Schema.newBuilder()
        .column("name", DataTypes.STRING())
        .column("age", DataTypes.INT())
        .column("gender", DataTypes.STRING())
        .build())
      .option("topic", sourceTopic)
      .option("properties.bootstrap.servers", kafkaServers)
      .option("properties.group.id", "KafkaToHiveTest1")
      .option("scan.startup.mode", "latest-offset")
      .option("format", "json")
      .build())

    // Switch to the Hive dialect to create the partitioned Hive sink table
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    val kafka_sink_hive = "kafka_sink_hive"
    tableEnv.executeSql(
      s"""
         |create table if not exists $kafka_sink_hive (
         |  name string,
         |  age int,
         |  gender string,
         |  sink_date_time timestamp(9)
         |) partitioned by (ymd string, sex string)
         |  stored as parquet
         |  tblproperties(
         |    'partition.time-extractor.timestamp-pattern' = '$$ymd 00:00:00',
         |    'sink.partition-commit.policy.kind' = 'metastore'
         |  )
         |""".stripMargin)
    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    // Add a processing-time column, derive the partition columns and insert into Hive
    tableEnv.from(sourceTable)
      .addColumns(currentTimestamp() as "now")
      .select($"name", $"age", $"gender", $"now",
        dateFormat($("now"), "yyyy-MM-dd") as "ymd",
        // cast is needed because call(...) returns a plain Expression without "as"
        call(classOf[GenderConvert], $("gender")).asInstanceOf[ApiExpression] as "sex")
      .executeInsert(kafka_sink_hive)
  }

  // Maps Chinese gender values to ASCII partition directory names
  class GenderConvert extends ScalarFunction {
    def eval(gender: String): String = {
      gender match {
        case "男" => "boy"
        case "女" => "girl"
        case _ => "other"
      }
    }
  }
}
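Note that running this job requires the Kafka connector (flink-connector-kafka), the JSON format (flink-json), and the Hive connector plus a matching hive-exec on the classpath; see the Kafka connector and Formats pages linked in the references above.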
Run results
Viewing the partitions:
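The partition listing can also be pulled programmatically rather than from a Hive shell; a minimal sketch reusing the tableEnv from the job (show partitions requires the Hive dialect):

tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tableEnv.executeSql("show partitions kafka_sink_hive").print()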
Viewing the corresponding HDFS directory:
Receiving Kafka Data and Writing It to Hive (Approach 2)
In essence the exercise was already complete after the previous step; this variant mainly exists to test a custom UDTF (table-valued function). The goal: parse the raw data with a custom UDTF and then write it to Hive. (How many ways are there to write the character 茴, after all?)
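For reference, the joinLateral call in the code below corresponds to a SQL lateral join. A hedged sketch of the equivalent statement, assuming the UDTF has been registered under the hypothetical name parseJson:

tableEnv.createTemporarySystemFunction("parseJson", classOf[KafkaToHiveTest2.ParseJsonConvert])
tableEnv.executeSql(
  """
    |insert into kafka_sink_hive
    |select t.name, t.age, t.gender, t.`now`,
    |       date_format(t.`now`, 'yyyy-MM-dd') as ymd, t.sex
    |from kafkaTable,
    |     lateral table(parseJson(message)) as t(name, age, gender, `now`, sex)
    |""".stripMargin)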
Complete test code
import cn.hutool.core.date.DateUtil
import cn.hutool.core.io.resource.ResourceUtil
import cn.hutool.json.JSONUtil
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Expressions.{$, dateFormat}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{DataTypes, FieldExpression, Schema, SqlDialect, TableDescriptor, call}
import org.apache.flink.table.catalog.hive.HiveCatalog
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

import java.util.Date
import java.util.concurrent.TimeUnit

object KafkaToHiveTest2 {

  private val sourceTopic = "ly_test"
  private val kafkaServers = "192.168.11.160:9092,192.168.11.161:9092,192.168.11.162:9092"

  def main(args: Array[String]): Unit = {
    val parameterTool = ParameterTool.fromArgs(args)
    val hiveConfigDir = parameterTool.get("hiveCfgDir", ResourceUtil.getResource("hive/conf").getPath)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Checkpointing must be enabled: Hive partitions are only committed when a checkpoint completes
    env.enableCheckpointing(TimeUnit.SECONDS.toMillis(1), CheckpointingMode.EXACTLY_ONCE)
    val tableEnv = StreamTableEnvironment.create(env)

    val catalogName = "lyTest"
    val hiveCatalog = new HiveCatalog(catalogName, "ly_test", hiveConfigDir)
    tableEnv.registerCatalog(catalogName, hiveCatalog)
    tableEnv.useCatalog(catalogName)

    // This time the source exposes the raw message as a single string column;
    // JSON parsing is deferred to the UDTF
    val sourceTable = "kafkaTable"
    tableEnv.createTemporaryTable(sourceTable, TableDescriptor
      .forConnector("kafka")
      .schema(Schema.newBuilder()
        .column("message", DataTypes.STRING())
        .build())
      .option("topic", sourceTopic)
      .option("properties.bootstrap.servers", kafkaServers)
      .option("properties.group.id", "KafkaToHiveTest2")
      .option("scan.startup.mode", "latest-offset")
      .option("format", "raw")
      .build())

    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    val kafka_sink_hive = "kafka_sink_hive"
    tableEnv.executeSql(
      s"""
         |create table if not exists $kafka_sink_hive (
         |  name string,
         |  age int,
         |  gender string,
         |  sink_date_time timestamp(9)
         |) partitioned by (ymd string, sex string)
         |  stored as parquet
         |  tblproperties(
         |    'partition.time-extractor.timestamp-pattern' = '$$ymd 00:00:00',
         |    'sink.partition-commit.policy.kind' = 'metastore'
         |  )
         |""".stripMargin)
    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    // Expand each raw message into typed columns with the UDTF, then insert into Hive
    tableEnv.from(sourceTable)
      .joinLateral(call(classOf[ParseJsonConvert], $("message")))
      .select($"name", $"age", $"gender", $"now",
        dateFormat($("now"), "yyyy-MM-dd") as "ymd", $"sex")
      .executeInsert(kafka_sink_hive)
  }

  class ParseJsonConvert extends TableFunction[Row] {
    // Declare the row type of the emitted columns so the planner can derive the schema
    @DataTypeHint(value = "Row<name STRING, age INT, gender STRING, now TIMESTAMP(9), sex STRING>")
    def eval(message: String): Unit = {
      val json = JSONUtil.parse(message)
      val name = json.getByPath("name", classOf[String])
      val age = json.getByPath("age", classOf[java.lang.Integer])
      val gender = json.getByPath("gender", classOf[String])
      val sex = gender match {
        case "男" => "boy"
        case "女" => "girl"
        case _ => "other"
      }
      // Offset by one day so the test rows land in the next day's partition
      val localDateTime = DateUtil.offsetDay(new Date(), 1).toLocalDateTime
      collect(Row.of(name, age, gender, localDateTime, sex))
    }
  }
}
Run results
Viewing the partitions:
Viewing the corresponding HDFS directory:
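Assuming the default warehouse location, the files should appear under directories shaped like ly_test.db/kafka_sink_hive/ymd=<yyyy-MM-dd>/sex=girl/ (the warehouse root itself depends on hive.metastore.warehouse.dir).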
Original: https://blog.csdn.net/baidu_32377671/article/details/125815077
Author: lyanjun