RDD、DataFrame、Dataset三者三者之间转换

转化:

RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换
DataFrame/Dataset转RDD:

这个转换很简单

val rdd1=testDF.rdd
val rdd2=testDS.rdd
RDD转DataFrame:

import spark.implicits._
val testDF = rdd.map {line=>
      (line._1,line._2)
    }.toDF("col1","col2")

一般用元组把一行的数据写在一起,然后在toDF中指定字段名
RDD转Dataset:

import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = rdd.map {line=>
      Coltest(line._1,line._2)
    }.toDS

可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可
Dataset转DataFrame:

这个也很简单,因为只是把case class封装成Row

import spark.implicits._
val testDF = testDS.toDF
DataFrame转Dataset:

import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]

这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便
特别注意:

在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用

package dataframe

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

//
// Explore interoperability between DataFrame and Dataset. Note that Dataset
// is covered in much greater detail in the ‘dataset’ directory.

//
object DatasetConversion {

case class Cust(id: Integer, name: String, sales: Double, discount: Double, state: String)

case class StateSales(state: String, sales: Double)

def main(args: Array[String]) {
val spark =
SparkSession.builder()
.appName(“DataFrame-DatasetConversion”)
.master(“local[4]”)
.getOrCreate()

import spark.implicits._

// create a sequence of case class objects
// (we defined the case class above)
val custs = Seq(
Cust(1, “Widget Co”, 120000.00, 0.00, “AZ”),
Cust(2, “Acme Widgets”, 410500.00, 500.00, “CA”),
Cust(3, “Widgetry”, 410500.00, 200.00, “CA”),
Cust(4, “Widgets R Us”, 410500.00, 0.0, “CA”),
Cust(5, “Ye Olde Widgete”, 500.00, 0.0, “MA”)
)

// Create the DataFrame without passing through an RDD
val customerDF : DataFrame = spark.createDataFrame(custs)
//
// println(“ DataFrame schema”)
//
// customerDF.printSchema()
//
// println(“
DataFrame contents”)
//
// customerDF.show()

// +—+—————+——–+——–+—–+
//| id| name| sales|discount|state|
//+—+—————+——–+——–+—–+
//| 1| Widget Co|120000.0| 0.0| AZ|
//| 2| Acme Widgets|410500.0| 500.0| CA|
//| 3| Widgetry|410500.0| 200.0| CA|
//| 4| Widgets R Us|410500.0| 0.0| CA|
//| 5|Ye Olde Widgete| 500.0| 0.0| MA|
//+—+—————+——–+——–+—–+

//
// println(“*** Select and filter the DataFrame”)
//
val smallerDF =
customerDF.select(“sales”, “state”).filter($”state”.equalTo(“CA”))
//
// smallerDF.show()

//
// +——–+—–+
//| sales|state|
//+——–+—–+
//|410500.0| CA|
//|410500.0| CA|
//|410500.0| CA|
//+——–+—–+

///////////////////////////////////////////////////////////////////////////////////

// Convert it to a Dataset by specifying the type of the rows — use a case
// class because we have one and it’s most convenient to work with. Notice
// you have to choose a case class that matches the remaining columns.

// BUT also notice that the columns keep their order from the DataFrame —
// later you’ll see a Dataset[StateSales] of the same type where the
// columns have the opposite order, because of the way it was created.

val customerDS : Dataset[StateSales] = smallerDF.as[StateSales]
//
// println(“ Dataset schema”)
//
// customerDS.printSchema()
//
// println(“
Dataset contents”)
//
// customerDS.show()

// Select and other operations can be performed directly on a Dataset too,
// but be careful to read the documentation for Dataset — there are
// “typed transformations”, which produce a Dataset, and
// “untyped transformations”, which produce a DataFrame. In particular,
// you need to project using a TypedColumn to gate a Dataset.

// val verySmallDS : Dataset[Double] = customerDS.select($”sales”.as[Double])
//
// println(“*** Dataset after projecting one column”)
//
// verySmallDS.show()

//
//+——–+
//| sales|
//+——–+
//|410500.0|
//|410500.0|
//|410500.0|
//+——–+

// If you select multiple columns on a Dataset you end up with a Dataset
// of tuple type, but the columns keep their names.

val tupleDS : Dataset[(String, Double)] =
customerDS.select($”state”.as[String], $”sales”.as[Double])
//
// println(“*** Dataset after projecting two columns — tuple version”)
//
// tupleDS.show()

//
//+—–+——–+
//|state| sales|
//+—–+——–+
//| CA|410500.0|
//| CA|410500.0|
//| CA|410500.0|
//+—–+——–+

// You can also cast back to a Dataset of a case class. Notice this time
// the columns have the opposite order than the last Dataset[StateSales]
// val betterDS: Dataset[StateSales] = tupleDS.as[StateSales]
//
// println(“*** Dataset after projecting two columns — case class version”)
//
// betterDS.show()

//
//+—–+——–+
//|state| sales|
//+—–+——–+
//| CA|410500.0|
//| CA|410500.0|
//| CA|410500.0|
//+—–+——–+

// Converting back to a DataFrame without making other changes is really easy
// val backToDataFrame : DataFrame = tupleDS.toDF()
//
// println(“*** This time as a DataFrame”)
//
// backToDataFrame.show()
//

//+—–+——–+
//|state| sales|
//+—–+——–+
//| CA|410500.0|
//| CA|410500.0|
//| CA|410500.0|
//+—–+——–+

//
// // While converting back to a DataFrame you can rename the columns
val renamedDataFrame : DataFrame = tupleDS.toDF(“MyState”, “MySales”)

println(“*** Again as a DataFrame but with renamed columns”)

renamedDataFrame.show()

// +——-+——–+
//|MyState| MySales|
//+——-+——–+
//| CA|410500.0|
//| CA|410500.0|
//| CA|410500.0|
//+——-+——–+

Original: https://www.cnblogs.com/alamps/p/8333959.html
Author: alamps
Title: RDD、DataFrame、Dataset三者三者之间转换

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/560184/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球