【spark】【hive】Reading Hive data with Spark and writing it back to Hive

1. Maven dependencies: the pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <artifactId>spark-hive-to-hive</artifactId>
    <groupId>com.xiaostudy.spark</groupId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.3.2</spark.version>
        <hadoop.version>2.7.2</hadoop.version>
        <hippo.version>5.1.3.0</hippo.version>
        <scala.version>2.11.12</scala.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>15.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.67</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.codis.jodis</groupId>
            <artifactId>jodis</artifactId>
            <version>0.5.1</version>
        </dependency>

        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.9</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.7.0</version>
        </dependency>

    </dependencies>

    <repositories>
        <repository>
            <id>central</id>
            <url>https://repo1.maven.org/maven2</url>
        </repository>
    </repositories>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>com.xiaostudy.spark.SparkHiveToHive</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
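Building with mvn clean package produces target/spark-hive-to-hive-1.0-SNAPSHOT.jar (the assembly plugin's appendAssemblyId=false drops the usual jar-with-dependencies suffix); that is the jar submitted in step 6. Note that the Spark artifacts are marked provided and are therefore taken from the cluster's Spark installation at runtime. There is no spark-hive_2.11 dependency here, so this setup assumes a Hive-enabled Spark distribution; if yours is not, you would also need spark-hive_2.11 (likewise as provided) for enableHiveSupport() to work.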

2. Project structure

src/main/java/com/xiaostudy/spark
├── SparkHiveToHive.java
├── entity
│   └── YcEntity.java
└── util
    └── RowUtils.java

3. Job entry point: the SparkHiveToHive class

package com.xiaostudy.spark;

import com.xiaostudy.spark.entity.YcEntity;
import com.xiaostudy.spark.util.RowUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.text.ParseException;

public class SparkHiveToHive {
    private static final Logger logger = LoggerFactory.getLogger(SparkHiveToHive.class);

    public static void main(String[] args) throws ParseException {
        hiveToHive();
    }

    private static boolean hiveToHive() throws ParseException {
        String viewName = "viewName";
        String querySql = "select sjsj,ds from test.hive_table group by sjsj,ds";

        try {
            String warehouseLocation = new File("/user/hive/warehouse").getAbsolutePath();
            SparkSession spark = SparkSession
                    .builder()
                    .appName("SparkHiveToHive")
                    .config("spark.querySql.warehouse.dir", warehouseLocation)
                    .config("spark.port.maxRetries", "100")
                    .enableHiveSupport()
                    //.master("local[2]")
                    .getOrCreate();

            spark.sql("show databases").show();
            Dataset<Row> rowDataset = spark.sql(querySql);

            rowDataset.show(5);
            logger.info(String.format("rowDataset.count():%d", rowDataset.count()));
            JavaRDD<YcEntity> rowJavaRDD = rowDataset.toJavaRDD()
                    .map(row -> RowUtils.setYcEntity(row))
                    .filter(ycEntity -> null != ycEntity && null != ycEntity.getDs());

            Dataset<Row> dataFrame = spark.createDataFrame(rowJavaRDD, YcEntity.class);
            dataFrame.createOrReplaceTempView(viewName);
            String insertSql = String.format("insert into test.hive_yc partition(dt=20210422) select sjsj,ds from %s", viewName);
            spark.sql(insertSql);

            spark.stop();
            return true;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return false;
        }
    }

}
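The original post never shows the DDL of the two tables involved. For orientation, here is a minimal sketch of what test.hive_table and test.hive_yc could look like, issued once through the same SparkSession; the column types, the ORC storage format, and the string-typed dt partition column are assumptions, only the column and partition names are known from the SQL above:

        // Hypothetical DDL; adjust column types and storage format to your data.
        spark.sql("CREATE TABLE IF NOT EXISTS test.hive_table (sjsj STRING, ds STRING) STORED AS ORC");
        spark.sql("CREATE TABLE IF NOT EXISTS test.hive_yc (sjsj STRING, ds STRING) "
                + "PARTITIONED BY (dt STRING) STORED AS ORC");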

4. Processing logic that builds and returns the entity

package com.xiaostudy.spark.util;

import com.xiaostudy.spark.entity.YcEntity;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowUtils {

    private static final Logger logger = LoggerFactory.getLogger(RowUtils.class);

    public static YcEntity setYcEntity(Row row) {
        YcEntity ycEntity = new YcEntity();
        try {

            // Business logic goes here; the following is just an example.

            String sjsj = row.getAs("sjsj");
            String ds = row.getAs("ds");

            if (null == ds || ds.trim().length() <= 0) {
                return ycEntity;
            }

            ycEntity.setSjsj(sjsj);
            ycEntity.setDs(ds);

            return ycEntity;
        } catch (Exception e) {
            logger.error("程序异常");
            logger.error(e.getMessage(), e);
            return new YcEntity();
        }
    }
}
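A detail worth knowing: row.getAs(String) resolves column names through the Row's schema, so it only works on rows that carry one (rows coming out of spark.sql do; a bare RowFactory.create(...) row does not and would end up in the catch branch). For a quick local check of setYcEntity you can build a schema-carrying row by hand. A sketch using GenericRowWithSchema, which is an internal Spark class and therefore only suitable for ad-hoc testing:

package com.xiaostudy.spark.util;

import com.xiaostudy.spark.entity.YcEntity;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class RowUtilsCheck {

    public static void main(String[] args) {
        // Schema matching the two columns selected by querySql.
        StructType schema = new StructType()
                .add("sjsj", DataTypes.StringType)
                .add("ds", DataTypes.StringType);

        // A hand-built row carrying that schema, so getAs("sjsj") can resolve the name.
        Row row = new GenericRowWithSchema(new Object[]{"2021-04-22 10-30-00", "demo"}, schema);

        YcEntity entity = RowUtils.setYcEntity(row);
        System.out.println(entity.getSjsj() + " / " + entity.getDs());
    }
}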

5. Entity class

package com.xiaostudy.spark.entity;

public class YcEntity {

    // Data timestamp, format yyyy-MM-dd HH-mm-ss
    private String sjsj;

    private String ds;

    public String getSjsj() {
        return sjsj;
    }

    public void setSjsj(String sjsj) {
        this.sjsj = sjsj;
    }

    public String getDs() {
        return ds;
    }

    public void setDs(String ds) {
        this.ds = ds;
    }
}
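In step 3, spark.createDataFrame(rowJavaRDD, YcEntity.class) infers the DataFrame schema from this bean, which is why the public getter/setter pairs (plus the implicit no-arg constructor) are required. Keep in mind that Spark orders bean-derived columns alphabetically, so the temp view's schema is (ds, sjsj); the insert statement in step 3 is unaffected because it selects columns by name. A quick way to confirm, placed inside hiveToHive() right after createDataFrame:

            // Expected output (bean properties, sorted alphabetically):
            // root
            //  |-- ds: string (nullable = true)
            //  |-- sjsj: string (nullable = true)
            dataFrame.printSchema();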

6. Submit command, for example:

/.../spark/bin/spark-submit --name SparkHiveToHive --master yarn --deploy-mode client --conf spark.dynamicAllocation.enabled=false --driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 --class com.xiaostudy.spark.SparkHiveToHive /.../spark-hive-to-hive.jar
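The elided paths point at your Spark installation and at the assembled jar from step 1. --deploy-mode client keeps the driver (and its console output) on the submitting machine, and the 512m / single-executor sizing is only suitable for a smoke test. Once the job finishes, a quick sanity check from beeline or spark-sql, assuming the table sketched in step 3, is: select count(*) from test.hive_yc where dt=20210422;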

Original: https://www.cnblogs.com/xiaostudy/p/14690452.html
Author: xiaostudy
Title: 【spark】【hive】spark读取hive数据再存回hive
