Spark学习记录

SpringStrongGuo

  • *Hadoop与Spark

Hadoop主要解决,海量数据的存储和海量数据的分析计算。

Spark主要解决海量数据的分析计算。

  • Spark运行模式***

1)Local:运行在一台机器上。 测试用。

2)Standalone:是Spark自身的一个调度系统。 对集群性能要求非常高时用。国内很少使用。

3)Yarn:采用Hadoop的资源调度器。 国内大量使用。

4)Mesos:国内很少使用。

  • * Spark常用端口号

1)4040 spark-shell任务端口

2)7077 内部通讯端口。 类比Hadoop的8020/9000

3)8080 查看任务执行情况端口。 类比Hadoop的8088

4)18080 历史服务器。类比Hadoop的19888

注意:由于Spark只负责计算,所有并没有Hadoop中存储数据的端口50070

  • * Spark运行架构

组件:

(1)Driver

Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。

Driver 在 Spark 作业执行时主要负责:➢ 将用户程序转化为作业(job)➢ 在 Executor 之间调度任务(task)➢ 跟踪 Executor 的执行情况➢ 通过 UI 展示查询运行情况

实际上,我们无法准确地描述 Driver 的定义,因为在整个的编程过程中没有看到任何有关 Driver 的字眼。所以简单理解,所谓的 Driver 就是驱使整个应用运行起来的程序,也称之为 Driver 类。

(2)Executor

Spark Executor 是集群中工作节点(Worker)中的一个 JVM 进程,负责在 Spark 作业 中运行具体任务(Task),任务彼此之间相互独立。Spark 应用启动时,Executor 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 Executor 节点发生了 故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点 上继续运行。

Executor 有两个核心功能:

➢ 负责运行组成 Spark 应用的任务,并将结果返回给驱动器进程

➢ 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存 式存储。

RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存 数据加速运算。

(3)Master & Worker

Spark 集群的独立部署环境中,不需要依赖其他的资源调度框架,自身就实现了资源调 度的功能,所以环境中还有其他两个核心组件:Master 和 Worker,这里的 Master 是一个进 程,主要负责资源的调度和分配,并进行集群的监控等职责,类似于 Yarn 环境中的 RM, 而 Worker 呢,也是进程,一个 Worker 运行在集群中的一台服务器上,由 Master 分配资源对 数据进行并行的处理和计算,类似于 Yarn 环境中 NM。

(4)ApplicationMaster

Hadoop 用户向 YARN 集群提交应用程序时,提交程序中应该包含 ApplicationMaster,用 于向资源调度器申请执行任务的资源容器 Container,运行用户自己的程序任务 job,监控整 个任务的执行,跟踪整个任务的状态,处理任务失败等异常情况。

说的简单点就是,ResourceManager(资源)和 Driver(计算)之间的解耦合靠的就是 ApplicationMaster

  • * Spark作业提交流程

Spark学习记录
  • * RDD五大属性

Spark学习记录

分区列表

RDD 数据结构中存在分区列表,用于执行任务时并行计算,是实现分布式计算的重要属性。

Spark学习记录

分区计算函数

Spark 在计算时,是使用分区函数对每一个分区进行计算

Spark学习记录

RDD 之间的依赖关系

RDD 是计算模型的封装,当需求中需要将多个计算模型进行组合时,就需要将多个 RDD 建 立依赖关系

Spark学习记录

分区器(可选)

当数据为 KV 类型数据时,可以通过设定分区器自定义数据的分区

Spark学习记录

首选位置(可选)

计算数据时,可以根据计算节点的状态选择不同的节点位置进行计算

Spark学习记录
  • * RDD编程-RDD的创建

1) 从集合(内存)中创建 RDD

//创建rdd,分区
    val rdd1 = sc.parallelize(List(1, 2, 3, 4))
    val rdd1 = sc.makeRDD(List(1, 2, 3, 4), 2)

从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法

//从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法
def makeRDD[T: ClassTag](
 seq: Seq[T],
 numSlices: Int = defaultParallelism): RDD[T] = withScope {
 parallelize(seq, numSlices)
}

2) 从外部存储(文件)创建 RDD

由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集, 比如 HDFS、HBase 等

val sparkConf =new SparkConf().setMaster("local[*]").setAppName("spark")
val sparkContext = new SparkContext(sparkConf)
val fileRDD: RDD[String] = sparkContext.textFile("input")
fileRDD.collect().foreach(println)
sparkContext.stop()
  • RDD编程-并行度与分区***

读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的 Spark 核心源码如下:

def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
 (0 until numSlices).iterator.map { i =>
 val start = ((i * length) / numSlices).toInt
 val end = (((i + 1) * length) / numSlices).toInt
 (start, end)
 }

读取文件数据时,数据是按照 Hadoop 文件读取的规则进行切片分区,而切片规则和数 据读取的规则有些差异,具体 Spark 核心源码如下

public InputSplit[] getSplits(JobConf job, int numSplits)
 throws IOException {
 long totalSize = 0; // compute total size
 for (FileStatus file: files) { // check we have valid files
 if (file.isDirectory()) {
 throw new IOException("Not a file: "+ file.getPath());
 }
 totalSize += file.getLen();
 }
 long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
 long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.

 FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

 ...

 for (FileStatus file: files) {

 ...

 if (isSplitable(fs, path)) {
 long blockSize = file.getBlockSize();
 long splitSize = computeSplitSize(goalSize, minSize, blockSize);
 ...

 }
 protected long computeSplitSize(long goalSize, long minSize,
 long blockSize) {
 return Math.max(minSize, Math.min(goalSize, blockSize));
 }
  • RDD编程-转换算子
  • **单value
    *


+ ** map与mapPartitions映射
*

map:将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换

mapPartitions:将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处 理,哪怕是过滤数据。

package com.hadoop100.sparkcore.rdd.transform

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/1 10:34
 * @description :
 */
object RDDOperatorTransform01 {

  def main(args: Array[String]): Unit = {
    //准备环境
    val conf = new SparkConf().setMaster("local").setAppName("Operator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List(1, 2, 3, 4))

    //map
    val value = rdd.map(x => x * 2)

    //mapPartitions,已分区为单位进行转换操作,会将整个分区的数据加载到内存进行引用,但处理完的数据不会立即释放掉存在对象的引用,
    //内存小数据量大时,会出现内存溢出
    val value1 = rdd.mapPartitions(
      iter => {
        println("=======")
        iter.map(_ * 2)
      }
    )

    value.collect().foreach(println)
    value1.collect().foreach(println)

    sc.stop()

  }

}


+
* **map 和 mapPartitions 的区别
*

➢ 数据处理角度

Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子 是以分区为单位进行批处理操作。

➢ 功能的角度

Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。 MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变, 所以可以增加或减少数据

➢ 性能的角度

Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处 理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能 不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。

+ ** **mapPartitionsWithIndex分区号***

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处 理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

package com.hadoop100.sparkcore.rdd.transform

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/1 11:02
 * @description :
 */
object RDDOperatorTransform04 {

  def main(args: Array[String]): Unit = {
    //准备环境
    val conf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(List(1, 2, 3, 4), 2)

    //mapPartitionsWithIndex,多一个分区号参数
    val value = rdd.mapPartitionsWithIndex(
      (index, iter) => {
        iter.map(
          num => (index, num)
        )
      }
    )

    value.collect().foreach(println)

    sc.stop()

  }

}
+ **flatMap扁平映射*

将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

package com.hadoop100.sparkcore.rdd.transform

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/1 11:17
 * @description :
 */
object RDDOperatorTransform_flatMap {

  def main(args: Array[String]): Unit = {
    //准备环境
    val conf = new SparkConf().setMaster("local").setAppName("Operator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(
      List(List(1, 2), 3, List(4, 5))
    )

    val value = rdd.flatMap(
      x => {
        x match {
          case list: List[_] => list
          case data => List(data)
        }
      }
    )

    value.collect().foreach(println)

    sc.stop()

  }

}
+ ** **glom同一类型***

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

package com.hadoop100.sparkcore.rdd.transform

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/1 11:35
 * @description :
 */
object RDDOperatorTransform_glom {
  def main(args: Array[String]): Unit = {
    //准备环境
    val conf = new SparkConf().setMaster("local").setAppName("Operator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(
      List(1, 2, 3, 4, 5), 2
    )

    val glomrdd = rdd.glom()
    val value: RDD[Int] = glomrdd.map(
      array => {
        array.max
      }
    )

    println(value.collect().sum)

    sc.stop()
  }

}
+ ** **groupBy分组***

将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样 的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中

package com.hadoop100.sparkcore.rdd.transformimport org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.rdd.RDD/** * @author : GuoSpringStrong * @date : Created in 2022/3/1 14:02 * @description :  */object RDDOperatorTransform_groupBy {  def main(args: Array[String]): Unit = {    //准备环境val conf = new SparkConf().setMaster("local").setAppName("Operator")    val sc = new SparkContext(conf)    val rdd = sc.parallelize(      List(1, 2, 3, 4, 5), 2)    val rdd1 = sc.textFile("datas/apache.log")    //groupBy将每一个数据进行分组判断,根据返回的分组key进行分组,相同key分到一组val value = rdd.groupBy(      _ % 2)    val value1 = rdd1.map(      line => {        val date = line.split("")        val time = date(3).split(":")        (time(1), 1)      }    ).groupBy(_._1)    value1.map {      case (hour, iter) => {        (hour, iter.size)      }    }.collect().foreach(println)    value.collect().foreach(println)    sc.stop()  }}
+ **filter过滤*

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。 当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出 现数据倾斜。

package com.hadoop100.sparkcore.rdd.transform

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/1 14:23
 * @description :
 */
object RDDOperatorTransform_filter {
  def main(args: Array[String]): Unit = {

    //准备环境
    val conf = new SparkConf().setMaster("local").setAppName("Operator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(
      List(1, 2, 3, 4, 5), 2
    )

    rdd.filter(
      _ % 2 == 0
    ).collect().foreach(println)

    val rdd1 = sc.textFile("datas/apache.log")

    rdd1.filter(
      line => {
        val data = line.split(" ")(3).split(":")(0)
        data == "17/05/2015"
      }
    ).collect().foreach(println)

    sc.stop()
  }

}
+ **sample随机抽取*

根据指定的规则从数据集中抽取数据

//第一个参数,抽取数据后是否放回//第二个参数,数据源中每个数据的中奖率//第三个参数,随机算法的种子
package com.hadoop100.sparkcore.rdd.transformimport org.apache.spark.{SparkConf, SparkContext}/** * @author : GuoSpringStrong * @date : Created in 2022/3/1 15:04 * @description :  */object RDDOperatorTransform_sample {  def main(args: Array[String]): Unit = {    //准备环境    val conf = new SparkConf().setMaster("local").setAppName("Operator")    val sc = new SparkContext(conf)    val rdd = sc.parallelize(      List(1, 2, 3, 4, 5, 1, 2, 3, 6, 7, 8, 9, 10), 2    )    //第一个参数,抽取数据后是否放回    //第二个参数,数据源中每个数据的中奖率    //第三个参数,随机算法的种子    rdd.sample(      true, 0.4    ).collect().foreach(println)    sc.stop()  }}
+ **distinct去重*

将数据集中重复的数据去重

package com.hadoop100.sparkcore.rdd.transform

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/1 15:23
 * @description :
 */
object RDDOperatorTransform_distinct {
  def main(args: Array[String]): Unit = {
    //准备环境
    val conf = new SparkConf().setMaster("local").setAppName("Operator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(
      List(1, 2, 3, 4, 5, 1, 2, 3, 6, 7, 8, 9, 10), 2
    )

    rdd.distinct().collect().foreach(println)

    sc.stop()

  }
+ **coalesce缩减分区*

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率

当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce 方法,收缩合并分区,减少 分区的个数,减小任务调度成本

package com.hadoop100.sparkcore.rdd.transform

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/1 15:32
 * @description :
 */
object RDDOperatorTransform_coalesce {
  def main(args: Array[String]): Unit = {
    //准备环境
    val conf = new SparkConf().setMaster("local").setAppName("Operator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(
      List(1, 2, 3, 4, 5, 1, 2, 3, 6, 7, 8, 9, 10), 4
    )

    rdd.coalesce(1).collect().foreach(println)

    sc.stop()
  }}
+ **repartition扩大分区*

该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为 true。无论是将分区数多的 RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程。

扩大分区

+ **sortBy排序*

该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理 的结果进行排序,默认为升序排列。排序后新产生的 RDD 的分区数与原 RDD 的分区数一 致。中间存在 shuffle 的过程

package com.hadoop100.sparkcore.rdd.transform

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/1 15:55
 * @description :
 */
object RDDOperatorTransform_sortBy {
  def main(args: Array[String]): Unit = {
    //准备环境
    val conf = new SparkConf().setMaster("local").setAppName("Operator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(
      List(10, 8, 3, 4, 5, 1, 2, 3, 6, 7, 8, 9, 1), 2
    )

    rdd.sortBy(
      n => n
    ).collect().foreach(println)

    sc.stop()
  }

}
+ **pipe外部脚本*

通过 shell 命令管道 RDD 的每个分区,例如 Perl 或 bash 脚本。 RDD 元素被写入进程的标准输入,输出到标准输出的行作为字符串的 RDD 返回。

package com.hadoop100.sparkcore.rdd.transform

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/1 16:25
 * @description :
 */
object RDDOperatorTransform_pipe {

  def main(args: Array[String]): Unit = {
    //准备环境
    val conf = new SparkConf().setMaster("local").setAppName("Operator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(
      List(1, 2, 3, 4, 5, 1, 2, 3, 6, 7, 8, 9, 10), 1
    )

    rdd.pipe(
      "datas/pipe_shell.sh"
    ).collect().foreach(println)

    sc.stop()
  }

}
#!/bin/sh
echo "Running shell script"
while read LINE; do
   echo ${LINE}!

done
  • 双vlaue(两个数据源)
    • intersection交集***

对源 RDD 和参数 RDD 求交集后返回一个新的 RDD

+ ** **union并集***

对源 RDD 和参数 RDD 求并集后返回一个新的 RDD

+ ** **subtract差集

以一个 RDD 元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集

+ ** **zip拉链***

将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD 中的元素,Value 为第 2 个 RDD 中的相同位置的元素。

package com.hadoop100.sparkcore.rdd.transform.twovalue

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/1 16:41
 * @description :
 */
object RDDOperatorTransform_intersection {
  def main(args: Array[String]): Unit = {
    //准备环境
    val conf = new SparkConf().setMaster("local").setAppName("Operator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(
      List(10, 8, 4, 5, 1, 2, 6, 7, 8, 1)
    )
    val rdd1 = sc.parallelize(
      List(3, 6, 7, 8, 9, 1)
    )

    //交集
    rdd.intersection(rdd1).collect().foreach(println)
    //并集
    rdd.union(rdd1).collect().foreach(println)
    //差集
    rdd.subtract(rdd1).collect().foreach(println)
    //拉链
    rdd.zip(rdd1).collect().foreach(println)

    sc.stop()
  }

}
  • Key-Value
    • partitionBy***

将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

package com.hadoop100.sparkcore.rdd.transform.keyvalue

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/2 8:27
 * @description :
 */
object RDDOperatorTransform_partitionBy {

  def main(args: Array[String]): Unit = {
    //准备环境
    val conf = new SparkConf().setMaster("local").setAppName("Operator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(
      Array((1, "aaa"), (2, "bbb"), (3, "ccc"), (1, "ddd")), 3
    ).partitionBy(
      //默认分区器
      new HashPartitioner(2)
    ).saveAsTextFile("output")

    sc.stop()
  }

}
+ ** **reduceByKey***

可以将数据按照相同的 Key 对 Value 进行聚合

package com.hadoop100.sparkcore.rdd.transform.keyvalue

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/2 8:58
 * @description :
 */
object RDDOperatorTransform_wordCount {

  def main(args: Array[String]): Unit = {
    //环境连接
    val conf = new SparkConf().setMaster("local").setAppName("wordCount")
    val sc = new SparkContext(conf)

    val rdd = sc.textFile("datas/test2.txt")

    rdd.flatMap(
      _.split(" ")
    ).map(
      (_, 1)
    ).reduceByKey(
      _ + _
    ).collect().foreach(println)

    sc.stop()
  }
}
+ ** **groupByKey***

将数据源的数据根据 key 对 value 进行分组

*

+ * reduceByKey 和 groupByKey 的区别

从 shuffle 的角度

reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey 可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的 数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较 高。

从功能的角度:

reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚 合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那 么还是只能使用 groupByKey

+ ** **aggregateByKey***

将数据根据不同的规则进行分区内计算和分区间计算

package com.hadoop100.sparkcore.rdd.transform.keyvalue

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/2 11:09
 * @description :
 */
object RDDOperatorTransform_aggregateByKey {
  def main(args: Array[String]): Unit = {
    //准备环境
    val conf = new SparkConf().setMaster("local").setAppName("Operator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(
      List(("aaa", 1), ("aaa", 2), ("aaa", 3), ("aaa", 4)), 2
    )

    //aggregateByKey(初始值,列表(分区内计算规则,分区间计算规则))
    rdd.aggregateByKey(0)(
      (x, y) => math.max(x, y),
      (x, y) => x + y
    ).collect().foreach(println)

    sc.stop()
  }

}
+ ** **foldByKey***

当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

+ ** **combineByKey***

最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于 aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

+ ** **sortByKey****
+ ** **mapValues****
+ ** **join***

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W))的 RDD

package com.hadoop100.sparkcore.rdd.transform.keyvalue

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author : GuoSpringStrong
 * @date : Created in 2022/3/2 16:22
 * @description :
 */
object RDDOperatorTransform_join {
  def main(args: Array[String]): Unit = {
    //准备环境
    val conf = new SparkConf().setMaster("local").setAppName("Operator")
    val sc = new SparkContext(conf)

    val rdd = sc.parallelize(
      List(("a", 1), ("a", 2), ("b", 3), ("c", 4)), 2
    )
    val rdd1 = sc.parallelize(
      List(("b", 9), ("a", 6), ("d", 3), ("e", 8)), 2
    )

    rdd.join(rdd1).collect().foreach(println)
rdd.cogroup(rdd1).collect().foreach(println)
sc.stop() } }
+ ** **cogroup***

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD

  • RDD编程-行动算子
  • reduce***

聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

  • *collect

在驱动程序中,以数组 Array 的形式返回数据集的所有元素

  • *count

返回 RDD 中元素的个数

  • *first

返回 RDD 中的第一个元素

  • *take

返回一个由 RDD 的前 n 个元素组成的数组

  • *takeOrderd

返回该 RDD 排序后的前 n 个元素组成的数组

  • *aggregate

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

  • *fold

折叠操作,aggregate 的简化版操作

  • *countByKey

统计每种 key 的个数

  • *save

将数据保存到不同格式的文件中

  • *foreach

分布式遍历 RDD 中的每一个元素,调用指定函数

Original: https://www.cnblogs.com/springstrong/p/15949209.html
Author: SpringStrong
Title: Spark学习记录

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/565257/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球