JAVA SparkSQL初始和创建DataFrame的几种方式

一、前述

1、SparkSQL介绍

Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制。

  • SparkSQL支持查询原生的RDD。 RDD是Spark平台的核心概念,是Spark能够高效的处理大数据的各种场景的基础。
  • 能够在Scala中写SQL语句。支持简单的SQL语法检查,能够在Scala中写Hive语句访问Hive数据,并将结果取回作为RDD使用。

2、Spark on Hive和Hive on Spark

  • Spark on Hive: Hive只作为储存角色, Spark负责sql解析优化,执行。 Hive on Spark: Hive即作为存储又 *负责sql的解析优化,Spark负责执行。

二、基础概念

1、DataFrame

DataFrame也是一个分布式数据容器。与RDD类似,然而 DataFrame更像传统数据库的二维表格, 除了数据以外,还掌握数据的结构信息,即 schema 同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上 看, DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。

DataFrame的底层封装的是RDD,只不过RDD的泛型是Row类型。

2、SparkSQL的数据源

SparkSQL的数据源可以是JSON类型的字符串,JDBC,Parquent,Hive,HDFS等。

3、SparkSQL底层架构

首先拿到sql后解析一批未被解决的逻辑计划,再经过分析得到分析后的逻辑计划,再经过一批优化规则转换成一批最佳优化的逻辑计划,再经过SparkPlanner的策略转化成一批物理计划,

随后经过消费模型转换成一个个的Spark任务执行。

4、谓词下推(predicate Pushdown)

三、创建DataFrame的几种方式

1、读取json格式的文件创建DataFrame

  • json文件中的json数据不能嵌套json格式数据。
  • DataFrame是一个一个Row类型的RDD,df.rdd()/df.javaRdd()。
  • 可以两种方式读取json格式的文件。
  • df.show()默认显示前20行数据。
  • DataFrame原生API可以操作DataFrame(不方便)。
  • *注册成临时表时,表中的列默认按ascii顺序显示列。

2、通过json格式的RDD创建DataFrame

java代码:

3、非json格式的RDD创建DataFrame(重要)

1) 通过反射的方式将非json格式的RDD转换成DataFrame(不建议使用)

  • 自定义类要可序列化
  • 自定义类的访问级别是Public
  • RDD转成DataFrame后会根据映射将字段按Assci码排序
  • 将DataFrame转换成RDD时获取字段两种方式, 一种是df.getInt(0)下标获取(不推荐使用),另一种是df.getAs(“列名”)获取(推荐使用)
  • *关于序列化问题:

1.反序列化时serializable 版本号不一致时会导致不能反序列化。
2.子类中实现了serializable接口,父类中没有实现,父类中的变量不能被序列化,序列化后父类中的变量会得到null。
注意:父类实现serializable接口,子类没有实现serializable接口时,子类可以正常序列化
3.被关键字transient修饰的变量不能被序列化。
4.静态变量不能被序列化,属于类,不属于方法和对象,所以不能被序列化。
另外:一个文件多次writeObject时,如果有相同的对象已经写入文件,那么下次再写入时,只保存第二次写入的引用,读取时,都是第一次保存的对象。

java代码:

2) 动态创建Schema将非json格式的RDD转换成DataFrame(建议使用)

4、读取parquet文件创建DataFrame

注意:

  • 可以将DataFrame存储成parquet文件。保存成parquet文件的方式有两种
    *

java;gutter:true;
df.write().mode(SaveMode.Overwrite).format("parquet").save("./sparksql/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");

  • SaveMode指定文件保存时的模式。

Overwrite:覆盖

Append:追加

ErrorIfExists:如果存在就报错

Ignore:如果存在就忽略

java代码:

5、读取JDBC中的数据创建DataFrame(MySql为例)

两种方式创建DataFrame

java代码:

常见的action操作

1、count
count:对dataset中的记录数进行统计个数的操作

2、first
first:获取数据集中的第一条数据

3、reduce
reduce:对数据集中的所有数据进行归约的操作,多条变成一条

4、show
show,默认将dataset数据打印前20条

5、take
take,从数据集中获取指定条数

6、collect
collect:将分布式存储在集群上的分布式数据集(比如dataset),中的所有数据都获取到driver端来

7、foreach
foreach:遍历数据集中的每一条数据,对数据进行操作,这个跟collect不同,collect是将数据获取到driver端进行操作
foreach是将计算操作推到集群上去分布式执行
foreach(println(_))这种,真正在集群中执行的时候,是没用的,因为输出的结果是在分布式的集群中的,我们是看不到的

Original: https://www.cnblogs.com/Allen-rg/p/11375754.html
Author: 静悟生慧
Title: JAVA SparkSQL初始和创建DataFrame的几种方式

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/560035/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球