Spark SQL(二):DataFrame APIs

文章目录

一.主要DataFrame APIs

函数概述DataFrame.agg(exprs)对不带组的整个DataFrame进行聚合(df.groupBy().agg()的缩写)。DataFrame.alias(alias)返回一个设置了别名的新DataFrame。DataFrame.collect()以Row列表的形式返回所有记录。DataFrame.columns以列表形式返回所有列名。DataFrame.count ()返回此Dataframe中的行数。DataFrame.describe(cols)计算数字列和字符串列的基本统计信息。DataFrame.distinct()返回一个新的DataFrame,其中包含该DataFrame中不同的行。DataFrame.drop(cols)返回删除指定列的新DataFrame。DataFrame.dropna([how, thresh, subset])返回一个新的DataFrame,省略带空值的行。DataFrame.dtypes以列表形式返回所有列名及其数据类型。DataFrame.fillna(value[, subset])替换空值,na.fill()的别名。DataFrame.filter(condition)使用给定条件筛选行。DataFrame.first ()将第一行作为row返回。DataFrame.groupBy(cols)使用指定的列对DataFrame进行分组,以便对它们运行聚合。DataFrame.head([n])返回前n行。DataFrame.join(other[, on, how])使用给定的Join表达式加入另一个DataFrame。DataFrame.limit(num)将结果计数限制为指定的数字。DataFrame.printSchema()以树格式输出模式。DataFrame.replace(to_replace[, value, subset])返回一个新的DataFrame,将一个值替换为另一个值。DataFrame.select(cols)投射一组表达式并返回一个新的DataFrame。DataFrame.show([n, truncate, vertical])将前n行打印到控制台。DataFrame.sort(cols, *kwargs)返回一个按指定列排序的新DataFrame。DataFrame.tail(num)以Row列表的形式返回最后一个num行。DataFrame.take(num)返回第一个num行作为Row列表。DataFrame.toDF(cols)返回带有新指定列名的新DataFrameDataFrame.toJSON([use_unicode])将DataFrame转换为字符串的RDD。DataFrame.toPandas()返回这个DataFrame的内容为Pandas panda .DataFrame。DataFrame.withColumn(colName, col)通过添加列或替换具有相同名称的现有列,返回一个新的DataFrame。

二.部分DataFrame APIs

1.DataFrame.agg

(1)概述
其作用:对不带组的整个DataFrame进行聚合(df.groupBy().agg()的缩写)

DataFrame.agg(*exprs)

(2)实例

df.agg({"age": "max"}).show()

from pyspark.sql import functions as F
df.agg(F.min(df.age)).show()

运行结果:
数据源

Spark SQL(二):DataFrame APIs
Spark SQL(二):DataFrame APIs

2.DataFrame.alias

(1)概述
其作用:返回一个设置了别名的新DataFrame。

DataFrame.alias(alias)

参数:
alisa:str
为DataFrame设置的别名。

(2)实例

from pyspark.sql.functions import *
df_t1 = df.alias('df_t1')
df_t2 = df.alias('df_t2')
joined_df = df_t1.join(df_t2,col("df_t1.name") == col("df_t2.name"), 'inner')
joined_df.select('df_t1.name','df_t2.name','df_t2.age').sort(desc("df_t1.age")).show()

运行结果:

Spark SQL(二):DataFrame APIs

3.DataFrame.colRegex

(1)概述
其作用:根据指定为正则表达式的列名选择列,并将其作为列返回。

DataFrame.colRegex(colName)

参数:
colName:str
字符串,作为正则表达式指定的列名

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df.select(df.colRegex("(name)?+.+")).show()

运行结果:

Spark SQL(二):DataFrame APIs

4.DataFrame.collect

(1)概述
其作用:以Row列表的形式返回所有记录。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df.collect()

运行结果:

Spark SQL(二):DataFrame APIs

5.DataFrame.columns

(1)概述
其作用:以列表形式返回所有列名。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df.columns

运行结果:

Spark SQL(二):DataFrame APIs

6.DataFrame.count

(1)概述
其作用:返回此 DataFrame 中的行数。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df.count()

运行结果:

Spark SQL(二):DataFrame APIs

7.DataFrame.createGlobalTempView

(1)概述
其作用:使用此 DataFrame 创建全局临时视图。

此临时视图的生存期绑定到此 Spark 应用程序。如果目录中已经存在视图名称,则抛出 TempTableAlreadyExistsException。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df.createGlobalTempView('Y')
df1 = spark.sql('select * from global_temp.Y')
sorted(df.collect())==sorted(df1.collect())

运行结果:

Spark SQL(二):DataFrame APIs

8.DataFrame.createOrReplaceGlobalTempView

(1)概述
其作用:使用给定的名称创建或替换全局临时视图。

这个临时视图的生命周期与这个Spark应用程序绑定。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df.createOrReplaceGlobalTempView('Y')
df2 = df.filter(df.age>23)
df2.createOrReplaceGlobalTempView('Y')
df3 = spark.sql('select * from global_temp.Y')
sorted(df3.collect())==sorted(df2.collect())

运行结果:

Spark SQL(二):DataFrame APIs

9.DataFrame.createOrReplaceTempView

(1)概述
其作用:使用此 DataFrame 创建或替换本地临时视图。

这个临时表的生存期绑定到用于创建这个表的 SparkSession
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1', 13), ('Y2', 23), ('Y3', 33)], ['name', 'age'])
df.createOrReplaceTempView('Y')
df2 = df.filter(df.age > 23)
df2.createOrReplaceTempView('Y')
df3 = spark.sql("select * from Y")
sorted(df3.collect()) == sorted(df2.collect())

运行结果:

Spark SQL(二):DataFrame APIs

10.DataFrame.createTempView

(1)概述
其作用:使用此 DataFrame 创建本地临时视图。

此临时表的生存期与用于创建此 DataFrame 的 SparkSession 绑定。如果目录中已经存在视图名称,则抛出 TempTableAlreadyExistsException。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1', 13), ('Y2', 23), ('Y3', 33)], ['name', 'age'])
df.createTempView('Y')

df1 = spark.sql("select * from Y")
sorted(df.collect()) == sorted(df1.collect())

运行结果:

Spark SQL(二):DataFrame APIs

11.DataFrame.crossJoin

(1)概述
其作用:返回带有另一个DataFrame的笛卡尔积。

DataFrame.crossJoin(other)

参数:
other:DataFrame
笛卡尔积的右边。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df1 = spark.createDataFrame([('Y1',178),('Y2',156),('Y3',189)],['name','height'])
df.show()
df1.show()
df.crossJoin(df1.select('height')).select("name", "age", "height").show()

运行结果:

Spark SQL(二):DataFrame APIs
Spark SQL(二):DataFrame APIs

12.DataFrame.cube

(1)概述
其作用:使用指定的列为当前的 DataFrame 创建一个多维数据集,这样我们就可以对它们运行聚合。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df.cube('name',df.age).count().orderBy('name','age').show()

运行结果:

Spark SQL(二):DataFrame APIs

13.DataFrame.describe

(1)概述
其作用:计算数字列和字符串列的基本统计信息。

这包括count、mean、stddev、min和max。如果没有给出列,则该函数计算所有数字或字符串列的统计信息。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df.describe(['age']).show()

运行结果:

Spark SQL(二):DataFrame APIs

14.DataFrame.distinct

(1)概述
其作用:返回一个新的DataFrame,其中包含该DataFrame中不同的行。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df.show()
df.distinct().show()

运行结果:

Spark SQL(二):DataFrame APIs

15.DataFrame.drop

(1)概述
其作用:返回删除指定列的新DataFrame。如果模式不包含给定的列名,则此操作为不可操作的。

DataFrame.drop(*cols)

参数:
cols: str or :class: Column
列或要删除的列的名称

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df.drop(df.age).show()

运行结果:

Spark SQL(二):DataFrame APIs

16.DataFrame.dropDuplicates

(1)概述
其作用:返回删除了重复行的新数据帧,可以选择仅考虑某些列。

对于静态批处理数据帧,它只会删除重复的行。对于流式数据帧,它会将触发器中的所有数据保持为中间状态,以删除重复的行。您可以使用withWatermark()限制复制数据的延迟时间,系统将相应地限制状态。此外,为了避免任何重复的可能性,比水印更早的数据将被删除。

drop_duplicates()是dropDuplicates()的别名。

(2)实例

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',23,180),('Y2',23,181),('Y1',33,180),('Y1',23,180)],['name','age','height'])
df.show()
df.dropDuplicates().show()
df.dropDuplicates(['name','height']).show()

运行结果:

Spark SQL(二):DataFrame APIs

17.DataFrame.dropna

(1)概述
其作用:返回一个新的DataFrame,省略带空值的行。
DataFrame.dropna()和DataFrameNaFunctions.drop()是彼此的别名。

DataFrame.dropna(how='any', thresh=None, subset=None)

参数:
how:str(可选)
“any “或”all “。如果’ any ‘,则删除包含空值的行。如果是’ all ‘,只在该行的所有值为null时,删除该行。
thresh:int(可选)
默认 None ,如果指定,则删除具有小于thresh非空值的行。这会覆盖how参数。
subset:str,tuple or list(可选)
要考虑的可选列名列表。

(2)实例

import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',23,180),('Y2',23,181),('Y1',33,180),('Y1',23,180)],['name','age','height']).toPandas()
df.age[1] =np.nan
df1=spark.createDataFrame(df)
df1.show()
df1.dropna('any').show()

运行结果:

Spark SQL(二):DataFrame APIs

18.DataFrame.dtypes

(1)概述
其作用:以列表形式返回所有列名及其数据类型。
(2)实例

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',23,180),('Y2',23,181),('Y1',33,180),('Y1',23,180)],['name','age','height'])
df.dtypes

运行结果:

Spark SQL(二):DataFrame APIs

19.DataFrame.exceptAll

(1)概述
其作用:返回一个新的DataFrame,包含该DataFrame中的行,但不包含另一个DataFrame中的行,同时保留重复的行。

这相当于SQL中的EXCEPT ALL。作为SQL中的标准,这个函数按位置(而不是按名称)解析列。
(2)实例

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([('Y1',23),('Y2',23),('Y4',33),('Y1',23)],['name','age'])
df2 = spark.createDataFrame([('Y1',23),('Y2',23)],['name','age'])
df1.exceptAll(df2).show()

运行结果:

Spark SQL(二):DataFrame APIs

20.DataFrame.explain

(1)概述
其作用:将(逻辑和物理)计划打印到控制台以进行调试。

DataFrame.explain(extended=None, mode=None)

extended:bool(可选)
默认为False。如果为False,则仅打印物理计划。如果这是一个没有指定模式的字符串,它将按照指定的模式工作。
mode:str(可选)
指定计划的预期输出格式。
simple:只打印一个实际的计划。
extended:打印逻辑和物理计划。
codegen:打印物理计划和生成的代码(如果可用)。
cost:打印逻辑计划和统计数据(如果可用)。
formatted:将解释输出分成两个部分:物理计划大纲和节点细节。

(2)实例

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',23),('Y2',23),('Y4',33),('Y1',23)],['name','age'])
df.explain()

运行结果:

Spark SQL(二):DataFrame APIs

21.DataFrame.fillna

(1)概述
其作用:替换空值,na.fill()的别名。
DataFrame.fillna()和DataFrameNaFunctions.fill()是彼此的别名。

DataFrame.fillna(value, subset=None)

参数:value:int,float,string,bool or dict
值替换空值。如果值是一个dict,则忽略子集,value必须是从列名(字符串)到替换值的映射。替换值必须是int、float、boolean或string。

(2)实例

import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',23,180),('Y2',23,181),('Y1',33,180),('Y1',23,180)],['name','age','height']).toPandas()
df.age[1] =np.nan
df.height[2]=np.nan
df1=spark.createDataFrame(df)
df1.show()
df1.fillna({'age': 999, 'height': 999}).show()

运行结果:

Spark SQL(二):DataFrame APIs

22.DataFrame.filter

(1)概述
其作用:使用给定条件筛选行。
其中where()是filter()的别名。

DataFrame.filter(condition)

参数:
condition:column or str
一个types.BooleanType的列或SQL表达式的字符串。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df.filter(df.age>23).show()

运行结果:

Spark SQL(二):DataFrame APIs

23.DataFrame.first

(1)概述
其作用:将第一行作为row返回。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df.first()

运行结果:

Spark SQL(二):DataFrame APIs

24.DataFrame.groupBy

(1)概述
其作用:使用指定的列对DataFrame进行分组,以便对它们运行聚合。
groupby()是groupBy()的别名。

DataFrame.groupBy(*cols)

参数:
cols:list,str or column
要分组的列。每个元素应该是一个列名(字符串)或一个表达式(列)。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y1',33)],['name','age'])
df.groupBy('name').agg({'age': 'avg'}).show()

运行结果:

Spark SQL(二):DataFrame APIs

25.DataFrame.head

(1)概述
其作用:返回前n行。

DataFrame.head(n=None)

参数:
n:int (可选)
默认的1。返回的行数。
注意:
这个方法只应该在结果数组很小的情况下使用,因为所有的数据都被加载到驱动程序的内存中。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y1',33)],['name','age'])
df.head(2)

运行结果:

Spark SQL(二):DataFrame APIs

26.DataFrame.hint

(1)概述
其作用:指定当前DataFrame上的一些提示。

DataFrame.hint(name, *parameters)

参数:
name:str
提示的名称。
parameters:str, list, float or int
可选参数。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df1 = spark.createDataFrame([('Y1',167),('Y2',156),('Y3',180)],['name','height'])
df.join(df1.hint("broadcast"), "name").show()

运行结果:

Spark SQL(二):DataFrame APIs

27.DataFrame.intersectAll

(1)概述

其作用:返回一个新的DataFrame,包含这个DataFrame和另一个DataFrame中的行,同时保留重复的行。

这相当于SQL中的INTERSECT ALL。作为SQL中的标准,这个函数按位置(而不是按名称)解析列。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df1 = spark.createDataFrame([('Y1',13),('Y2',23)],['name','age'])
df.intersectAll(df1).sort('name','age').show()

运行结果:

Spark SQL(二):DataFrame APIs

28.DataFrame.join

(1)概述
其作用:使用给定的连接表达式与另一个DataFrame连接。

DataFrame.join(other, on=None, how=None)

参数:
other:DataFrame
连接的右侧
on:str, list or Column(可选)
用于连接列名、列名列表、连接表达式(column)或列列表的字符串。如果on是一个字符串或指示连接列名称的字符串列表,则连接列必须在两边都存在,这将执行相等连接。
how:str(可选)
默认内部。必须是以下选项之一:inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti.

(2)实例

from pyspark.sql import SparkSession
from pyspark.sql.functions import desc

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df1 = spark.createDataFrame([('Y1',13,168),('Y2',23,189)],['name','age','height'])
cond = [df.name == df1.name,df.age==df1.age]
df.join(df1,cond,'outer').select(df.name,df1.age,df1.height).sort(desc('height')).show()

运行结果:

Spark SQL(二):DataFrame APIs

29.DataFrame.limit

(1)概述
其作用:将结果计数限制为指定的数字。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
df.limit(2).show()

运行结果:

Spark SQL(二):DataFrame APIs

30.DataFrame.mapInPandas

(1)概述
其作用:使用 Python 本机函数在当前的 DataFrame 中映射一个批处理迭代器,该函数接受并输出 pandas DataFrame,并将结果作为 DataFrame 返回。

函数应采用pandas.DataFrames的迭代器,并返回pandas.DataFrames的另一个迭代器。所有列作为pandas.DataFrames的迭代器一起传递给函数,返回的pandas.DataFrames迭代器作为DataFrame组合。每个pandas.DataFrame大小都可以由spark.sql.execution.arrow.maxRecordsPerBatch控制。

DataFrame.mapInPandas(func, schema)

参数:
func:function
一个接受pandas迭代器的Python本机函数。DataFrames,并输出一个pandas.DataFrames迭代器。
schema:pyspark.sql.types.DataType or str
PySpark中func的返回类型。该值可以是pyspark.sql.types.DataType对象,也可以是ddl格式的类型字符串。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',13),('Y2',23),('Y3',33)],['name','age'])
def func(iterator):
    for i in iterator:
        yield i[i.name == 'Y1']
df.mapInPandas(func,df.schema).show()

运行结果:

Spark SQL(二):DataFrame APIs

31.DataFrame.orderBy

(1)概述
其作用:返回一个按指定列排序的新DataFrame。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',23),('Y2',13),('Y3',33)],['name','age'])
df.orderBy(desc('age')).show()

运行结果:

Spark SQL(二):DataFrame APIs

32.DataFrame.printSchema

(1)概述
其作用:以树格式打印架构。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',23),('Y2',13),('Y3',33)],['name','age'])
df.printSchema()

运行结果:

Spark SQL(二):DataFrame APIs

33.DataFrame.repartition

(1)概述
其作用:返回一个由给定分区表达式分区的新DataFrame。生成的DataFrame是散列分区的。

DataFrame.repartition(numPartitions, *cols)

参数:
numPartitions:int
可以是一个int类型来指定分区的目标数量,也可以是一个Column。如果它是Column,它将被用作第一个分区列。如果未指定,则使用默认的分区数。
cols:str or Column
分区列。
在1.6版更改:添加可选参数来指定分区列。如果指定了分区列,还使numPartitions成为可选的。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',11),('Y3',33),('Y4',44),('Y5',55)],['name','age'])
data = df.repartition(3,'age')
data.show()
data.rdd.getNumPartitions()

运行结果:

Spark SQL(二):DataFrame APIs

34.DataFrame.repartitionByRange

(1)概述
其作用:返回一个由给定分区表达式分区的新DataFrame。产生的DataFrame是范围分区的。

必须指定至少一个分区表达式。如果没有明确指定排序顺序,则假定”升序空优先”。

由于性能原因,此方法使用抽样来估计范围。因此,输出可能不一致,因为采样可能返回不同的值。样本大小可以通过配置spark.sql.execution.rangeExchange.sampleSizePerPartition控制。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',11),('Y3',33),('Y4',44),('Y5',55)],['name','age'])
data = df.repartitionByRange(2,'age')
data.show()
data.rdd.getNumPartitions()

运行结果:

Spark SQL(二):DataFrame APIs

35.DataFrame.replace

(1)概述
其作用:返回一个新的DataFrame,将一个值替换为另一个值。

DataFrame.replace()和DataFrameNaFunctions.replace()是彼此的别名。要替换的值和值必须具有相同的类型,并且只能是数字、布尔值或字符串。值可以没有。替换时,新值将转换为现有列的类型。对于数字替换,所有要替换的值都应具有唯一的浮点表示形式。如果发生冲突(例如{42: -1, 42.0: 1})和任意替换将被使用。

DataFrame.replace(to_replace, value=<no value>, subset=None)

参数:
to_replace:bool, int, float, string, list or dict
要替换的值。如果该值是dict,则该值将被忽略或可以忽略,并且to_replace必须是值和替换之间的映射。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',11),('Y3',33)],['name','age'])
df.show()
df.replace(33,66).show()

运行结果:

Spark SQL(二):DataFrame APIs

36.DataFrame.rollup

(1)概述
其作用:使用指定的列为当前DataFrame创建多维汇总,以便对它们运行聚合。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',11),('Y3',33)],['name','age'])
df.rollup('name',df.age).count().orderBy('age').show()

运行结果:

Spark SQL(二):DataFrame APIs

37.DataFrame.sameSemantics

(1)概述
其作用:当两个 DataFrames 中的逻辑查询计划相等并因此返回相同的结果时,返回 True。

这里的相等比较通过容忍外观上的差异(如属性名)来简化。
这个API可以非常快地比较两个DataFrame,但仍然可以返回False的DataFrame返回相同的结果,例如,从不同的计划。以缓存为例,这种假否定语义很有用。

这个API是一个开发人员API。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df1 = spark.range(5)
df2 = spark.range(5)
df1.show()
df1.withColumn("id", df1.id * 2).sameSemantics(df2.withColumn("id", df2.id * 2))

运行结果:

Spark SQL(二):DataFrame APIs

38.DataFrame.sample

(1)概述
其作用:返回此DataFrame的采样子集。

DataFrame.sample(withReplacement=None, fraction=None, seed=None)

参数:
withReplacement:bool(可选)
样本是否替换(默认为False)。
fraction:float
要生成的行的百分数,范围[0.0,1.0]。
seed:int(可选)
采样种子(默认为随机种子)。

这并不能保证提供给定DataFrame总计数的精确部分。
分数是必需的,替换和种子是可选的。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(5)
df.show()
df.sample(fraction=0.5,seed=2).count()

运行结果:

Spark SQL(二):DataFrame APIs

39.DataFrame.schema

(1)概述
其作用:以pyspark.sql.types.StructType的形式返回该DataFrame的模式。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',11),('Y3',33)],['name','age'])
df.schema

运行结果:

Spark SQL(二):DataFrame APIs

40.DataFrame.select

(1)概述
其作用:投射一组表达式并返回一个新的DataFrame。

DataFrame.select(*cols)

参数:
cols:str,Column or list
列名(字符串)或表达式(列)。如果其中一个列名是’ * ‘,该列将被展开以包含当前DataFrame中的所有列。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',11),('Y3',33)],['name','age'])
df.select('name').show()

运行结果:

Spark SQL(二):DataFrame APIs

41.DataFrame.selectExpr

(1)概述
其作用:投射一组SQL表达式并返回一个新的DataFrame。

这是接受SQL表达式的select()的变体。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)],['name','age'])
df.selectExpr('name',"age * 2","abs(age)").show()

运行结果:

Spark SQL(二):DataFrame APIs

42.DataFrame.semanticHash

(1)概述
其作用:根据此DataFrame返回逻辑查询计划的哈希码。

与标准哈希代码不同,哈希是根据通过容忍外观差异(如属性名)而简化的查询计划计算的。
这个API是一个开发人员API。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)],['name','age'])
df.semanticHash()

运行结果:

Spark SQL(二):DataFrame APIs

43.DataFrame.show

(1)概述
其作用:将前n行打印到控制台。

DataFrame.show(n=20, truncate=True, vertical=False)

参数:
n:int(可选)
要显示的行数。
truncate:bool or int(可选)
如果设置为True,则默认截断长度超过20个字符的字符串。如果设置为大于1的数字,则将长字符串截断为长度截断并将单元格右对齐。
vertical:bool(可选)
如果设置为True,则垂直打印输出行(每列值一行)。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)],['name','age'])
df.show()

运行结果:

Spark SQL(二):DataFrame APIs

44.DataFrame.sort

(1)概述
其作用:返回一个按指定列排序的新DataFrame。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)],['name','age'])
df.sort(df.age.desc()).show()

运行结果:

Spark SQL(二):DataFrame APIs

45.DataFrame.storageLevel

(1)概述
其作用:获取DataFrame的当前存储级别。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)],['name','age'])
df.storageLevel

运行结果:

Spark SQL(二):DataFrame APIs

46.DataFrame.summary

(1)概述
其作用:计算数值列和字符串列的指定统计信息。可用的统计数据有:- count – mean – stddev – min – max -任意指定的近似百分比(例如,75%)

如果没有给出统计信息,则此函数计算count、mean、stddev、min、近似四分位数(百分比分别为25%、50%和75%)和max。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)],['name','age'])
df.summary().show()

运行结果:

Spark SQL(二):DataFrame APIs

47.DataFrame.tail

(1)概述
其作用:以Row列表的形式返回最后一个num行。

运行tail需要将数据移动到应用程序的驱动进程中,并且使用非常大的num会导致驱动进程使用OutOfMemoryError崩溃。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)],['name','age'])
df.tail(2)

运行结果:

Spark SQL(二):DataFrame APIs

48.DataFrame.take

(1)概述
其作用:返回第一个num行作为Row列表。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)],['name','age'])
df.take(2)

运行结果:

Spark SQL(二):DataFrame APIs

49.DataFrame.toDF

(1)概述
其作用:返回带有新指定列名的新DataFrame
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)])
df.toDF('name','age').show()

运行结果:

Spark SQL(二):DataFrame APIs

50.DataFrame.toJSON

(1)概述
其作用:将DataFrame转换为字符串的RDD。

每一行作为返回的RDD中的一个元素被转换成一个JSON文档。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)],['name','age'])
df.toJSON().first()

运行结果:

Spark SQL(二):DataFrame APIs

51.DataFrame.toLocalIterator

(1)概述
其作用:返回一个迭代器,该迭代器包含该DataFrame中的所有行。迭代器将消耗与该DataFrame中最大分区相同的内存。使用预取,它可能会消耗多达2个最大分区的内存。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)],['name','age'])
list(df.toLocalIterator())

运行结果:

Spark SQL(二):DataFrame APIs

52.DataFrame.toPandas

(1)概述
其作用:返回这个DataFrame的内容为Pandas panda .DataFrame。

这仅在安装了Pandas并可用时可用。
这个方法只应该在Pandas的DataFrame很小的情况下使用,因为所有的数据都被加载到驱动程序的内存中。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)],['name','age'])
df.toPandas()

运行结果:

Spark SQL(二):DataFrame APIs

53.DataFrame.transform

(1)概述
其作用:返回一个新的DataFrame。用于链接自定义转换的简洁语法。

DataFrame.transform(func)

参数:
func:function
一个接受并返回DataFrame的函数。

(2)实例

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 1.0), (2, 2.0)], ["int", "float"])
def cast_all_to_int(df):
    return df.select([col(col_name).cast('int') for col_name in df.columns])
def sort_columns_asc(df):
    return df.select(*sorted(df.columns))
df.transform(cast_all_to_int).transform(sort_columns_asc).show()

运行结果:

Spark SQL(二):DataFrame APIs

54.DataFrame.unionByName

(1)概述
其作用:返回一个新的DataFrame,其中包含该DataFrame和另一个DataFrame中的行并集。

这与SQL中的UNION ALL和UNION DISTINCT都不同。要执行sql风格的集合合并(执行元素重复删除),请使用此函数,然后使用distinct()。

这个函数和union()的区别在于,这个函数通过名称(而不是位置)解析列。

当参数allowMissingColumns为True时,这个DataFrame和其他DataFrame中的列名集可以不同;缺少的列将用null填充。此外,该DataFrame缺失的列将被添加到联合结果的模式的末尾。
(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col3"])
df1.unionByName(df2, allowMissingColumns=True).show()

运行结果:

Spark SQL(二):DataFrame APIs

55.DataFrame.withColumn

(1)概述
其作用:通过添加列或替换具有相同名称的现有列,返回一个新的DataFrame。

列表达式必须是这个DataFrame上的表达式;试图从其他DataFrame中添加列将引发错误。

DataFrame.withColumn(colName, col)

参数:
colName:str
字符串,新列的名称。
col:Column
新列的Column表达式

这种方法在内部引入了投影。因此,多次调用它,例如,通过循环,为了添加多个列可能会产生大的计划,这可能会导致性能问题,甚至StackOverflowException。为了避免这种情况,一次对多个列使用select()。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)],['name','age'])
df.withColumn('new_age',df.age+4).show()

运行结果:

Spark SQL(二):DataFrame APIs

56.DataFrame.withColumnRenamed

(1)概述
其作用:通过重命名现有列返回一个新的DataFrame。如果模式不包含给定列名,则此操作为no-op。

DataFrame.withColumnRenamed(existing, new)

参数:
existing:str
字符串,要重命名的现有列的名称。
new:str
字符串,列的新名称。

(2)实例

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('Y1',22),('Y2',-11),('Y3',33)],['name','age'])
df.withColumnRenamed('age','new_age').show()

运行结果:

Spark SQL(二):DataFrame APIs

Original: https://blog.csdn.net/yuanfate/article/details/120810452
Author: 奋斗的源
Title: Spark SQL(二):DataFrame APIs

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/678029/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球