Spark操作MySQL,Hive并写入MySQL数据库

最近一个项目,需要操作近70亿数据进行统计分析。如果存入MySQL,很难读取如此大的数据,即使使用搜索引擎,也是非常慢。经过调研决定借助我们公司大数据平台结合Spark技术完成这么大数据量的统计分析。

为了后期方便开发人员开发,决定写了几个工具类,屏蔽对MySQL及Hive的操作代码,只需要关心业务代码的编写。

工具类如下:

一. Spark操作MySQL

  1. 根据sql语句获取Spark DataFrame:

  2. 将Spark DataFrame 写入MySQL数据库表

  3. 根据条件删除MySQL表数据

  4. 保存DataFrame 到 MySQL中,如果表不存在的话,会自动创建

二、操作Spark

  1. 切换Spark环境

定义环境Profile.scala

定义SparkUtil.scala

import com.dmall.scf.Profile
import com.dmall.scf.dto.{Env, MySqlConfig}
import org.apache.spark.sql.{DataFrame, Encoder, SparkSession}

import scala.collection.JavaConversions._

/**
 * @descrption Spark工具类
 * scf
 * @author wangxuexing
 * @date 2019/12/23
 */
object SparkUtils {

//开发环境

undefined

val DEV_URL = “jdbc:mysql://IP:PORT/db_name?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&useSSL=false”

undefined

val DEV_USER = “user”

undefined

val DEV_PASSWORD = “password”

undefined

//生产测试环境

undefined

val PROD_TEST_URL = “jdbc:mysql://IP:PORT/db_name?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&zeroDateTimeBehavior=convertToNull&useSSL=false”

undefined

val PROD_TEST_USER = “user”

undefined

val PROD_TEST_PASSWORD = “password”

undefined

//生产环境

undefined

val PROD_URL = “jdbc:mysql://IP:PORT/db_name?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false&useSSL=false”

undefined

val PROD_USER = “user”

undefined

val PROD_PASSWORD = “password”

def env = Profile.currentEvn

  /**
   * 获取环境设置
   * @return
   */
  def getEnv: Env ={
    env match {
      case Profile.DEV => Env(MySqlConfig(DEV_URL, DEV_USER, DEV_PASSWORD), SparkUtils.getDevSparkSession)
      case Profile.PROD =>
        Env(MySqlConfig(PROD_URL,PROD_USER,PROD_PASSWORD), SparkUtils.getProdSparkSession)
      case Profile.PROD_TEST =>
        Env(MySqlConfig(PROD_TEST_URL, PROD_TEST_USER, PROD_TEST_PASSWORD), SparkUtils.getProdSparkSession)
      case _ => throw new Exception("无法获取环境")
    }
  }

  /**
   * 获取生产SparkSession
   * @return
   */
  def getProdSparkSession: SparkSession = {
    SparkSession
      .builder()
      .appName("scf")
      .enableHiveSupport()//激活hive支持
      .getOrCreate()
  }

  /**
   * 获取开发SparkSession
   * @return
   */
  def getDevSparkSession: SparkSession = {
    SparkSession
      .builder()
      .master("local[*]")
      .appName("local-1576939514234")
      .config("spark.sql.warehouse.dir", "C:\\data\\spark-ware")//不指定,默认C:\data\projects\parquet2dbs\spark-warehouse
      .enableHiveSupport()//激活hive支持
      .getOrCreate();
  }

  /**
   * DataFrame 转 case class
   * @param df DataFrame
   * @tparam T case class
   * @return
   */
  def dataFrame2Bean[T: Encoder](df: DataFrame, clazz: Class[T]): List[T] = {
    val fieldNames = clazz.getDeclaredFields.map(f => f.getName).toList
    df.toDF(fieldNames: _*).as[T].collectAsList().toList
  }
}

三、定义Spark操作流程

从MySQL或Hive读取数据->逻辑处理->写入MySQL

  1. 定义处理流程

SparkAction.scala

  1. 实现流程

KanbanAction.scala

  1. 实现具体业务逻辑

具体项目源码请参考:

Original: https://www.cnblogs.com/barrywxx/p/12325202.html
Author: BarryW
Title: Spark操作MySQL,Hive并写入MySQL数据库

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/8832/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

发表回复

登录后才能评论
免费咨询
免费咨询
扫码关注
扫码关注
联系站长

站长Johngo!

大数据和算法重度研究者!

持续产出大数据、算法、LeetCode干货,以及业界好资源!

2022012703491714

微信来撩,免费咨询:xiaozhu_tec

分享本页
返回顶部