dataframe

转:https://blog.csdn.net/weimingyu945/article/details/77981884

——————————————————————————————————-

基本操作:

创建和转换格式:

Pandas和Spark的DataFrame两者互相转换:

与Spark RDD的相互转换:

rdd_df = df.rdd df = rdd_df.toDF()

注:rdd转df前提是每个rdd的类型都是Row类型

fillna函数:

df.na.fill()

以原有列为基础添加列:

df = df.withColumn(‘count20’, df[“count”] – 20) # 新列为原有列的数据减去20

删除一列:

修改原有df[“xx”]列的所有值:

df = df.withColumn(“xx”, 1)

修改列的类型(类型投射):

df = df.withColumn(“year2”, df[“year1”].cast(“Int”))

合并2个表的join方法:

df_join = df_left.join(df_right, df_left.key == df_right.key, “inner”)

其中,方法可以为:inner, outer, left_outer, right_outer, leftsemi.

groupBy方法整合:

整合后GroupedData类型可用的方法(均返回DataFrame类型):

avg(*cols) —— 计算每组中一列或多列的平均值

count() —— 计算每组中一共有多少行,返回DataFrame有2列,一列为分组的组名,另一列为行总数

max(*cols) —— 计算每组中一列或多列的最大值

mean(*cols) —— 计算每组中一列或多列的平均值

min(*cols) —— 计算每组中一列或多列的最小值

sum(*cols) —— 计算每组中一列或多列的总和

【函数应用】将df的每一列应用函数f:

df.foreach(f) 或者 df.rdd.foreach(f)

【Map和Reduce应用】返回类型seqRDDs

df.map(func)
df.reduce(func)

解决toDF()跑出First 100 rows类型无法确定的异常,可以采用将Row内每个元素都统一转格式,或者判断格式处理的方法,解决包含None类型时转换成DataFrame出错的问题:

行元素查询操作:

像SQL那样打印列表前20元素(show函数内可用int类型指定要打印的行数):

df.show()
df.show(30)

以树的形式打印概要

df.printSchema()

获取头几行到本地:

list = df.head(3) # Example: [Row(a=1, b=1), Row(a=2, b=2), … …]
list = df.take(5) # Example: [Row(a=1, b=1), Row(a=2, b=2), … …]

输出list类型,list中每个元素是Row类:

list = df.collect()

注:此方法将所有数据全部导入到本地

查询总行数:

int_num = df.count()

查询某列为null的行:

from pyspark.sql.functions import isnull
df = df.filter(isnull(“col_a”))

列元素操作:

获取Row元素的所有列名:

r = Row(age=11, name=’Alice’)
print r.fields # [‘age’, ‘name’]

选择一列或多列:

排序:

df = df.sort(“age”, ascending=False)

过滤数据(filter和where方法相同):

df = df.filter(df[‘age’]>21)
df = df.where(df[‘age’]>21)

对null或nan数据进行过滤:
from pyspark.sql.functions import isnan, isnull
df = df.filter(isnull(“a”)) # 把a列里面数据为null的筛选出来(代表python的None类型)
df = df.filter(isnan(“a”)) # 把a列里面数据为nan的筛选出来(Not a Number,非数字数据)

SQL操作:

DataFrame注册成SQL的表:

df.createOrReplaceTempView(“TBL1”)

进行SQL查询(返回DataFrame):

conf = SparkConf()
ss = SparkSession.builder.appName(“APP_NAME”).config(conf=conf).getOrCreate()

df = ss.sql(“SELECT name, age FROM TBL1 WHERE age >= 13 AND age

Original: https://www.cnblogs.com/juan-F/p/11347541.html
Author: 骨灰盒少女
Title: dataframe

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/559985/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球