dataframe常用API(python)

文章目录

创建dataframe

spark3推荐使用sparksession来创建spark会话,然后利用使用sparksession创建出来的application来创建dataframe。
下面是两种创建方式,效果是相同的:

conf = SparkConf().setAppName('featureEngineering').setMaster('local')
spark = SparkSession.builder.config(conf=conf).getOrCreate()
Samples = spark.read.format('csv').option('header', 'true').load(ResourcesPath)
spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()

对于所有的spark功能,sparksession类都是入口,所以创建基础的sparksession只需要使用sparksession.builder()。

select

if __name__ == '__main__':
    movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\smallratings.csv"
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
    movieSamples.select("movieId").show()

dataframe常用API(python)

collect

collect方法将已经存储的dataframe数据从存储器中收集回来,并返回一个数组,包括datafame中的所有行。但是,当数据集很大或者分区数据集很大时,很容易让驱动器崩溃。数据收集到驱动器中进行计算,就不是分布式并行计算了,而是串行计算,会更慢,所以,除了常看小数据,一般吧建议使用。

count

count方法用来计算数据集dataframe中行的个数,返回dataframe集合的行数。

if __name__ == '__main__':
    movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\smallratings.csv"
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
    print(movieSamples.count())

dataframe常用API(python)

limit

limit()限制输出,只保留Top_N,不是Action操作。

if __name__ == '__main__':
    movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\smallratings.csv"
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
    movieSamples.limit(5).show()

效果和下面的代码是一样的

if __name__ == '__main__':
    movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\smallratings.csv"
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
    movieSamples.show(5)

dataframe常用API(python)

distinct

distinct方法用来去除数据集中的重复项,返回一个不包含重复记录的dataframe。这里的重复项指的是两行的数据完全相同。
该方法和dropDuplicates()方法不传入指定字段时的结果相同。

if __name__ == '__main__':
    movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\test.csv"
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
    movieSamples1=movieSamples.distinct()
    movieSamples1.show()

filter

filter方法是一个常用的方法,用条件来过滤数据集,如果想选择某列中大于或小于某数的数据,就可以使用filter方法。

if __name__ == '__main__':
    movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\test.csv"
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
    movieSamples1=movieSamples.filter("id>3")
    movieSamples1.show()

dataframe常用API(python)

flatMap&map

flatMap是对dataframe中的数据进行整体操作的一个特殊方法。flatMap方法首先将函数应用于此数据集的所有元素,然后将结果展平,从而返回一个新的数据集。

map方法可以对dataframe数据集中的数据进行逐个操作,他与flatMap的不同之处在于,flatMap是将数据集中的数据作为一个整体去处理,之后再对其中的数据做计算,map则是直接对数据集中的数据做单独处理。

在这里插入代码片

groupBy & agg

groupBy 方法是将传入的数据进行分组,依据是作为参数传入的计算方法。一般与agg配合使用,例如groupBy(“id”).agg({“vale”:”max”}表示按照id进行分组,在每一组中选出Vale最大的值。max可替换成其他的函数,比如max,min,mean,sum,count等等。

if __name__ == '__main__':
    movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\test.csv"
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
    movieSamples.groupBy("id").agg({"vale":"max"}).show()

dataframe常用API(python)

drop

drop方法从数据集中删除某列,然后返回dataFrame类型。

if __name__ == '__main__':
    movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\smallratings.csv"
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
    movieSamples.drop("moiveId")

sort

sort方法对已有的dataframe重新排序,并将重新排序后的数据生成一个新的dataframe

if __name__ == '__main__':
    movieResourcesPath = r"E:\projects\SparrowRecSys-master\src\main\resources\webroot\sampledata\test.csv"
    spark = SparkSession.builder.\
        appName("test").\
        master("local[*]").\
        getOrCreate()
    movieSamples = spark.read.format('csv').option("sep",",").option('header', 'true').load(movieResourcesPath)
    movieSamples.sort("id").show()

dataframe常用API(python)

F.()

数据类型转换

F.col("movieId").cast(IntegerType())

F.avg
F.count

归一化

MinMaxScaler(inputCol="avgRatingVec", outputCol="scaleAvgRating")

管道

pipelineStage = [ratingScaler]
featurePipeline = Pipeline(stages=pipelineStage)
movieProcessedFeatures = featurePipeline.fit(movieFeatures).transform(movieFeatures)

Original: https://blog.csdn.net/weixin_42385782/article/details/127306409
Author: bugmaker.
Title: dataframe常用API(python)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/678486/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球