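Spark 3: Reading and Writing S3 Parquet, Hive, and Hudi

Spark: reading S3 Parquet and writing to a Hudi table

Differences and relationship between S3, S3N, and S3A

Spark: reading and writing S3 Parquet files

Test code

pom.xml

Configuration files

Submitting Spark jobs on EMR

spark-shell

spark-submit

Spark: reading and writing Hudi

Local testing

Testing on the cluster

spark-shell

spark-sql

spark-submit

Testing in Hive

Troubleshooting

hadoop-aws

Matching EMR versions

EMR Spark

EMR Hudi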
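Differences and relationship between S3, S3N, and S3A

First, the three schemes differ in the maximum file size they can handle.

Second, s3 is block-based, while s3n and s3a are object-based.

Finally, S3A is the access method recommended by Apache Hadoop. The legacy s3 block filesystem is being phased out and its use is discouraged; S3A is more stable, secure, and efficient. On Amazon EMR, however, the s3:// scheme is served by EMRFS rather than by the legacy block filesystem, which is why the EMR examples later in this post can use s3:// paths.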

S3 Block FileSystem (URI scheme: s3) A block-based filesystem backed by S3. Files are stored as blocks, just like they are in HDFS. This permits efficient implementation of renames. This filesystem requires you to dedicate a bucket for the filesystem – you should not use an existing bucket containing files, or write other files to the same bucket. The files stored by this filesystem can be larger than 5GB, but they are not interoperable with other S3 tools.

S3A (URI scheme: s3a) A successor to the S3 Native, s3n fs, the S3a: system uses Amazon’s libraries to interact with S3. This allows S3a to support larger files (no more 5GB limit), higher performance operations and more. The filesystem is intended to be a replacement for /successor to S3 Native: all objects accessible from s3n:// URLs should also be accessible from s3a simply by replacing the URL schema.

S3 Native FileSystem (URI scheme: s3n) A native filesystem for reading and writing regular files on S3. The advantage of this filesystem is that you can access files on S3 that were written with other tools. Conversely, other tools can access files written using Hadoop. The disadvantage is the 5GB limit on file size imposed by S3.
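Since the description above says that objects reachable through s3n:// URLs should also be reachable through s3a:// by replacing the scheme, migrating old paths can be sketched as a small helper. The names `S3aPaths` and `toS3a` are hypothetical and not part of any library. The sketch deliberately leaves s3:// paths untouched, because data written by the block-based filesystem is not interoperable.

```scala
object S3aPaths {
  // Matches s3n:// case-insensitively and captures everything after the scheme.
  private val S3nUri = "(?i)s3n://(.+)".r

  /** Rewrites an s3n:// path to s3a://; any other path is returned unchanged. */
  def toS3a(path: String): String = path match {
    case S3nUri(rest) => s"s3a://$rest"
    case other        => other
  }
}
```

For example, `S3aPaths.toS3a("s3n://bucket/dw/ods/t")` yields `s3a://bucket/dw/ods/t`, while an hdfs:// or s3:// path passes through as-is.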

Test code

package org.zero

import org.apache.log4j.{Level, Logger}
import org.slf4j.LoggerFactory
import org.utils.SparkUtils

object SparkS2Test {
  private var logger: org.slf4j.Logger = _

  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.INFO)
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.spark_project.jetty").setLevel(Level.WARN)

    val start = System.currentTimeMillis()
    logger.warn(s"=================== Spark reading S3 ===================")

    // The master is hard-coded to local[*] for local testing.
    val spark = SparkUtils.getSparkSession(this.getClass.getSimpleName, "local[*]")
    val sc = spark.sparkContext
    // Placeholder credentials: substitute your own and never commit real keys.
    sc.hadoopConfiguration.set("fs.s3a.access.key", "<YOUR_ACCESS_KEY>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<YOUR_SECRET_KEY>")
    sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")

    // Read one daily partition of Parquet files from S3.
    val dataframe = spark
      .read
      .parquet("s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09")

    // Cache the data and expose it to Spark SQL as a temporary view.
    val tmpCache = dataframe.cache()
    tmpCache.createOrReplaceTempView("parquet_tmp_view")

    val dataFrame2 = spark.sql("select * from parquet_tmp_view limit 10")

    dataFrame2.show

//    dataFrame2.write.parquet("F:\\tmp\\output")

    spark.stop()

    val end = System.currentTimeMillis()
    logger.warn(s"=================== Elapsed: ${(end - start) / 1000} s ===================")
  }
}

package org.utils

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkUtils {
  private val sparkConf: SparkConf = new SparkConf()

  def getSparkConf(appName: String, master: String): SparkConf = {
    sparkConf.setMaster(master).setAppName(appName)
  }

  def getSparkSession(appName: String, master: String): SparkSession = {
    sparkConf.setMaster(master).setAppName(appName)
    sparkSessionInit
  }

  // Built lazily so that the master and app name are set before the session is created.
  lazy val sparkSessionInit: SparkSession = SparkSession.builder()
    .config(sparkConf)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.io.compression.codec", "snappy")
    .config("spark.rdd.compress", "true")
    .config("spark.hadoop.parquet.writer.version", "v2")
    .config("spark.sql.parquet.enableVectorizedReader", "false")
    // The original also set this key to "false" first; that is not a valid codec
    // and was overridden by the line below, so it has been dropped.
    .config("spark.sql.parquet.compression.codec", "snappy")
    .config("spark.sql.parquet.filterPushdown", "true")
    .config("spark.sql.parquet.mergeSchema", "true")
    .config("spark.sql.parquet.binaryAsString", "true")
    .getOrCreate()
}
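
The test code above sets the S3A keys as string literals. One way to keep credentials out of the source is to read them from the environment. The following is a minimal sketch under assumptions: `S3aSettings` and `fromEnv` are hypothetical names, and the variables are assumed to use the standard AWS names `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. Only the three `fs.s3a.*` keys come from the post.

```scala
object S3aSettings {
  /** Builds the fs.s3a.* settings from an environment-like map.
    * Returns Left with the name of the first missing variable. */
  def fromEnv(env: Map[String, String],
              endpoint: String): Either[String, Map[String, String]] =
    for {
      accessKey <- env.get("AWS_ACCESS_KEY_ID").toRight("AWS_ACCESS_KEY_ID")
      secretKey <- env.get("AWS_SECRET_ACCESS_KEY").toRight("AWS_SECRET_ACCESS_KEY")
    } yield Map(
      "fs.s3a.access.key" -> accessKey,
      "fs.s3a.secret.key" -> secretKey,
      "fs.s3a.endpoint"   -> endpoint
    )
}
```

In a job, the resulting map could be applied with something like `S3aSettings.fromEnv(sys.env, "s3.cn-northwest-1.amazonaws.com.cn")` followed by `sc.hadoopConfiguration.set(k, v)` for each pair, failing fast when the result is a `Left`.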

​

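pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark-s3-hudi-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>spark-s3-hudi-test</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.maven.plugin.version>4.3.0</scala.maven.plugin.version>
        <!-- assumed property name; this value is not referenced elsewhere in the file -->
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
        <scala.version>2.12.13</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.1.2</spark.version>
        <hadoop.version>3.2.1</hadoop.version>
        <fasterxml.jackson.version>2.10.0</fasterxml.jackson.version>
        <project.build.scope>compile</project.build.scope>
    </properties>

    <repositories>
        <repository>
            <id>emr-6.5.0-artifacts</id>
            <name>EMR 6.5.0 Releases Repository</name>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <url>https://s3.us-west-1.amazonaws.com/us-west-1-emr-artifacts/emr-6.5.0/repos/maven/</url>
        </repository>
    </repositories>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>software.amazon.awssdk</groupId>
                <artifactId>bom</artifactId>
                <version>2.17.186</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>s3</artifactId>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>kms</artifactId>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>s3control</artifactId>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <!-- was ${spark.pom.scope}, which is not defined in <properties> -->
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${fasterxml.jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${fasterxml.jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${fasterxml.jackson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala.maven.plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven.assembly.plugin.version}</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>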
Configuration files

Create a resources directory and add the following configuration files.

core-site.xml

<configuration>
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
</configuration>

log4j.properties

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
################################################################################
log4j.rootLogger=info, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
#log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p - %m%n
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-10c %x - %m%n

Submitting Spark jobs on EMR

spark-shell

spark-shell \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.binaryAsString", "true")

// Placeholder credentials: substitute your own and never commit real keys.
sc.hadoopConfiguration.set("fs.s3a.access.key", "<YOUR_ACCESS_KEY>")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "<YOUR_SECRET_KEY>")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")

val df = spark.read.parquet("s3a://s3-datafacts-poc-001/dw/ods/test_s3_01/dt=2022-05-11")
df.show

spark-submit

Package the code and upload it; the dependencies do not need to be bundled. Because SparkS2Test passes local[*] to setMaster, and a master set in code takes precedence over the --master flag, remove the hard-coded master before submitting to YARN.

spark-submit \
--master local[2] \
--class org.zero.SparkS2Test \
/opt/jars/spark-s3-hudi-test-1.0-SNAPSHOT.jar

spark-submit \
--master yarn \
--deploy-mode client \
--executor-memory 1G \
--executor-cores 2 \
--class org.zero.SparkS2Test \
/opt/jars/spark-s3-hudi-test-1.0-SNAPSHOT.jar

spark-submit \
--master yarn \
--deploy-mode client \
--num-executors 2 \
--executor-cores 2 \
--executor-memory 1g \
--class org.zero.SparkS2Test \
s3://s3-datafacts-poc-001/spark-s3-hudi-test-1.0-SNAPSHOT.jar
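
The same jar is submitted above with both a local and a YARN master, so the job is easier to reuse if it takes the master from the submission and only falls back to a local one. The following is a minimal sketch: `MasterChoice` and `resolve` are hypothetical names, and it assumes the submitted master is visible to the driver as the `spark.master` system property, which is how spark-submit normally passes it.

```scala
object MasterChoice {
  /** Prefers the master supplied at submit time; falls back to a local default. */
  def resolve(submitted: Option[String], fallback: String = "local[*]"): String =
    submitted.map(_.trim).filter(_.nonEmpty).getOrElse(fallback)
}
```

A job could then call `MasterChoice.resolve(sys.props.get("spark.master"))` and pass the result to `SparkUtils.getSparkSession` instead of a literal.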

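Spark: reading and writing Hudi

Local testing

To use the AWS build of the Hudi jar, download it from the master node and install it into the local Maven repository.

/usr/lib/hudi/hudi-spark3-bundle_2.12-0.9.0-amzn-1.jar

mvn install:install-file -Dfile=D:\soft\hudi-spark3-bundle_2.12-0.9.0-amzn-1.jar -DgroupId=org.apache.hudi -DartifactId=hudi-spark3-bundle_2.12 -Dversion=0.9.0-amzn-1 -Dpackaging=jar

Add the dependency to the project. The groupId must match the one passed to mvn install:install-file (org.apache.hudi).

<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-spark3-bundle_${scala.binary.version}</artifactId>
    <version>0.9.0-amzn-1</version>
</dependency>
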
package org.zero
​
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieIndexConfig._
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.{current_date, lit}
import org.slf4j.LoggerFactory
import org.utils.SparkUtils
​
​
object SparkS3HudiTest {
  private var logger: org.slf4j.Logger = _
​
  def main(args: Array[String]): Unit = {
    logger = LoggerFactory.getLogger(this.getClass.getSimpleName)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.INFO)
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.spark_project.jetty").setLevel(Level.DEBUG)
​
    val start = System.currentTimeMillis()
​
​
    val spark = SparkUtils.getSparkSession(this.getClass.getSimpleName, "local[1]")
    val sc = spark.sparkContext
    // Placeholder credentials: substitute your own and never commit real keys.
    sc.hadoopConfiguration.set("fs.s3a.access.key", "<YOUR_ACCESS_KEY>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<YOUR_SECRET_KEY>")
    sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
    //    sc.hadoopConfiguration.set("fs.s3.connection.ssl.enabled", "false")

    // Read from S3
    //    val dataframe = spark
    //      .read
    //      .parquet("s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09")
​
    //    val tmpCache = dataframe.cache()
    //    tmpCache.createOrReplaceTempView("parquet_tmp_view")
    //
    //    val dataFrame2 = spark.sql("select * from parquet_tmp_view limit 10")
    //
    //    dataFrame2.show
​
    // Write to S3
    //        val dataframe = spark
    //          .read
    //          .parquet("F:\\tmp\\test_table.parquet")
    //
    //        dataframe
    //          .write
    //          .mode(SaveMode.Overwrite)
    //          .save("s3a://s3-datafacts-poc-001/test/test_parquet")
​
    // Read multiple objects from S3
    //    logger.warn(s"=================== Spark reading S3 Parquet ===================")
    //    val dataframe2 = spark.read
    //      .parquet("s3a://s3-datafacts-poc-001/dw/ods/test_s3_01/dt=2022-05-1[1-3]")

    //    dataframe2.printSchema()
    //    dataframe2.show()

    //    val dataframe3 = dataframe2.withColumn("dt", current_date())
    //    dataframe2.withColumn("last_update_time" , unix_timestamp())
​
    // Write a Spark DataFrame to Hudi
    //    logger.warn(s"=================== Hudi option settings ===================")
    // Hudi options
    //    val hudiOptions = Map[String, String](
    //      HoodieWriteConfig.TBL_NAME.key() -> "hudi_test_table",
    //      TABLE_TYPE.key() -> COW_TABLE_TYPE_OPT_VAL,
    //      RECORDKEY_FIELD.key() -> "id",
    //      PARTITIONPATH_FIELD.key() -> "dt",
    //      PRECOMBINE_FIELD.key() -> "time",
    //      BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key() -> "true",
    //      INDEX_TYPE.key()-> IndexType.BLOOM.name(),
    //      META_SYNC_ENABLED.key() -> "true",
    //      HIVE_SYNC_ENABLED.key() -> "true",
    //      HIVE_USER.key() -> "hive",
    //      HIVE_PASS.key() -> "hive",
    //      HIVE_DATABASE.key() -> "ods",
    //      HIVE_TABLE.key() -> "hudi_test_table",
    //      HIVE_URL.key() -> "jdbc:hive2://52.82.123.13:10000",
    //      //      HIVE_URL.key() -> "jdbc:hive2://172.31.194.132:10000",
    //      HIVE_PARTITION_FIELDS.key() -> "dt",
    //      HIVE_PARTITION_EXTRACTOR_CLASS.key() -> classOf[MultiPartKeysValueExtractor].getName,
    //      HIVE_AUTO_CREATE_DATABASE.key() -> "true"
    //    )
    //    println("===> \n" + hudiOptions)
​
    //    logger.warn(s"=================== Spark writing Hudi on S3 ===================")
    //    dataframe3.write
    //      .format("org.apache.hudi")
    //      .option(OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    //      .options(hudiOptions)
    //      .mode(SaveMode.Overwrite)
    //      .save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")
​
    // Read Hudi with Spark
    //    logger.warn(s"=================== Spark reading Hudi on S3 ===================")
    //    val readDF = spark.read
    //      .format("org.apache.hudi")
    //      .load("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")
    //      .select("*")
    //      .sort("creation_date","id")
    //      .show(10)
​
    // Read Hive and write Hudi with Spark SQL
    logger.warn(s"=================== Spark SQL: create the Hive table ===================")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set spark.hadoop.hive.exec.compress.output=true")
    spark.sql("set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec")
    spark.sql("set mapreduce.output.fileoutputformat.compress.type=BLOCK")
​
    //    val hiveSourceDDL =
    //      """
    //        |create external table if not exists ods.test_parquet_to_hive_table(
    //        | id int,
    //        | name string,
    //        | age int,
    //        | job string,
    //        | address string,
    //        | company string,
    //        | email string,
    //        | url string,
    //        | phone string,
    //        | sfzh string,
    //        | chrome string,
    //        | ipv4 string,
    //        | ipv6 string,
    //        | date bigint,
    //        | time bigint,
    //        | mac_address string,
    //        | col_tinyint int,
    //        | col_smallint int,
    //        | col_mediumint int,
    //        | col_bigint bigint,
    //        | col_decimal double,
    //        | col_double double,
    //        | col_float double,
    //        | col_time bigint,
    //        | col_blob string,
    //        | col_text string
    //        |) partitioned by(dt string)
    //        |stored as parquet
    //        |location "s3://s3-datafacts-poc-001/dw/ods/test_parquet_to_hive_table"
    //        |""".stripMargin
    //    spark.sql(hiveSourceDDL)
​
    //    spark.sql("show databases").show
    spark.sql("use ods")
    //    spark.sql("show tables").show
    //    spark.sql("show create table test_parquet_to_hive_table").show
​
    logger.warn(s"=================== Spark SQL: create the Hudi sink table ===================")

    val hudiSinkDDL =
      """
        |create table if not exists ods.ods_hudi_sink_table (
        | id int,
        | name string,
        | age int,
        | job string,
        | address string,
        | company string,
        | email string,
        | url string,
        | phone string,
        | sfzh string,
        | chrome string,
        | ipv4 string,
        | ipv6 string,
        | date bigint,
        | time bigint,
        | mac_address string,
        | col_tinyint int,
        | col_smallint int,
        | col_mediumint int,
        | col_bigint bigint,
        | col_decimal double,
        | col_double double,
        | col_float double,
        | col_time bigint,
        | col_blob string,
        | col_text string,
        | dt date
        |) using hudi
        |partitioned by (dt)
        |tblproperties (
        |  type = 'cow',
        |  primaryKey = 'id',
        |  preCombineField = 'time',
        |  hoodie.index.type = 'GLOBAL_BLOOM',
        |  hiveSyncEnabled = 'true',
        |  hiveDatabase = 'ods',
        |  hiveUser = 'hive',
        |  hivePass = 'hive',
        |  hiveTable = 'ods_hudi_sink_table',
        |  hiveUrl = 'jdbc:hive2://52.82.123.13:10000',
        |  hivePartitionFields = 'dt'
        |)
        |location "s3a://s3-datafacts-poc-001/dw/ods/ods_hudi_sink_table"
        |""".stripMargin
​
    spark.sql(hudiSinkDDL)
    spark.sql("show tables").show
​
    logger.warn(s"=================== Spark SQL: read Hive and write to the Hudi table ===================")
    spark.sql("insert into table ods_hudi_sink_table select * from test_parquet_to_hive_table")
​
    spark.sql("select * from ods_hudi_sink_table limit 10").show
​
    spark.stop()
​
    val end = System.currentTimeMillis()
    logger.warn(s"=================== Elapsed: ${(end - start) / 1000} s ===================")
  }
}

pom.xml


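<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark-s3-hudi-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>spark-s3-hudi-test</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.maven.plugin.version>4.3.0</scala.maven.plugin.version>
        <!-- assumed property name; this value is not referenced elsewhere in the file -->
        <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
        <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
        <scala.version>2.12.13</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.1.2</spark.version>
        <hadoop.version>3.2.1</hadoop.version>
        <hoodie.version>0.9.0</hoodie.version>

        <fasterxml.jackson.version>2.10.5</fasterxml.jackson.version>

        <project.build.scope>compile</project.build.scope>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3</artifactId>
            <version>1.12.217</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-s3control</artifactId>
            <version>1.12.217</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kms</artifactId>
            <version>1.12.217</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-aws</artifactId>
            <version>${hadoop.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-spark3-bundle_${scala.binary.version}</artifactId>
            <version>${hoodie.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>${fasterxml.jackson.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>${fasterxml.jackson.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>${fasterxml.jackson.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.11</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.9</version>
            <scope>${project.build.scope}</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>${scala.maven.plugin.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>${maven.assembly.plugin.version}</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
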
Testing on the cluster

spark-shell

spark-shell \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.binaryAsString", "true")

// Placeholder credentials: substitute your own and never commit real keys.
sc.hadoopConfiguration.set("fs.s3a.access.key", "<YOUR_ACCESS_KEY>")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "<YOUR_SECRET_KEY>")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")

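Import packages

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig
// Needed for INDEX_TYPE, BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE, and IndexType below.
import org.apache.hudi.config.HoodieIndexConfig._
import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.hive.MultiPartKeysValueExtractor

Read S3 Parquet files and write them to a Hudi table on S3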

val df = spark.read.parquet("s3a://s3-datafacts-poc-001/dw/ods/test_s3_01/dt=2022-05-1[1-3]")

val df2 = df.withColumn("creation_date" , current_date())

val hudiOptions = Map[String, String](
  HoodieWriteConfig.TBL_NAME.key()  -> "hudi_test_table",
  TABLE_TYPE.key() -> COW_TABLE_TYPE_OPT_VAL,
  RECORDKEY_FIELD.key() -> "id",                    // record key (primary key)
  PARTITIONPATH_FIELD.key() -> "creation_date",     // partition field
  PRECOMBINE_FIELD.key() -> "time",                 // column holding the record's update time
  BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key() -> "true",     // whether a record moves to the new partition directory when its partition value changes
  // Index types include HBASE, INMEMORY, BLOOM, and GLOBAL_BLOOM. Finding a record after
  // its partition changes requires the global index, so GLOBAL_BLOOM is used here
  // (the original code passed BLOOM, contradicting its own comment).
  INDEX_TYPE.key()-> IndexType.GLOBAL_BLOOM.name(),
  META_SYNC_ENABLED.key() -> "true",                // sync metadata
  HIVE_SYNC_ENABLED.key() -> "true",                // sync the table to Hive
  HIVE_USER.key() -> "hive",
  HIVE_PASS.key() -> "hive",
  HIVE_DATABASE.key() -> "ods",
  HIVE_TABLE.key()-> "hudi_test_table",
  HIVE_URL.key() -> "jdbc:hive2://172.31.194.132:10000",
  HIVE_PARTITION_FIELDS.key() -> "creation_date",
  HIVE_PARTITION_EXTRACTOR_CLASS.key() ->  classOf[MultiPartKeysValueExtractor].getName,
  HIVE_AUTO_CREATE_DATABASE.key() -> "true"
)

// Initial load: insert with Overwrite, which recreates the table at this path.
(df2.write.format("org.apache.hudi")
  .option(OPERATION.key(), DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .options(hudiOptions)
  .mode(SaveMode.Overwrite)
  .save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table"))

val df3 = spark.read
 .format("org.apache.hudi")
 .load("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")
 .select("*")
 .sort("creation_date", "id")

df3.show(10)

// Update one record, changing its partition value, and upsert it with Append.
val updateDF = df2.limit(1).withColumn("creation_date", lit("date"))

(updateDF.write
    .format("org.apache.hudi")
    .option(OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
    .options(hudiOptions)
    .mode(SaveMode.Append)
    .save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table"))

updateDF.show()
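
The option constants used above resolve to plain string keys, which is handy when the Hudi classes are not on the compile classpath or when options come from a config file. The sketch below lists the core write keys as I understand them for Hudi 0.9.x; treat the key strings as assumptions to verify against the Hudi configuration reference. `HudiOptionKeys` and `writeOptions` are hypothetical names.

```scala
object HudiOptionKeys {
  /** Core copy-on-write options as string keys (assumed spellings for Hudi 0.9.x). */
  def writeOptions(table: String,
                   recordKey: String,
                   partitionField: String,
                   precombineField: String,
                   operation: String): Map[String, String] = Map(
    "hoodie.table.name"                           -> table,
    "hoodie.datasource.write.table.type"          -> "COPY_ON_WRITE",
    "hoodie.datasource.write.recordkey.field"     -> recordKey,
    "hoodie.datasource.write.partitionpath.field" -> partitionField,
    "hoodie.datasource.write.precombine.field"    -> precombineField,
    "hoodie.datasource.write.operation"           -> operation // e.g. "insert" or "upsert"
  )
}
```

The resulting map can be passed to `.options(...)` in the same way as `hudiOptions`, for example `HudiOptionKeys.writeOptions("hudi_test_table", "id", "creation_date", "time", "upsert")`.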

Deleting S3 files

aws s3 rm s3://s3-datafacts-poc-001/dw/ods/hudi_test_table --recursive

emrfs sync s3://s3-datafacts-poc-001/dw/ods

Using Spark SQL in spark-shell

spark-shell \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hive/lib/mysql-connector-java-5.1.49.jar

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
spark.conf.set("spark.sql.parquet.binaryAsString", "true")

// Placeholder credentials: substitute your own and never commit real keys.
sc.hadoopConfiguration.set("fs.s3a.access.key", "<YOUR_ACCESS_KEY>")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "<YOUR_SECRET_KEY>")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")
sc.hadoopConfiguration.set("fs.s3a.fast.upload", "true")

Importing S3 Parquet files into Hive

spark.sql("show databases").show
​
+---------+
|namespace|
+---------+
|  default|
| dhw_demo|
|      ods|
|    test1|
|  test_db|
|     yszy|
+---------+

Parquet data source

val df = spark.read.parquet("s3a://s3-datafacts-poc-001/dw/ods/test_s3_01/dt=2022-05-1[1-3]")
​
val df2 = df.withColumn("dt" , current_date())
​
df2.printSchema
​
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- address: string (nullable = true)
 |-- company: string (nullable = true)
 |-- email: string (nullable = true)
 |-- url: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- sfzh: string (nullable = true)
 |-- chrome: string (nullable = true)
 |-- ipv4: string (nullable = true)
 |-- ipv6: string (nullable = true)
 |-- date: long (nullable = true)
 |-- time: long (nullable = true)
 |-- mac_address: string (nullable = true)
 |-- col_tinyint: integer (nullable = true)
 |-- col_smallint: integer (nullable = true)
 |-- col_mediumint: integer (nullable = true)
 |-- col_bigint: long (nullable = true)
 |-- col_decimal: double (nullable = true)
 |-- col_double: double (nullable = true)
 |-- col_float: double (nullable = true)
 |-- col_time: long (nullable = true)
 |-- col_blob: string (nullable = true)
 |-- col_text: string (nullable = true)
 |-- dt: date (nullable = false)

spark.sql("set spark.sql.debug.maxToStringFields=100")
df2.createOrReplaceTempView("parquet_tmp_table")
​
spark.sql("use ods")
spark.sql("show tables").show
​
spark.sql("desc formatted parquet_tmp_table").show

spark.sql("select * from parquet_tmp_table limit 10").show
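
The read in this section selects three daily partitions with the glob dt=2022-05-1[1-3]. A character-class glob cannot express a range that crosses a month or a tens boundary, so an alternative is to list the partition paths explicitly. The following is a minimal sketch; `PartitionPaths` and `daily` are hypothetical names, and the dt=yyyy-MM-dd layout is taken from the paths in this post.

```scala
import java.time.LocalDate

object PartitionPaths {
  /** Lists one dt=yyyy-MM-dd path per day from `from` to `to`, inclusive. */
  def daily(base: String, from: LocalDate, to: LocalDate): List[String] =
    Iterator.iterate(from)(_.plusDays(1))
      .takeWhile(day => !day.isAfter(to))
      .map(day => s"$base/dt=$day") // LocalDate.toString is ISO-8601 (yyyy-MM-dd)
      .toList
}
```

Because `spark.read.parquet` accepts several paths, the list can be passed as varargs with `spark.read.parquet(paths: _*)`. Unlike a glob, every listed path is expected to exist, so this suits cases where each day is known to have data.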

Create the sink table in Hive (it can also be created in spark-sql)

beeline -n hive -u jdbc:hive2://52.82.123.13:10000/default
​
SET hive.exec.compress.output=true;
SET mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
SET mapred.output.compression.type=BLOCK;
​
alter table test_parquet_to_hive_table set tblproperties('external'='true');
drop table if exists test_parquet_to_hive_table;
​
create external table if not exists ods.test_parquet_to_hive_table(
 id int,
 name string,
 age int,
 job string,
 address string,
 company string,
 email string,
 url string,
 phone string,
 sfzh string,
 chrome string,
 ipv4 string,
 ipv6 string,
 date bigint,
 time bigint,
 mac_address string,
 col_tinyint int,
 col_smallint int,
 col_mediumint int,
 col_bigint bigint,
 col_decimal double,
 col_double double,
 col_float double,
 col_time bigint,
 col_blob string,
 col_text string
) partitioned by(dt string)
stored as parquet
location "s3://s3-datafacts-poc-001/dw/ods/test_parquet_to_hive_table";
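
The column list in this DDL mirrors the printSchema output shown earlier, with Spark's printed type names (integer, long) spelled the Hive way (int, bigint). For wide tables the list can be generated rather than typed by hand. The following is a minimal sketch limited to the types that appear in this post; `HiveColumns` and `render` are hypothetical names.

```scala
object HiveColumns {
  // Type names as printed by printSchema, mapped to the Hive DDL spellings used in this post.
  private val hiveType = Map(
    "integer" -> "int",
    "long"    -> "bigint",
    "string"  -> "string",
    "double"  -> "double",
    "date"    -> "date"
  )

  /** Renders " name type" column lines; Left names the first unsupported type. */
  def render(columns: Seq[(String, String)]): Either[String, String] =
    columns.collectFirst { case (_, t) if !hiveType.contains(t) => t } match {
      case Some(unknown) => Left(unknown)
      case None =>
        Right(columns.map { case (name, t) => s" $name ${hiveType(t)}" }.mkString(",\n"))
    }
}
```

Returning `Left` for an unmapped type keeps the helper from silently emitting a wrong column definition; the caller decides how to handle types outside this small table.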

Write to the Hive table

spark.sql("show create table ods.test_parquet_to_hive_table").show
​
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict")
​
spark.sql("insert overwrite ods.test_parquet_to_hive_table select * from parquet_tmp_table")
spark.sql("select * from ods.test_parquet_to_hive_table limit 10").show

Read the Hive table and write to Hudi

spark-sql \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/hive/lib/mysql-connector-java-5.1.49.jar

set spark.sql.debug.maxToStringFields=100;
set fs.s3a.access.key=xxxx;
set fs.s3a.secret.key=xxxx;
set fs.s3a.endpoint=s3.cn-northwest-1.amazonaws.com.cn;

Create the Hudi table

create table if not exists ods.ods_hudi_sink_table (
 id int,
 name string,
 age int,
 job string,
 address string,
 company string,
 email string,
 url string,
 phone string,
 sfzh string,
 chrome string,
 ipv4 string,
 ipv6 string,
 date bigint,
 time bigint,
 mac_address string,
 col_tinyint int,
 col_smallint int,
 col_mediumint int,
 col_bigint bigint,
 col_decimal double,
 col_double double,
 col_float double,
 col_time bigint,
 col_blob string,
 col_text string,
 dt date
) using hudi
partitioned by (dt)
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'time',
  hoodie.index.type = 'GLOBAL_BLOOM',
  hiveSyncEnabled = 'true',
  hiveDatabase = 'ods',
  hiveUser = 'hive',
  hivePass = 'hive',
  hiveTable = 'ods_hudi_sink_table',
  hiveUrl = 'jdbc:hive2://52.82.123.13:10000',
  hivePartitionFields = 'dt'
)
location "s3://s3-datafacts-poc-001/dw/ods/ods_hudi_sink_table";

Write to Hudi

show tables;
​
#ods     hudi_test_table false
#ods     ods_hudi_sink_table     false
#ods     test_parquet_to_hive_table      false
#ods     test_tb_02      false
#Time taken: 0.021 seconds, Fetched 4 row(s)
​
insert into table ods_hudi_sink_table select * from test_parquet_to_hive_table;

select * from ods_hudi_sink_table limit 10;

spark-submit

spark-submit \
--master local[2] \
--class org.zero.SparkS3HudiTest \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
/opt/jars/spark-s3-hudi-test-1.0-SNAPSHOT.jar

spark-submit \
--master yarn \
--deploy-mode client \
--executor-memory 1G \
--executor-cores 2 \
--class org.zero.SparkS3HudiTest \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
/opt/jars/spark-s3-hudi-test-1.0-SNAPSHOT.jar

Testing in Hive

beeline -n hive -u jdbc:hive2://52.82.123.13:10000/default
​
use ods;
​
show tables;
+-----------------------------+
|          tab_name           |
+-----------------------------+
| hudi_test_table             |
| ods_hudi_sink_table         |
| test_parquet_to_hive_table  |
+-----------------------------+
​
show create table hudi_test_table;
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE EXTERNAL TABLE hudi_test_table(           |
|   _hoodie_commit_time string,                    |
|   _hoodie_commit_seqno string,                   |
|   _hoodie_record_key string,                     |
|   _hoodie_partition_path string,                 |
|   _hoodie_file_name string,                      |
|   id int,                                        |
|   name string,                                   |
|   age int,                                       |
|   job string,                                    |
|   address string,                                |
|   company string,                                |
|   email string,                                  |
|   url string,                                    |
|   phone string,                                  |
|   sfzh string,                                   |
|   chrome string,                                 |
|   ipv4 string,                                   |
|   ipv6 string,                                   |
|   date bigint,                                   |
|   time bigint,                                   |
|   mac_address string,                            |
|   col_tinyint int,                               |
|   col_smallint int,                              |
|   col_mediumint int,                             |
|   col_bigint bigint,                             |
|   col_decimal double,                            |
|   col_double double,                             |
|   col_float double,                              |
|   col_time bigint,                               |
|   col_blob string,                               |
|   col_text string)                               |
| PARTITIONED BY (                                   |
|   creation_date date)                            |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'hoodie.query.as.ro.table'='false',              |
|   'path'='s3://s3-datafacts-poc-001/dw/ods/hudi_test_table')  |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hudi.hadoop.HoodieParquetInputFormat'  |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' |
| LOCATION                                           |
|   's3://s3-datafacts-poc-001/dw/ods/hudi_test_table' |
| TBLPROPERTIES (                                    |
|   'bucketing_version'='2',                         |
|   'last_commit_time_sync'='20220513101907',        |
|   'spark.sql.sources.provider'='hudi',             |
|   'spark.sql.sources.schema.numPartCols'='1',      |
|   'spark.sql.sources.schema.numParts'='1',         |
|   'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"integer","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"age","type":"integer","nullable":true,"metadata":{}},{"name":"job","type":"string","nullable":true,"metadata":{}},{"name":"address","type":"string","nullable":true,"metadata":{}},{"name":"company","type":"string","nullable":true,"metadata":{}},{"name":"email","type":"string","nullable":true,"metadata":{}},{"name":"url","type":"string","nullable":true,"metadata":{}},{"name":"phone","type":"string","nullable":true,"metadata":{}},{"name":"sfzh","type":"string","nullable":true,"metadata":{}},{"name":"chrome","type":"string","nullable":true,"metadata":{}},{"name":"ipv4","type":"string","nullable":true,"metadata":{}},{"name":"ipv6","type":"string","nullable":true,"metadata":{}},{"name":"date","type":"long","nullable":true,"metadata":{}},{"name":"time","type":"long","nullable":true,"metadata":{}},{"name":"mac_address","type":"string","nullable":true,"metadata":{}},{"name":"col_tinyint","type":"integer","nullable":true,"metadata":{}},{"name":"col_smallint","type":"integer","nullable":true,"metadata":{}},{"name":"col_mediumint","type":"integer","nullable":true,"metadata":{}},{"name":"col_bigint","type":"long","nullable":true,"metadata":{}},{"name":"col_decimal","type":"double","nullable":true,"metadata":{}},{"name":"col_double","type":"double","nullable":true,"metadata":{}},{"name":"col_float","type":"double","nullable":true,"metadata":{}},{"name":"col_time","type":"long","nullable":true,"metadata":{}}
,{"name":"col_blob","type":"string","nullable":true,"metadata":{}},{"name":"col_text","type":"string","nullable":true,"metadata":{}},{"name":"creation_date","type":"date","nullable":false,"metadata":{}}]}',  |
|   'spark.sql.sources.schema.partCol.0'='creation_date',  |
|   'transient_lastDdlTime'='1652437162')            |
+----------------------------------------------------+

select * from hudi_test_table limit 10;

Problem 1

2022-05-10 14:27:07,535 DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList - No credentials provided by SimpleAWSCredentialsProvider: org.apache.hadoop.fs.s3a.CredentialInitializationException: Access key or secret key is unset
org.apache.hadoop.fs.s3a.CredentialInitializationException: Access key or secret key is unset
    at org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider.getCredentials(SimpleAWSCredentialsProvider.java:68)
    at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1166)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
    at …
2022-05-10 14:27:07,535 DEBUG org.apache.hadoop.fs.s3a.AWSCredentialProviderList - No credentials provided by EnvironmentVariableCredentialsProvider: com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
    at com.amazonaws.auth.EnvironmentVariableCredentialsProvider.getCredentials(EnvironmentVariableCredentialsProvider.java:50)
    at org.apache.hadoop.fs.s3a.AWSCredentialProviderList.getCredentials(AWSCredentialProviderList.java:137)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.getCredentialsFromContext(AmazonHttpClient.java:1166)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.runBeforeRequestHandlers(AmazonHttpClient.java:762)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:724)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at …

Solution 1:

<property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>com.amazonaws.auth.profile.ProfileCredentialsProvider</value>
</property>

Linux

~/.aws/credentials
[default]
aws_access_key_id = your_access_key_id
aws_secret_access_key = your_secret_access_key

export AWS_ACCESS_KEY_ID=your_access_key_id
export AWS_SECRET_ACCESS_KEY=your_secret_access_key

Windows

C:\Users\USERNAME\.aws\credentials
[default]
aws_access_key_id = your_access_key_id
aws_secret_access_key = your_secret_access_key

set AWS_ACCESS_KEY_ID=your_access_key_id
set AWS_SECRET_ACCESS_KEY=your_secret_access_key

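Solution 2:

Create a core-site.xml configuration file under resources:

<configuration>
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <description>
            AWS access key ID.
            Omit for IAM role-based or provider-based authentication.
        </description>
        <value>your_access_key_id</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <description>
            AWS secret key.
            Omit for IAM role-based or provider-based authentication.
        </description>
        <value>your_secret_access_key</value>
    </property>
</configuration>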

Solution 3:

<property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
</property>

val sc = spark.sparkContext
sc.hadoopConfiguration.set("fs.s3a.access.key", "your_access_key_id")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "your_secret_access_key")
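
Keys written directly into source code are easy to leak. The following is a minimal sketch, assuming the keys are exported as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY as in Solution 1. The object and method names (S3ACredentials, s3aCredentialSettings) are hypothetical; only the three property names come from the configuration above.

```scala
object S3ACredentials {
  // Builds the S3A credential properties from an environment-like map.
  // Returns Left(message) if a variable is missing, so no partial
  // configuration is ever produced.
  def s3aCredentialSettings(env: Map[String, String]): Either[String, Map[String, String]] =
    for {
      accessKey <- env.get("AWS_ACCESS_KEY_ID").toRight("AWS_ACCESS_KEY_ID is not set")
      secretKey <- env.get("AWS_SECRET_ACCESS_KEY").toRight("AWS_SECRET_ACCESS_KEY is not set")
    } yield Map(
      "fs.s3a.aws.credentials.provider" -> "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider",
      "fs.s3a.access.key" -> accessKey,
      "fs.s3a.secret.key" -> secretKey
    )
}
```

With a SparkSession named spark, the resulting map could be applied by passing sys.env to the helper and calling spark.sparkContext.hadoopConfiguration.set(key, value) for each entry.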

Problem 2

Exception in thread "main" org.apache.hadoop.fs.s3a.AWSRedirectException: getFileStatus on s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09: com.amazonaws.services.s3.model.AmazonS3Exception: The bucket is in this region: cn-northwest-1. Please use this region to retry the request (Service: Amazon S3; Status Code: 301; Error Code: 301 Moved Permanently; Request ID: C1AV4REN3P1VGWY7; S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=), S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=:301 Moved Permanently: The bucket is in this region: cn-northwest-1. Please use this region to retry the request (Service: Amazon S3; Status Code: 301; Error Code: 301 Moved Permanently; Request ID: C1AV4REN3P1VGWY7; S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:217)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:151)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2239)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
    at …
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The bucket is in this region: cn-northwest-1. Please use this region to retry the request (Service: Amazon S3; Status Code: 301; Error Code: 301 Moved Permanently; Request ID: C1AV4REN3P1VGWY7; S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=), S3 Extended Request ID: 61G1/2kXI1/mqclSY6UUjFTzm4ANAtuqMiInnat0VZi/8vYVmyxkZ4GzuTnmjeAcwD02yWHl2eE=
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1640)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
    at …

val sc = spark.sparkContext
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")

Linux

~/.aws/config

[default]
region = cn-northwest-1

export AWS_REGION=cn-northwest-1

Windows

C:\Users\USERNAME\.aws\config

[default]
region = cn-northwest-1

set AWS_REGION=cn-northwest-1
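
The endpoint host follows from the bucket's region. The following is a minimal sketch of that mapping, assuming the standard regional S3 hostname pattern, in which regions in the AWS China partition (names starting with cn-) use the amazonaws.com.cn domain. The object and method names are hypothetical. Note that the host starts with s3, not s3a: s3a is only the Hadoop URI scheme.

```scala
object S3Endpoints {
  // Derives the regional S3 endpoint host from a region name.
  // China-partition regions ("cn-...") use the .com.cn domain.
  def s3Endpoint(region: String): String = {
    val domain = if (region.startsWith("cn-")) "amazonaws.com.cn" else "amazonaws.com"
    s"s3.$region.$domain"
  }
}
```

The returned value is what would be passed to fs.s3a.endpoint, for example S3Endpoints.s3Endpoint("cn-northwest-1").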

Problem 3

Exception in thread "main" java.nio.file.AccessDeniedException: s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09: getFileStatus on s3a://s3-datafacts-poc-001/dct/s3-datafacts-poc-001/dt=2022-05-09: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: CB7P31ZHFVCTBZWM; S3 Extended Request ID: Xch4CDPv15SyJQzkaRS6WpJci4o9DR9W2yQcPALY7IgEF/hJutaedBZ3ft5FlaJg8hHrAaPvhNY=), S3 Extended Request ID: Xch4CDPv15SyJQzkaRS6WpJci4o9DR9W2yQcPALY7IgEF/hJutaedBZ3ft5FlaJg8hHrAaPvhNY=:403 Forbidden
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:230)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:151)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2239)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
    at a:47)
    …
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: CB7P31ZHFVCTBZWM; S3 Extended Request ID: Xch4CDPv15SyJQzkaRS6WpJci4o9DR9W2yQcPALY7IgEF/hJutaedBZ3ft5FlaJg8hHrAaPvhNY=), S3 Extended Request ID: Xch4CDPv15SyJQzkaRS6WpJci4o9DR9W2yQcPALY7IgEF/hJutaedBZ3ft5FlaJg8hHrAaPvhNY=
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1640)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1058)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at …

sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")

Problem 4

2022-05-10 14:38:15,669 DEBUG org.apache.hadoop.metrics2.impl.MetricsConfig - Could not locate file hadoop-metrics2.properties org.apache.commons.configuration2.ex.ConfigurationException: Could not locate: org.apache.commons.configuration2.io.FileLocator@75de29c0[fileName=hadoop-metrics2.properties,basePath=

Create a core-site.xml configuration file under resources:

<configuration>
    <property>
        <name>fs.s3a.aws.credentials.provider</name>
        <value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
    </property>
</configuration>

Problem 5

Caused by: java.lang.UnsupportedOperationException: Unsupported encoding: DELTA_BINARY_PACKED
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.initDataReader(VectorizedColumnReader.java:783)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPageV2(VectorizedColumnReader.java:833)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.access$100(VectorizedColumnReader.java:54)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:756)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader$1.visit(VectorizedColumnReader.java:742)
    at org.apache.parquet.column.page.DataPageV2.accept(DataPageV2.java:141)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readPage(VectorizedColumnReader.java:742)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:261)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:283)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:181)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:37)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:173)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:503)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1423)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1350)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1414)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1237)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:384)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:335)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

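import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
    // Disable the vectorized Parquet reader, which throws the error above on DELTA_BINARY_PACKED pages
    .config("spark.sql.parquet.enableVectorizedReader", "false")
    .getOrCreate()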

Problem 6

Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: Q3SD2NMRY1JHEGHE; S3 Extended Request ID: YW7hSvgBhRgUkInd9coS/BFfnO61SFclCZj4sVYPbY8YYy6zH1n5d8tHFU8I0f1AE7L4YfOZH18=; Proxy: null)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1862)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1415)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1384)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1154)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:811)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:779)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:753)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:713)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:695)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:559)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:539)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5437)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5384)
    at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1445)
    at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:1381)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$verifyBucketExists$1(S3AFileSystem.java:381)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    … 95 more

sc.hadoopConfiguration.set("fs.s3a.access.key", "your_access_key_id")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "your_secret_access_key")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.cn-northwest-1.amazonaws.com.cn")

Problem 7

Caused by: java.io.FileNotFoundException: No such file or directory: s3a://s3-datafacts-poc-001/dw/ods/myhudidataset/.hoodie/hoodie.properties
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:716)
    at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:906)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.open(HoodieWrapperFileSystem.java:462)
    at org.apache.hudi.common.config.DFSPropertiesConfiguration.visitFile(DFSPropertiesConfiguration.java:106)
    … 91 more

Delete the directory on S3 left behind by the Hudi table that was not created successfully.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>${project.build.scope}</scope>
</dependency>

Problem 8

Caused by: java.sql.SQLException: Error while processing statement: FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. org.apache.hadoop.fs.s3a.AWSBadRequestException: doesBucketExist on s3-datafacts-poc-001: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: S38N9ZTM2GHJSAY7; S3 Extended Request ID: gBLBCPfbJjdFYrRPkG6qCEHu3al+LQPr8dQ3TBhocD/E3Cq3Nc5KWZE9ub9vtk7dt1mWJRgQJmc=; Proxy: null), S3 Extended Request ID: gBLBCPfbJjdFYrRPkG6qCEHu3al+LQPr8dQ3TBhocD/E3Cq3Nc5KWZE9ub9vtk7dt1mWJRgQJmc=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID: S38N9ZTM2GHJSAY7; S3 Extended Request ID: gBLBCPfbJjdFYrRPkG6qCEHu3al+LQPr8dQ3TBhocD/E3Cq3Nc5KWZE9ub9vtk7dt1mWJRgQJmc=; Proxy: null)
    at org.apache.hive.jdbc.HiveStatement.waitForOperationToComplete(HiveStatement.java:385)
    at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:254)
    at org.apache.hudi.hive.ddl.JDBCExecutor.runSQL(JDBCExecutor.java:57)
    … 55 more

Hudi fails to sync the data to Hive. The Hive user must be specified:

HIVE_USER.key() -> "hive",
HIVE_PASS.key() -> "",
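
For context, the two entries above are part of the option map passed to the Hudi writer. The following is a minimal sketch of the Hive sync portion of such a map, assuming HIVE_USER and HIVE_PASS resolve to the string keys used in Hudi 0.x releases (hoodie.datasource.hive_sync.username and hoodie.datasource.hive_sync.password). The helper name and its parameters are hypothetical, and the keys should be checked against the Hudi version shipped with your EMR release.

```scala
object HudiHiveSyncOptions {
  // Hive sync options as plain string keys (assumed Hudi 0.x names).
  // The defaults mirror the values used in this post: user "hive", empty password.
  def hiveSyncOptions(
      jdbcUrl: String,
      database: String,
      table: String,
      user: String = "hive",
      password: String = ""
  ): Map[String, String] = Map(
    "hoodie.datasource.hive_sync.enable" -> "true",
    "hoodie.datasource.hive_sync.jdbcurl" -> jdbcUrl,
    "hoodie.datasource.hive_sync.database" -> database,
    "hoodie.datasource.hive_sync.table" -> table,
    "hoodie.datasource.hive_sync.username" -> user,
    "hoodie.datasource.hive_sync.password" -> password
  )
}
```

The map could then be passed to the DataFrame writer with .options(...), for example using the JDBC URL jdbc:hive2://52.82.123.13:10000, database ods and table hudi_test_table from the Hive test earlier.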

Use the s3 scheme in the table path:

df.write
  ...
  .save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")

Problem 9

Caused by: java.io.FileNotFoundException: No such file or directory: s3a://s3-datafacts-poc-001/dw/ods/hudi_test_table/.hoodie/.temp/20220513161709/2022-05-13
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2310)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2204)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2143)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerListFiles(S3AFileSystem.java:3148)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.listFiles(S3AFileSystem.java:3129)
    at org.apache.hudi.table.marker.DirectWriteMarkers.lambda$createdAndMergedDataPaths$69cdea3b$1(DirectWriteMarkers.java:136)
    at org.apache.hudi.client.common.HoodieSparkEngineContext.lambda$flatMap$7d470b86$1(HoodieSparkEngineContext.java:78)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$flatMap$1(JavaRDDLike.scala:125)

When the job runs locally on Windows, the directory created on S3 is:

s3-datafacts-poc-001/dw/ods/hudi_test_table/.hoodie\.temp/20220513161709/2022-05-13/
On Windows the path is written with the \ separator, so the resulting key is wrong (.hoodie\.temp instead of .hoodie/.temp) and the job cannot be run locally.
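
The cause is easier to see in isolation: S3 keys always use / as the separator, so a segment joined with the Windows separator becomes part of a single, different key. The following is a minimal sketch that only illustrates the difference; it is not a patch for Hudi, and the object and method names are hypothetical.

```scala
object S3Keys {
  // Joins key segments with "/" regardless of the host OS and rewrites
  // any backslashes that a Windows-style join has already introduced.
  def joinKey(segments: String*): String =
    segments
      .map(_.replace('\\', '/'))
      .map(_.stripPrefix("/").stripSuffix("/"))
      .filter(_.nonEmpty)
      .mkString("/")
}
```

Applied to the segments from the error above, S3Keys.joinKey("dw/ods/hudi_test_table", ".hoodie\\.temp", "20220513161709") yields the key that the reader expects, with .hoodie/.temp as two levels.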

Problem 10

Caused by: java.lang.IllegalArgumentException: BlockAlignedAvroParquetWriter does not support scheme s3n
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.getHoodieScheme(HoodieWrapperFileSystem.java:159)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.convertToHoodiePath(HoodieWrapperFileSystem.java:132)
    at org.apache.hudi.io.storage.HoodieParquetWriter.

// Save with the s3:// scheme rather than s3n://
df.write
  ...
  .save("s3://s3-datafacts-poc-001/dw/ods/hudi_test_table")
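
If table paths arrive from configuration with an s3n:// prefix, the scheme can be rewritten before the path is handed to the writer. The following is a minimal sketch using java.net.URI; the object and method names are hypothetical, and the target scheme s3 is the one this post uses on EMR.

```scala
import java.net.URI

object S3Schemes {
  // Returns the same location with its URI scheme replaced,
  // e.g. s3n://bucket/path -> s3://bucket/path.
  def withScheme(path: String, scheme: String): String = {
    val uri = new URI(path)
    new URI(scheme, uri.getAuthority, uri.getPath, uri.getQuery, uri.getFragment).toString
  }
}
```

For example, S3Schemes.withScheme("s3n://s3-datafacts-poc-001/dw/ods/hudi_test_table", "s3") produces the path passed to .save(...) above.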

Original: https://blog.csdn.net/lingeio/article/details/124817817
Author: 訾零
Title: Spark3 读写 S3 Parquet, Hive, Hudi
