pyspark-02 DataFrame Syntax

Initializing the Spark environment

Create the Spark session/context first; all subsequent operations are then performed by calling functions on it.

Create a session with some basic configuration:

spark = SparkSession\
        .builder\
        .appName(app_name) \
        .config("spark.master", "yarn") \
        .config('spark.driver.memory', '8g')\
        .config("spark.executor.instances", 60) \
        .config('spark.executor.cores', 4)\
        .config('spark.executor.memory', '8g')\
        .getOrCreate()
  • spark.driver.memory: memory for the driver
  • spark.executor.instances: number of executors
  • spark.executor.cores: number of CPU cores per executor
  • spark.executor.memory: memory per executor

In most cases these four parameters are enough to control the overall resource footprint of the job.
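
As a rough sanity check (a back-of-the-envelope sketch of my own, not from the original), the total resources requested by the configuration above can be estimated as follows; actual usage also depends on overhead settings such as spark.executor.memoryOverhead, which are not set here:

num_executors, cores_per_executor, mem_per_executor_gb = 60, 4, 8
print(num_executors * cores_per_executor)             # 240 tasks can run concurrently
print(num_executors * mem_per_executor_gb, 'GB')      # 480 GB of executor memory in total, plus 8 GB for the driver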

Custom Python environment

Some executors may have inconsistent Python environments, or may be missing certain packages, so the whole job needs a unified Python environment.

1. Package the local Python environment
tar -zcvf py36.tar.gz -C /home/worker/envs py36  # -C keeps py36/ at the archive root, matching the interpreter path used in step 3

2. Upload it to HDFS
hadoop fs -put py36.tar.gz hdfs:

3. Point Spark at the environment during initialization
spark = SparkSession\
        .builder\
        .appName(app_name) \
        .config('spark.driver.memory', '8g')\
        .config('spark.executor.cores', 4)\
        .config('spark.executor.memory', '8g')\
        .config("spark.yarn.dist.archives", "hdfs://path1/path2/pyenv/py36.tar.gz#python3env")\
        .config("spark.pyspark.python", "./python3env/py36/bin/python")\
        .getOrCreate()

Common syntax

Some frequently used syntax, recorded for reference.

from datetime import date, datetime

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import udf

rdd = spark.sparkContext.parallelize([
    (1, 2., 'sdsd|sdsd:sdsd', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (1, 3., '20202_19001', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (1, 3., '20202_19001', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (1, 3., '20202_19001', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (1, 3., '20202_19001', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (1, 3., '20202_19001', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (1, 3., '20202_19001', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (1, 3., '20202_19001', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (1, 3., '20202_19001', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (1, 3., '20202_19001', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (2, 3., '20202_19001', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (2, 3., '20202_19001', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (2, 3., '20202_19001', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (2, 3., '20202_19001', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., '', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd,
                           schema=['a', 'b', 'c', 'd', 'e'])
df.show()

+---+---+--------------+----------+-------------------+
|  a|  b|             c|         d|                  e|
+---+---+--------------+----------+-------------------+
|  1|2.0|sdsd|sdsd:sdsd|2000-01-01|2000-01-01 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  2|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  2|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  2|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  2|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|              |2000-03-01|2000-01-03 12:00:00|
+---+---+--------------+----------+-------------------+
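
createDataFrame also accepts an explicit schema instead of a bare list of column names. A minimal sketch of my own using the pyspark.sql.types module imported above (variable names chosen for illustration):

schema = T.StructType([
    T.StructField('a', T.LongType()),
    T.StructField('b', T.DoubleType()),
    T.StructField('c', T.StringType()),
    T.StructField('d', T.DateType()),
    T.StructField('e', T.TimestampType()),
])
df_typed = spark.createDataFrame(rdd, schema=schema)  # columns now carry explicit types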


sdf = df.select(['a', 'b'])  # select specific columns
sdf = df.select([F.col('a').alias('new_a'), 'b'])  # select columns and rename
sdf = df.select([F.col('a').cast('string'), 'b'])  # select columns and cast types

sdf.drop('a', 'b')  # drop multiple columns

Deduplicate
sdf = sdf.drop_duplicates(subset=['a'])

Filter by condition
sdf.filter(F.col('a').isNotNull() | F.col('b').isNotNull())
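
Conditions can be combined with & (and), | (or) and ~ (not); each condition needs its own parentheses because of operator precedence (a small note of my own):

sdf.filter((F.col('a') > 1) & ~F.col('b').isNull())  # a > 1 and b is not null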

Fill null values; note that the fill value's type must match the column's type
sdf.fillna(0, subset=['adcode'])  # fill the specified columns
sdf.fillna({'adcode': 0, 'name': '缺失'})  # pass a dict to fill each column with its own value

Sampling
sdf.sample(withReplacement=True, fraction=0.1)  # random sampling with replacement, 10%
sdf.sampleBy('id', fractions={1: 0.1, 2: 0.1, 3: 0.1})  # stratified sampling by id: fractions maps each stratum value to its sampling rate

Creating an index
F.monotonically_increasing_id() only guarantees that the generated IDs are monotonically increasing and unique;
it guarantees neither contiguity nor determinism (the ID assigned to a given row may differ between runs).
This is because Spark cannot see a global order across a partitioned DataFrame:
the function uses the partition ID to generate a contiguous, monotonically increasing block of IDs within each partition,
but contiguity across partitions is not guaranteed, and for this reason the function supports at most about one billion partitions.
df.select(F.monotonically_increasing_id().alias('index'),
          'a',
          'b').show()
+-----------+---+---+
|      index|  a|  b|
+-----------+---+---+
|25769803776|  1|2.0|
|51539607552|  1|3.0|
|77309411328|  3|4.0|
+-----------+---+---+
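
If a strictly contiguous 0..N-1 index is required, a common workaround (a sketch of my own, not from the original post) is row_number over a window; note that a window without partitionBy pulls all rows into a single partition, so this only suits small-to-medium data:

from pyspark.sql import Window

win = Window.orderBy('e')  # global ordering by column e; no partitionBy, so all rows end up in one partition
df_indexed = df.withColumn('index', F.row_number().over(win) - 1)  # contiguous index starting at 0
df_indexed.select('index', 'a', 'b').show(3)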

Convert a PySpark DataFrame to a pandas DataFrame
df_pd = df.toPandas()
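
toPandas() collects the entire DataFrame onto the driver, so the driver needs enough memory to hold it. Optionally (my own note; the key below applies to Spark 3.x), Arrow-based conversion can speed this up:

spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')  # Spark 3.x key; Spark 2.x uses spark.sql.execution.arrow.enabled
df_pd = df.toPandas()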

Split a string column on a delimiter and take individual elements, typically used to take the top-3 results after computing a distribution.

df = spark.createDataFrame(data=[(1, '食品,上装,玩偶,潮玩'),
                                       (2, '上装,裤袜,球鞋,居家'),
                                       (3, '居家,球鞋,酒水饮料,潮玩')],
                                 schema=['id', 'type'])

df.show()
+---+-----------------------+
| id|                   type|
+---+-----------------------+
|  1|    食品,上装,玩偶,潮玩|
|  2|    上装,裤袜,球鞋,居家|
|  3|居家,球鞋,酒水饮料,潮玩|
+---+-----------------------+

split
df = df.withColumn('new_type', F.split('type', ','))
df = df.withColumn('type1', F.col('new_type').getItem(0))
df = df.withColumn('type2', F.col('new_type').getItem(1))
df = df.withColumn('type3', F.col('new_type').getItem(2))
df.show()

+---+-----------------------+----------------------------+-----+-----+--------+
| id|                   type|                    new_type|type1|type2|   type3|
+---+-----------------------+----------------------------+-----+-----+--------+
|  1|    食品,上装,玩偶,潮玩|    [食品, 上装, 玩偶, 潮玩]| 食品| 上装|    玩偶|
|  2|    上装,裤袜,球鞋,居家|    [上装, 裤袜, 球鞋, 居家]| 上装| 裤袜|    球鞋|
|  3|居家,球鞋,酒水饮料,潮玩|[居家, 球鞋, 酒水饮料, 潮玩]| 居家| 球鞋|酒水饮料|
+---+-----------------------+----------------------------+-----+-----+--------+

For an array-type column, turn each element into its own row

df1 = df.select('id', 'new_type')
df1.show()
+---+----------------------------+
| id|                    new_type|
+---+----------------------------+
|  1|    [食品, 上装, 玩偶, 潮玩]|
|  2|    [上装, 裤袜, 球鞋, 居家]|
|  3|[居家, 球鞋, 酒水饮料, 潮玩]|
+---+----------------------------+

df1.select('id', F.explode('new_type').alias('new_type_explode')).show()
+---+----------------+
| id|new_type_explode|
+---+----------------+
|  1|            食品|
|  1|            上装|
|  1|            玩偶|
|  1|            潮玩|
|  2|            上装|
|  2|            裤袜|
|  2|            球鞋|
|  2|            居家|
|  3|            居家|
|  3|            球鞋|
|  3|        酒水饮料|
|  3|            潮玩|
+---+----------------+
df = df.withColumn('c', F.regexp_replace('c', r'[|:]', '_'))  # regex replacement: turn | and : into _
df.show()
+---+---+--------------+----------+-------------------+
|  a|  b|             c|         d|                  e|
+---+---+--------------+----------+-------------------+
|  1|2.0|sdsd_sdsd_sdsd|2000-01-01|2000-01-01 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  1|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  2|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  2|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  2|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  2|3.0|   20202_19001|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|              |2000-03-01|2000-01-03 12:00:00|
+---+---+--------------+----------+-------------------+

if-else logic
F.lit() defines a literal constant.
sdf_new = sdf.withColumn('type', F.when(F.col('b') < 130000, '直辖市').\
                                   when(F.col('b') >= 710000, '特别行政区').\
                                   otherwise(F.lit('常规省份')))
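
For custom logic that is hard to express with built-in functions, the udf imported at the top can be used instead. A minimal sketch of my own (function and column names are made up for illustration); keep in mind that Python UDFs are much slower than built-in functions:

@udf(returnType=T.StringType())
def label_b(x):  # illustrative UDF, runs as plain Python row by row
    return 'low' if x is not None and x < 3 else 'high'

df_labeled = df.withColumn('b_label', label_b(F.col('b')))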

Keep 2 decimal places (F.format_number returns a formatted string)
df.withColumn('new_b', F.format_number(F.col('b'), 2))

Split a column
df = df.withColumn('new_c', F.split(F.col('c'), '_'))

Basic arithmetic
df = df.withColumn("a", F.col('b')/F.col('c'))

sdf.join(sdf2, on=['id', 'name'], how='left')  # join on multiple columns
sdf.join(sdf2, sdf.id == sdf2.adcode, how='left')  # join on differently named columns
sdf.join(sdf2, [sdf.id == sdf2.adcode, sdf.name == sdf2.region], how='left')  # join on multiple, differently named columns

Joining with a small table
Broadcast the small table to every executor before joining:
sdf.join(F.broadcast(sdf2), on='id', how='left')
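
Spark also broadcasts small tables automatically when they are below spark.sql.autoBroadcastJoinThreshold (10 MB by default); adjusting it is optional (an aside of my own, not from the original):

spark.conf.set('spark.sql.autoBroadcastJoinThreshold', str(50 * 1024 * 1024))  # raise the threshold to 50 MB; -1 disables auto-broadcast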

Cartesian (cross) join
sdf1.crossJoin(sdf2)

Union
sdf.union(sdf1).union(sdf2)
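
Note that union matches columns by position, not by name. To match by name (a note of my own; available since Spark 2.3), unionByName can be used:

sdf.unionByName(sdf1)  # aligns columns by name instead of position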

Difference
sdf1.subtract(sdf2)

Intersection
sdf1.intersect(sdf2)

Aggregation
temp_df = df.groupby('a').agg(F.count('c').alias('c_count'),
                              F.max('c').alias('c_max'),
                              F.min('c').alias('c_min'))

Aggregate into a list / set
temp_df = df.groupby('a').agg(F.collect_list('c').alias('c_list'),
                              F.collect_set('c').alias('c_set'))

Apply the same aggregate function to multiple columns (group by a, take the max of b and of c)
expressions = [F.max(col).alias('%s_max'%(col)) for col in ['b','c']]
df.groupby('a').agg(*expressions)

Group-wise sorting: sort each user's logins by login time, then keep the most recent login record per user

df = spark.createDataFrame(data=[(1, 20220101),
                                 (1, 20220102),
                                 (1, 20220103),
                                 (1, 20220104),
                                 (2, 20220102),
                                 (2, 20220107),
                                 (3, 20220101),
                                 (3, 20220103)],
                                 schema=['id', 'login_date'])

df.show()
+---+----------+
| id|login_date|
+---+----------+
|  1|  20220101|
|  1|  20220102|
|  1|  20220103|
|  1|  20220104|
|  2|  20220102|
|  2|  20220107|
|  3|  20220101|
|  3|  20220103|
+---+----------+

from pyspark.sql import Window
win_func = Window.partitionBy('id').orderBy(F.desc('login_date'))

df = df.withColumn('rn', F.row_number().over(win_func))
df = df.filter(F.col('rn') == 1)
df.show()
+---+----------+---+
| id|login_date| rn|
+---+----------+---+
|  1|  20220104|  1|
|  2|  20220107|  1|
|  3|  20220103|  1|
+---+----------+---+

PySpark provides several ways to control the number of partitions:

df.coalesce(100)  # reduce to 100 partitions without a shuffle
df.repartition(100)  # shuffle into 100 partitions
df.repartition(100, 'id')  # shuffle into 100 partitions by the hash of the id column
df.repartitionByRange(100, 'id')  # shuffle into 100 partitions by ranges of the id column's values
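
To check the effect (a small verification of my own), the current partition count can be read from the underlying RDD:

print(df.rdd.getNumPartitions())                         # partitions before
print(df.repartition(100, 'id').rdd.getNumPartitions())  # 100 after repartitioning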

Reading and writing HDFS

save
df.coalesce(1).write.option("sep", "#").option("header", "true").csv('demo_data',mode='overwrite')

read
df = spark. \
     read.format('csv') \
     .option('delimiter', '#') \
     .load(hdfs_data_path, header=False, inferSchema="true")
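
For intermediate results, a columnar format such as Parquet is often more convenient than CSV because it stores the schema; a minimal sketch of my own (the path is made up for illustration):

df.write.parquet('demo_data_parquet', mode='overwrite')   # illustrative path
df2 = spark.read.parquet('demo_data_parquet')             # schema and column types are preserved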

Original: https://blog.csdn.net/CoutEndl/article/details/126442204
Author: Evangelion-02
Title: pyspark-02 DataFrame Syntax
