【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

通过本文介绍sparksql的dataFrame、DataSet、UDF、SparkSQL数据源,来对sparksql有一个完整的了解

文章目录

一. spark sql概述

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame和DataSet。 Spark
SQL是将Spark SQL转换成RDD,然后提交到集群中运行。

1. spark sql的由来

spark sql 前生是shark,但shark在兼容hive方面依赖hive过多,比如hive的语法解析器、查询优化器等。

spark sql在hive兼容方面仅依赖HQL parser、Hive Metastore、Hive SerDe。简单的说,从HQL被解析成抽象语法树后,就全部有Spark SQL接管了。其中 执行计划生成和执行计划的优化都由Catalyst负责。

2. spark sql的运行流程

通用的sql执行计划

  1. 词法和语法解析,并生成逻辑计划
  2. 绑定:sql和实际的数据表绑定
  3. 优化Optimze:生成最优执行计划
  4. 执行并返回查询数据。

描述SparkSQL的执行过程之前先看下SparkSQL的构成:

core:处理数据的输入和输出,并将数据源转换为DataFrame
Catalyst:处理查询语句的整个过程:包括Sql的解析、绑定、优化、物理计划等。
Hive:对hive数据的处理
Hive-thrift server:提供client、JDBC等入口。

spark sql对sql语句的处理类似于上述关系型数据库的方法,其中:

spark sql先将sql语句进行解析形成一个Tree,然后使用Rule对Tree进行绑定、优化等处理。
sparkSQL查询优化器是catalyst,它负责对查询语句的解析、绑定、优化和生成物理计划等。

过程如下:

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

1. 语法和词法解析:比如解析上图sql中哪些是关键词、表达式、projection(select选择的列的集合)、datasource等,并判断sql语法是否规范,并形成逻辑计划。

2. 绑定: Analyzer使用Analysis Rules + 元数据,将sql和数据库中的数据字典(列、表、视图等)进行绑定,如果projection和datasource都在,则表示这个sql是可以执行的。

3. 优化: Optimizer使用Optimization Rules,将绑定的逻辑计划进行例如:合并、列裁剪、过滤器下推的优化工作后生成优化的逻辑计划。

4. 形成物理计划: 对优化后的逻辑计划进行转换形成可执行的物理计划。根据性能统计数据,选择最佳的物理执行计划(costModel),生成物理执行计划树,得到SparkPlan。

5. 执行: 进行preparation规则进行处理,最后调用spark plan的execute执行计算RDD。

; 3. spark sql的原理ing

SparkContext运行原理分析

spark sql的过程大致分为这几个过程:
1.通过Sqlparse 转成unresolvedLogicplan
2.通过Analyzer转成 resolvedLogicplan
3.通过optimizer转成 optimzedLogicplan
4.通过sparkplanner转成physicalLogicplan
5.通过prepareForExecution 转成executable logicplan
6.通过toRDD等方法执行executedplan去调用tree的doExecute

总体执行流程如下图:从(SQL,Dataset, dataframe)开始,依次经过未解析的逻辑计划—>解析的逻辑计划—>优化的逻辑计划—>物理计划—>根据cost based优化,选取一条物理计划进行执行。

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

; 二. sparkSQL的编程抽象:DataFrame和Dataset

1. DataFrame

1.1. 概述

DataFrame: 类似于数据库中的一张数据表,但只描述了字段名不知道每个字段的数据类型。

DataFrame和RDD的区别

  • RDD不了解数据结构,DataFrame是为数据提供了Schema的视图
    【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

编程入口: 在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口

dataframe的解析:

每一行的数据类型都是Row,导致每一列的值,只有通过解析才能获取。也可以通过字段的索引来访问数据(从0开始)

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

其他场景

如果要写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好的解决问题。

; 1.2 创建DataFrame的三种方式

通过Spark的数据源进行创建;
从一个存在的RDD进行转换;
还可以从HiveTable进行查询返回。

如下从json数据源中创建

json格式的源文件可以直接转为DataFrame格式的数据

scala> spark.read.json("file:/home/hadoop/tmpdata/test.json")
scala> res0.show
+---+----------+
|age|      name|
+---+----------+
| 20|    gaogao|
| 21|liangliang|
| 16|  xiaoxiao|
| 18|    lanlan|
+---+----------+

1.3 RDD与DataFrame相互转换

注意:如果需要RDD与DF或DS之间操作,那么都需要引入 import spark.implicits._ 【spark不是包名,而是sparkSession对象的名称】

1.3.1 单列和多列

单列:

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源
多列:
scala> sc.makeRDD(Array((1,"xiaoxiao",23),(2,"lanlan",21),(3,"liaoliao",15)))
scala> res23.toDF("id","name","age")
scala> res24.show
1.3.2 RDD到dataframe

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

sparkSQL能够自动将包含case类的RDD通过toDF/toDS转换为DataFrame或DataSet

; 1.3.3 case class

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

2. DataSet

2.1 概述

DataSet是分布式的数据集合,DataSet和java对象类似:关心数据的结构和属性。相比DataFrame,Dataset提供了编译时类型检查。

Spark2.0合并DataSet和DataFrame数据集合API,DataFrame变成DataSet的子集。

使用API尽量使用DataSet ,不行再选用DataFrame,其次选择RDD。

2.2 DataSet的转换

2.2.1 RDD转DataSet

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

sparkSQL能够自动将包含case类的RDD通过toDF/toDS转换为DataFrame或DataSet

; 2.2.2 Seq序列到DataSet

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源
2.2.3. DataFrame到DataSet

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源
; 2.2.4. DataSet到RDD
scala> val toRDD=ds.rdd
scala> toRDD.foreach(println)
P1(11,11)
P1(22,22)
P1(33,13)

3. RDD、DataFrame和DataSet小结

3.1. 三者相互转换

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源
如果同样的数据都给到这三个数据结构,计算之后都会给出相同的结果。但执行效率和执行方式是不同的。

; 3.2. 三者的相同点

  1. 三者都有惰性机制
  2. 三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出。
  3. 三者都有partition的概念
  4. 在对DataFrame和Dataset进行操作许多操作都需要这个包(import spark.implicits._)进行支持
  5. DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

三. spark sql 的UDF函数

1. 实现UDF

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

; 2. (对于强数据类型)实现UDAF函数

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

UDAF函数

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源
【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

四. spark sql 数据源

1. Parquet

Spark SQL的默认数据源为Parquet格式,Parquet是一种列式存储格式。

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

数据保存模式

"error"(default)  : 如果文件存在,则报错
"append" : 追加
"overwrite" : 覆写
"ignore"  : 数据存在,则忽略

2. json文件

自动推断:sparkSql能够自动推断JSON数据集的结构,并将它加载成DataFrame
加载:通过SparkSession.read.json()加载一个JSON文件

Spark中JSON文件的格式:不是一个传统的JSON文件,每一行都得是一个JSON串

{“name”:”Michael”}
{“name”:”Andy”, “age”:30}

读取和保存方式:

【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

; 3. JDBC数据源


val connectionProperties = new Properties()
connectionProperties.put("user", "root")
connectionProperties.put("password", "000000")
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop102:3306/rdd", "rddtable", connectionProperties)

jdbcDF2.write.jdbc("jdbc:mysql://hadoop102:3306/rdd", "db", connectionProperties)

4. Hive数据库

spark使用内置hive

    展示表
    scala> spark.sql("show tables").show
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    创建临时表
    scala> jdbcDF.createTempView("stu1")
    scala> spark.sql("show tables").show
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    |        |     stu1|       true|
    +--------+---------+-----------+
    创建表并导入数据
    scala> spark.sql("create table hhh(id int)")
    scala> spark.sql("load data local inpath '/home/hadoop/tmpdata/h1' into table hhh")
    scala> spark.sql("select * from hhh").show
    +----+
    |  id|
    +----+
    |   1|
    |   2|
    |   3|
    |null|
    +----+

代码中集成Hive

(1)添加依赖:


<dependency>
    <groupId>org.apache.sparkgroupId>
    <artifactId>spark-hive_2.11artifactId>
    <version>2.1.1version>
dependency>

<dependency>
    <groupId>org.apache.hivegroupId>
    <artifactId>hive-execartifactId>
    <version>1.2.1version>
dependency>

(2)使用hive源


val warehouseLocation: String = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession.builder().appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()

参考:
SparkSql运行原理详细解析

Original: https://blog.csdn.net/hiliang521/article/details/127183722
Author: roman_日积跬步-终至千里
Title: 【spark sql】spark(八)sparkSQL概述:dataFrame、DataSet、UDF、SparkSQL数据源

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/818216/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球