Hudi Learning Part 5: Integrating Hudi with Hive

I. Hudi and Hive Integration Architecture

(Figure: Hudi and Hive integration architecture.)

II. Environment Setup

1. Install Hive … (reference)

2. Copy the JAR

Copy the JAR hudi-hadoop-mr-bundle-0.9.0.jar from the compiled Hudi build directory (hudi-0.9.0/packaging/hudi-hadoop-mr-bundle/target/) into the lib directory of the Hive installation.

Note: I compiled Hudi 0.9.0 with its bundled Hive version 2.3.1 left unchanged.


3. Restart the two Hive services: metastore and hiveserver2

bin/hive --service metastore
bin/hive --service hiveserver2

Both services can also be started in the background:

nohup ./bin/hive --service metastore &
nohup ./bin/hive --service hiveserver2 &

III. Manually Syncing Hudi Table Data to Hive

1. Write external data into Hudi

1) Data format

order_id    product_id city_id    district   county type   combo_type traffic_type   passenger_count    driver_product_id  start_dest_distance    arrive_time    departure_time pre_total_fee  normal_time    bubble_trace_id    product_1level dest_lng   dest_lat   starting_lng   starting_lat   year   month  day
17592880231474 3  83 0898   460106 0  0  0  0  3  3806   2017-05-26 00:04:47    2017-05-26 00:02:43    11 14 d88a957f7f1ff9fae80a9791103f0303   3  110.3463   20.0226    110.3249   20.0212    2017   05 26
17592880435172 3  83 0898   460106 0  0  0  0  3  3557   2017-05-26 00:16:07    2017-05-26 00:13:12    11 8  a692221c507544783a0519b810390303   3  110.3285   20.0212    110.3133   20.0041    2017   05 26
17592880622846 3  83 0898   460108 0  0  0  0  3  3950   2017-05-26 01:05:53    2017-05-26 01:03:25    12 8  c0a80166cf3529b8a11ef1af10440303   3  110.3635   20.0061    110.3561   20.0219    2017   05 26
17592880665344 3  83 0898   460106 0  0  0  0  3  2265   2017-05-26 00:51:31    2017-05-26 00:48:24    9  6  6446aa1459270ad8255c9d6e26e5ff02   3  110.3172   19.9907    110.3064   20.0005    2017   05 26
17592880763217 3  83 0898   460106 0  0  0  0  3  7171   0000-00-00 00:00:00    2017-05-26 00:55:16    20 NULL   64469e3e59270c7308f066ae2187a102   3  110.3384   20.0622    110.3347   20.0269    2017   05 26
17592880885186 3  83 0898   460107 0  0  0  0  3  8368   2017-05-26 02:00:15    2017-05-26 01:54:26    24 15 6446a13459271a517a8435b41aa8a002   3  110.3397   20.0395    110.3541   19.9947    2017   05 26
17592881134529 3  83 0898   460106 0  0  0  0  3  4304   2017-05-26 03:38:13    2017-05-26 03:33:24    13 NULL   64469e3b59273182744d550020dd6f02   3  110.3608   20.027 110.3435   20.0444    2017   05 26

2) Write the data into a Hudi table

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions._

object SparkOperatorHudiCOW {

  def main(args: Array[String]): Unit = {

    // Project utility that builds a Hive-enabled SparkSession
    val spark = SparkUtils.createSparkSessionEnableHive(this.getClass)

    // 1. Load the external data
    val path: String = "file:///G:/bigdata-parent/bigdata-hudi/datas/dwv_order_make_haikou_1.txt"

    val df: DataFrame = spark.read
      .option("sep", "\\t") // field separator: tab
      .option("header", "true") // first line holds the column names
      .option("inferSchema", "true") // infer column types from the values
      .csv(path) // file path

    // 2. ETL: derive the ts and partition_path columns
    val insertDF: DataFrame = df
      .withColumn("partition_path", concat_ws("-", col("year"), col("month"), col("day"))) // partition field in yyyy-MM-dd form
      .drop("year", "month", "day") // drop the separate year/month/day columns
      .withColumn("ts", unix_timestamp(col("departure_time"), "yyyy-MM-dd HH:mm:ss")) // precombine field used when merging records

    // 3. Write the data into the Hudi table in COW mode
    val hudiTableName: String = "didi_haikou_cow"
    val hudiTablePath: String = "/datas/hudi-warehouse/didi_haikou_cow"

    // Hudi option constants
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._

    // Save the data
    insertDF.write
      .mode(SaveMode.Overwrite)
      .format("hudi")
      .option(TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL) // table type; COW is the default
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      // Hudi table properties
      .option(RECORDKEY_FIELD.key(), "order_id")
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(PARTITIONPATH_FIELD.key(), "partition_path")
      .option(TBL_NAME.key(), hudiTableName)
      .save(hudiTablePath)

    spark.stop()
  }
}

3) Verify that the data has been written to HDFS

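The write can also be confirmed by reading the table back through the Hudi datasource; a minimal sketch, assuming the spark session and hudiTablePath from the job above:

// Minimal verification sketch: read the COW table back from its base path.
val checkDF = spark.read.format("hudi").load("/datas/hudi-warehouse/didi_haikou_cow")
checkDF.select("_hoodie_commit_time", "_hoodie_record_key", "order_id", "partition_path").show(5, truncate = false)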

2. Create a Hive external table and add partitions manually

1) Create a Hive partitioned external table

(The Hudi table is partitioned; the partition field is partition_path in yyyy-MM-dd format, and the data is read with HoodieParquetInputFormat.)

create external table didi_haikou_cow(
_hoodie_commit_time     string,
_hoodie_commit_seqno    string,
_hoodie_record_key      string,
_hoodie_partition_path  string,
_hoodie_file_name       string,
order_id                bigint,
product_id              int,
city_id                 int,
district                int,
county                  int,
type                    int,
combo_type              int,
traffic_type            int,
passenger_count         int,
driver_product_id       int,
start_dest_distance     int,
arrive_time             string,
departure_time          string,
pre_total_fee           int,
normal_time             string,
bubble_trace_id         string,
product_1level          int,
dest_lng                double,
dest_lat                double,
starting_lng            double,
starting_lat            double,
ts                      bigint)
PARTITIONED BY (partition_path string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/datas/hudi-warehouse/didi_haikou_cow';

2) Add the partition manually

alter table hudi_hive.didi_haikou_cow add if not exists partition(partition_path='2017-5-26') location '/datas/hudi-warehouse/didi_haikou_cow/2017-5-26/';
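To check that the partition was registered, the table's partitions can be listed from a Hive-enabled Spark session (the Hive CLI works just as well); a minimal sketch:

// Minimal sketch: confirm the manually added partition is now visible to Hive.
spark.sql("show partitions hudi_hive.didi_haikou_cow").show(truncate = false)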

3. Query the data

1) Query error

This is caused by the column types of the Hive table you created not matching the types of the Hudi table.
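To line the Hive column types up with what Hudi actually wrote, it helps to print the schema of the Hudi table from Spark and adjust the DDL to match; a minimal sketch:

// Minimal sketch: inspect the schema Hudi wrote, then align the Hive DDL types with it.
spark.read.format("hudi").load("/datas/hudi-warehouse/didi_haikou_cow").printSchema()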


2) Correct query result

select order_id,product_id,city_id,district,county,type,combo_type,traffic_type  from hudi_hive.didi_haikou_cow limit 5;


IV. Writing Data to Hudi (COW mode) with Spark and Syncing It to Hive

1. Data format

{"ad_id":"9","birthday":"1997-11-16","dn":"webA","dt":"20190722","email":"test@126.com","fullname":"王0","iconurl":"-","lastlogin":"-","mailaddr":"-","memberlevel":"1","password":"123456","paymoney":"-","phone":"13711235451","qq":"10000","register":"2015-04-05","regupdatetime":"-","uid":"0","unitname":"-","userip":"222.42.116.199","zipcode":"-"}
{"ad_id":"5","birthday":"1997-03-08","dn":"webA","dt":"20190722","email":"test@126.com","fullname":"王1","iconurl":"-","lastlogin":"-","mailaddr":"-","memberlevel":"4","password":"123456","paymoney":"-","phone":"13711235451","qq":"10000","register":"2017-10-13","regupdatetime":"-","uid":"1","unitname":"-","userip":"106.92.133.13","zipcode":"-"}
{"ad_id":"1","birthday":"1998-10-18","dn":"webA","dt":"20190722","email":"test@126.com","fullname":"王2","iconurl":"-","lastlogin":"-","mailaddr":"-","memberlevel":"7","password":"123456","paymoney":"-","phone":"13711235451","qq":"10000","register":"2018-03-16","regupdatetime":"-","uid":"2","unitname":"-","userip":"139.200.218.184","zipcode":"-"}
{"ad_id":"2","birthday":"1970-10-27","dn":"webA","dt":"20190722","email":"test@126.com","fullname":"王3","iconurl":"-","lastlogin":"-","mailaddr":"-","memberlevel":"2","password":"123456","paymoney":"-","phone":"13711235451","qq":"10000","register":"2016-08-22","regupdatetime":"-","uid":"3","unitname":"-","userip":"121.77.205.103","zipcode":"-"}
{"ad_id":"3","birthday":"1975-06-16","dn":"webA","dt":"20190722","email":"test@126.com","fullname":"王4","iconurl":"-","lastlogin":"-","mailaddr":"-","memberlevel":"8","password":"123456","paymoney":"-","phone":"13711235451","qq":"10000","register":"2015-08-14","regupdatetime":"-","uid":"4","unitname":"-","userip":"121.77.66.4","zipcode":"-"}
{"ad_id":"9","birthday":"1982-08-17","dn":"webA","dt":"20190722","email":"test@126.com","fullname":"王5","iconurl":"-","lastlogin":"-","mailaddr":"-","memberlevel":"1","password":"123456","paymoney":"-","phone":"13711235451","qq":"10000","register":"2018-12-11","regupdatetime":"-","uid":"5","unitname":"-","userip":"121.77.232.117","zipcode":"-"}
{"ad_id":"0","birthday":"1979-01-07","dn":"webA","dt":"20190722","email":"test@126.com","fullname":"王6","iconurl":"-","lastlogin":"-","mailaddr":"-","memberlevel":"3","password":"123456","paymoney":"-","phone":"13711235451","qq":"10000","register":"2016-01-05","regupdatetime":"-","uid":"6","unitname":"-","userip":"182.80.12.221","zipcode":"-"}

2. Code

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * @author oyl
  * @create 2022-06-12 21:12
  * @Description Write a COW-mode Hudi table with Spark and sync it to Hive
  */
object SparkOperatorHudiCOWSyncHive {

  def insertData(sparkSession: SparkSession): Unit = {
    import org.apache.spark.sql.functions._

    val tableName = "hudi_cow_hive"
    val basePath = "/datas/hudi-warehouse/hudi_cow_hive"

    val commitTime = System.currentTimeMillis().toString // generate a commit time

    val resultDF = sparkSession.read.json("/hudi_test_datas/member.log")
      .withColumn("ts", lit(commitTime)) // add the ts timestamp
      .withColumn("hudipartition", concat_ws("/", col("dt"), col("dn"))) // partition built from two fields

    Class.forName("org.apache.hive.jdbc.HiveDriver") // load the Hive JDBC driver used by hive_sync

    resultDF.write.format("hudi")
      .option(TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL)   // table type: MERGE_ON_READ or COPY_ON_WRITE
      .option(RECORDKEY_FIELD.key(), "uid")               // record key
      .option(PRECOMBINE_FIELD.key(), "ts")               // precombine (update timestamp) field
      .option(PARTITIONPATH_FIELD.key(), "hudipartition") // Hudi partition column
      .option("hoodie.table.name", tableName)             // Hudi table name

      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://hadoop100:10000") // hiveserver2 address
      .option("hoodie.datasource.hive_sync.username","oyl")                          // hiveserver2 user
      .option("hoodie.datasource.hive_sync.password","123123")                       // hiveserver2 password
      .option("hoodie.datasource.hive_sync.database", "hudi_hive")                   // Hive database to sync into
      .option("hoodie.datasource.hive_sync.table", tableName)                        // Hive table to sync into
      .option("hoodie.datasource.hive_sync.partition_fields", "dt,dn")               // Hive partition columns
      .option("hoodie.datasource.hive_sync.partition_extractor_class", classOf[MultiPartKeysValueExtractor].getName) // partition extractor, splits on /
      .option("hoodie.datasource.hive_sync.enable","true")                           // register and sync the dataset to Hive
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(SaveMode.Append)
      .save(basePath)
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkOperatorHudiCOWSyncHive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession)
    println("Wrote the data to Hudi in COW mode and synced it to a Hive external table............................")

    sparkSession.stop()
  }
}

3. Errors during execution

1) Error 1

Exception in thread "main" java.lang.ClassNotFoundException: org.apache.hive.jdbc.HiveDriver
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)

Solution: load the Hive JDBC driver in the code, and add the dependencies (matching your Hive version) to the POM:

Class.forName("org.apache.hive.jdbc.HiveDriver")

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.3.6</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>2.3.6</version>
</dependency>

2) Error 2

Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:606)
    at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:330)
    at org.datanucleus.store.AbstractStoreManager.registerConnectionFactory(AbstractStoreManager.java:203)
    ... 99 more
Caused by: org.datanucleus.exceptions.NucleusException: Attempt to invoke the "BONECP" plugin to create a ConnectionPool gave an error : The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver.
    at org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:232)
    at org.datanucleus.store.rdbms.ConnectionFactoryImpl.initialiseDataSources(ConnectionFactoryImpl.java:117)
    at org.datanucleus.store.rdbms.ConnectionFactoryImpl.<init>(ConnectionFactoryImpl.java:82)
    ... 106 more
Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver.
    at org.datanucleus.store.rdbms.connectionpool.AbstractConnectionPoolFactory.loadDriver(AbstractConnectionPoolFactory.java:58)
    at org.datanucleus.store.rdbms.connectionpool.BoneCPConnectionPoolFactory.createConnectionPool(BoneCPConnectionPoolFactory.java:54)
    at org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:213)

Solution: add the MySQL connector dependency to the POM:


<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>

4. Result of the insert

1) The data can be seen in HDFS

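The layout on HDFS can also be inspected programmatically; a minimal sketch using the Hadoop FileSystem API, assuming the NameNode address that appears in the table LOCATION below:

// Minimal sketch: list the Hudi base path to confirm the partition directories and parquet files exist.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new java.net.URI("hdfs://hadoop100:9000"), new Configuration())
fs.listStatus(new Path("/datas/hudi-warehouse/hudi_cow_hive")).foreach(status => println(status.getPath))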

You can see that the Hive table created by the sync uses the HoodieParquetInputFormat input format, which supports both snapshot and incremental queries:

CREATE EXTERNAL TABLE hudi_cow_hive(
   _hoodie_commit_time string,
   _hoodie_commit_seqno string,
   _hoodie_record_key string,
   _hoodie_partition_path string,
   _hoodie_file_name string,
   ad_id string,
   birthday string,
   email string,
   fullname string,
   iconurl string,
   lastlogin string,
   mailaddr string,
   memberlevel string,
   password string,
   paymoney string,
   phone string,
   qq string,
   register string,
   regupdatetime string,
   uid string,
   unitname string,
   userip string,
   zipcode string,
   ts string,
   hudipartition string)
 PARTITIONED BY (
   dt string,
   dn string)
 ROW FORMAT SERDE
   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
 WITH SERDEPROPERTIES (
   'hoodie.query.as.ro.table'='false',
   'path'='/datas/hudi-warehouse/hudi_cow_hive')
 STORED AS INPUTFORMAT
   'org.apache.hudi.hadoop.HoodieParquetInputFormat'
 OUTPUTFORMAT
   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
 LOCATION
   'hdfs://hadoop100:9000/datas/hudi-warehouse/hudi_cow_hive'
 TBLPROPERTIES (
   'last_commit_time_sync'='20220617224055',
   'spark.sql.sources.provider'='hudi',
   'spark.sql.sources.schema.numPartCols'='2',
   'spark.sql.sources.schema.numParts'='1',
   'transient_lastDdlTime'='1655393022')

5. Update data

Update the records where uid > 0 and uid < 20.

1) Data before the update


2) Update code (see the sketch below)
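A minimal sketch of this update, assuming the same write options as insertData and an illustrative new fullname value (the values used in the original run may differ):

// Minimal sketch of a row-level update on the COW table.
// Assumes the imports at the top of SparkOperatorHudiCOWSyncHive; the new fullname value is illustrative.
def updateData(sparkSession: SparkSession): Unit = {
  import org.apache.spark.sql.functions._

  val tableName = "hudi_cow_hive"
  val basePath = "/datas/hudi-warehouse/hudi_cow_hive"
  val commitTime = System.currentTimeMillis().toString

  val updateDF = sparkSession.read.json("/hudi_test_datas/member.log")
    .withColumn("ts", lit(commitTime))
    .withColumn("hudipartition", concat_ws("/", col("dt"), col("dn")))
    .where("uid > 0 and uid < 20")                   // the rows to change
    .withColumn("fullname", lit("fullname_updated")) // illustrative new value

  updateDF.write.format("hudi")
    .option(TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL)
    .option(RECORDKEY_FIELD.key(), "uid")
    .option(PRECOMBINE_FIELD.key(), "ts")
    .option(PARTITIONPATH_FIELD.key(), "hudipartition")
    .option("hoodie.table.name", tableName)
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .mode(SaveMode.Append) // Append performs an upsert on matching record keys
    .save(basePath)
}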

3) Result after the update

The Hive query now returns the latest, updated data.


On HDFS you can see that the data was successfully written to a new file of the same size as the old one, so a Copy on Write table actually merges the changes with the old data and rewrites the full dataset. This confirms the official description of Copy on Write tables: higher write latency and larger write amplification. If a table needs frequent updates and writes, a Merge on Read table is recommended instead.


6. Querying the updated data

1) Snapshot query

/**
  * Snapshot query, showing uid

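A minimal sketch of such a snapshot query, assuming it reads the synced Hive table through Spark SQL (the selected columns are illustrative):

// Minimal sketch of a snapshot query on the synced COW table.
def queryData1(sparkSession: SparkSession): Unit = {
  // Read through the Hudi input format rather than Spark's built-in parquet reader.
  sparkSession.sql("set spark.sql.hive.convertMetastoreParquet=false")
  sparkSession.sql("select _hoodie_commit_time, uid, fullname, ts from hudi_hive.hudi_cow_hive").show(20)
}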

(1) Query the first 40 rows: the 20 updated rows plus the 20 unmodified rows

You can see that the updated rows carry a different commit timestamp from the unmodified rows.

/**
  * Query the first 40 rows: the 20 updated rows plus the 20 unmodified rows
  */
def queryData2(sparkSession: SparkSession) = {

  sparkSession.sql("set spark.sql.hive.convertMetastoreParquet=false")
  sparkSession.sql("select _hoodie_commit_time,ad_id,birthday,dn,fullname,uid from hudi_hive.hudi_cow_hive where uid>=0 and uid<40").show(40)
}


(2) Incremental query by timestamp

Set the query type to incremental and pass in a begin timestamp; Spark will then return only the data committed after that timestamp.

def queryData3(sparkSession: SparkSession) = {

  val df = sparkSession.read.format("hudi")
    .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) // incremental query
    .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "20220616232233") // begin instant time
    //.option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY,"")  // end instant time
    .load("/datas/hudi-warehouse/hudi_cow_hive/")
  df.show(20)
}

V. Writing Data to Hudi (MOR mode) with Spark and Syncing It to Hive

1. Data format (same as above)

2. Code

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

/**
  * @author oyl
  * @create 2022-06-12 21:12
  * @Description Write a MOR-mode Hudi table with Spark and sync it to Hive
  */
object SparkOperatorHudiMORSyncHive {

  def insertData(sparkSession: SparkSession): Unit = {
    import org.apache.spark.sql.functions._

    val tableName = "hudi_mor_hive"
    val basePath = "/datas/hudi-warehouse/hudi_mor_hive"

    val commitTime = System.currentTimeMillis().toString // generate a commit time

    val resultDF = sparkSession.read.json("/hudi_test_datas/member.log")
      .withColumn("ts", lit(commitTime)) // add the ts timestamp
      .withColumn("hudipartition", concat_ws("/", col("dt"), col("dn"))) // partition built from two fields

    Class.forName("org.apache.hive.jdbc.HiveDriver") // load the Hive JDBC driver used by hive_sync

    resultDF.write.format("hudi")
      .option(TABLE_TYPE.key(), MOR_TABLE_TYPE_OPT_VAL)   // table type: MERGE_ON_READ or COPY_ON_WRITE
      .option(RECORDKEY_FIELD.key(), "uid")               // record key
      .option(PRECOMBINE_FIELD.key(), "ts")               // precombine (update timestamp) field
      .option(PARTITIONPATH_FIELD.key(), "hudipartition") // Hudi partition column
      .option("hoodie.table.name", tableName)             // Hudi table name

      .option("hoodie.datasource.hive_sync.jdbcurl", "jdbc:hive2://hadoop100:10000") // hiveserver2 address
      .option("hoodie.datasource.hive_sync.username","oyl")                          // hiveserver2 user
      .option("hoodie.datasource.hive_sync.password","123123")                       // hiveserver2 password
      .option("hoodie.datasource.hive_sync.database", "hudi_hive")                   // Hive database to sync into
      .option("hoodie.datasource.hive_sync.table", tableName)                        // Hive table to sync into
      .option("hoodie.datasource.hive_sync.partition_fields", "dt,dn")               // Hive partition columns
      .option("hoodie.datasource.hive_sync.partition_extractor_class", classOf[MultiPartKeysValueExtractor].getName) // partition extractor, splits on /
      .option("hoodie.datasource.hive_sync.enable","true")                           // register and sync the dataset to Hive
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .mode(SaveMode.Append)
      .save(basePath)
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkOperatorHudiMORSyncHive").setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    insertData(sparkSession)
    println("Wrote the data to Hudi in MOR mode and synced it to Hive external tables............................")
    sparkSession.stop()
  }
}

3. Result of the insert

1) HDFS data


2) Query the Hive data

Two tables appear: hudi_mor_hive_ro and hudi_mor_hive_rt.
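
A quick way to confirm this from a Hive-enabled Spark session; a minimal sketch:

// Minimal sketch: list the tables that hive_sync registered for the MOR dataset.
sparkSession.sql("show tables in hudi_hive like 'hudi_mor_hive*'").show(truncate = false)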


3) Hive table structure

The table ending in _rt supports snapshot and incremental queries: it returns a merged view of the base columnar data and the incremental log data, so updates are visible immediately. The table ending in _ro reads only the base columnar data and does not read the incremental logs. The _rt table uses the HoodieParquetRealtimeInputFormat input format, while the _ro table uses HoodieParquetInputFormat.

CREATE EXTERNAL TABLE hudi_mor_hive_ro(
   _hoodie_commit_time string,
   _hoodie_commit_seqno string,
   _hoodie_record_key string,
   _hoodie_partition_path string,
   _hoodie_file_name string,
   ad_id string,
   birthday string,
   email string,
   fullname string,
   iconurl string,
   lastlogin string,
   mailaddr string,
   memberlevel string,
   password string,
   paymoney string,
   phone string,
   qq string,
   register string,
   regupdatetime string,
   uid string,
   unitname string,
   userip string,
   zipcode string,
   ts string,
   hudipartition string)
 PARTITIONED BY (
   dt string,
   dn string)
 ROW FORMAT SERDE
   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
 WITH SERDEPROPERTIES (
   'hoodie.query.as.ro.table'='true',
   'path'='/datas/hudi-warehouse/hudi_mor_hive')
 STORED AS INPUTFORMAT
   'org.apache.hudi.hadoop.HoodieParquetInputFormat'
 OUTPUTFORMAT
   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
 LOCATION
   'hdfs://hadoop100:9000/datas/hudi-warehouse/hudi_mor_hive'
 TBLPROPERTIES (
   'last_commit_time_sync'='20220618000319',
   'spark.sql.sources.provider'='hudi',
   'spark.sql.sources.schema.numPartCols'='2',
   'spark.sql.sources.schema.numParts'='1',
   'transient_lastDdlTime'='1655393789')

   CREATE EXTERNAL TABLE hudi_mor_hive_rt(
   _hoodie_commit_time string,
   _hoodie_commit_seqno string,
   _hoodie_record_key string,
   _hoodie_partition_path string,
   _hoodie_file_name string,
   ad_id string,
   birthday string,
   email string,
   fullname string,
   iconurl string,
   lastlogin string,
   mailaddr string,
   memberlevel string,
   password string,
   paymoney string,
   phone string,
   qq string,
   register string,
   regupdatetime string,
   uid string,
   unitname string,
   userip string,
   zipcode string,
   ts string,
   hudipartition string)
 PARTITIONED BY (
   dt string,
   dn string)
 ROW FORMAT SERDE
   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
 WITH SERDEPROPERTIES (
   'hoodie.query.as.ro.table'='false',
   'path'='/datas/hudi-warehouse/hudi_mor_hive')
 STORED AS INPUTFORMAT
   'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
 OUTPUTFORMAT
   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
 LOCATION
   'hdfs://hadoop100:9000/datas/hudi-warehouse/hudi_mor_hive'
 TBLPROPERTIES (
   'last_commit_time_sync'='20220618000319',
   'spark.sql.sources.provider'='hudi',
   'spark.sql.sources.schema.numPartCols'='2',
   'spark.sql.sources.schema.numParts'='1',
   'transient_lastDdlTime'='1655393790')

4. Update data

Update the records where uid > 0 and uid < 20.

1) Update code

/**
  * Update data; Hudi supports row-level updates
  */
def updateData(sparkSession: SparkSession) = {

  val tableName = "hudi_mor_hive"
  val basePath = "/datas/hudi-warehouse/hudi_mor_hive"

  // Take only 20 rows and change their fullname value
  import org.apache.spark.sql.functions._
  val commitTime = System.currentTimeMillis().toString // generate a commit time

  // This update is effectively a row-level change
  val resultDF = sparkSession.read.json("/hudi_test_datas/member.log")
    .withColumn("ts", lit(commitTime)) // add the ts timestamp
    .withColumn("hudipartition", concat_ws("/", col("dt"), col("dn")))
    .where("uid >= 0 and uid < 20")                  // the 20 rows to update
    .withColumn("fullname", lit("fullname_updated")) // set a new fullname for these rows (illustrative value)

  // Write back with the same keys/options as insertData; Append performs an upsert on matching record keys
  resultDF.write.format("hudi")
    .option(TABLE_TYPE.key(), MOR_TABLE_TYPE_OPT_VAL)
    .option(RECORDKEY_FIELD.key(), "uid")
    .option(PRECOMBINE_FIELD.key(), "ts")
    .option(PARTITIONPATH_FIELD.key(), "hudipartition")
    .option("hoodie.table.name", tableName)
    .option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .mode(SaveMode.Append)
    .save(basePath)
}

2) After the update

(1) Comparing the HDFS files: the changed data was all written into log files, so when a Merge on Read table is updated, the changed records are written to row-based incremental logs.


(2) Comparing the Hive tables

The hudi_mor_hive_ro table is unchanged: the _ro table corresponds to the read-optimized query, which reads only the base columnar data and does not show the updated rows.


The hudi_mor_hive_rt table shows the latest updated data (the _rt table is the latest snapshot view).
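
A minimal sketch of this comparison, assuming the hudi-hadoop-mr-bundle is on the Spark classpath (otherwise the same SQL can be run from beeline):

// Minimal sketch: compare the read-optimized (_ro) and real-time (_rt) views after the update.
sparkSession.sql("set spark.sql.hive.convertMetastoreParquet=false")
sparkSession.sql("select uid, fullname, _hoodie_commit_time from hudi_hive.hudi_mor_hive_ro where uid >= 0 and uid < 20").show(20) // still shows the old fullname values
sparkSession.sql("select uid, fullname, _hoodie_commit_time from hudi_hive.hudi_mor_hive_rt where uid >= 0 and uid < 20").show(20) // merged view, shows the updated fullname values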


VI. The Project POM File


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <parent>
        <artifactId>bigdata-parent</artifactId>
        <groupId>com.ouyangl</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>bigdata-hudi</artifactId>

    <properties>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.0.0</spark.version>
        <hadoop.version>2.7.3</hadoop.version>
        <hudi.version>0.9.0</hudi.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
            <version>3.0.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark3-bundle_2.12</artifactId>
            <version>${hudi.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-hadoop-mr-bundle</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-annotations</artifactId>
                </exclusion>
            </exclusions>
            <version>0.9.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>2.3.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>2.3.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>
    </dependencies>

    <build>
        <outputDirectory>target/classes</outputDirectory>
        <testOutputDirectory>target/test-classes</testOutputDirectory>
        <resources>
            <resource>
                <directory>${project.basedir}/src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Original: https://blog.csdn.net/NC_NE/article/details/125347370
Author: Hub-Link
Title: Hudi Learning Part 5: Integrating Hudi with Hive
