Dependency 源码

1. 思考 : 依赖的本质是什么呢?作用是什么?

1. 在我们初始化RDD的时候, 需要指定 RDD的依赖关系
            abstract class RDD[T: ClassTag](
            @transient private var _sc: SparkContext,
            @transient private var deps: Seq[Dependency[_]]  => 指定依赖关系
            ) extends Serializable with Logging

       RDD的依赖 是一个Dependency的实现类, 用来表示 子RDD 和 父RDD 间的关系(分区与分区间对应关系)

2.class hierarchy

Dependency (org.apache.spark)                   // base class, 定义了rdd方法,表示 依赖的父RDD
    ShuffleDependency (org.apache.spark)        // 父RDD类型为 RDD[k,v]型, 涉及到了 根据key shuffleWrite和 和 shuffleRead
    NarrowDependency (org.apache.spark)         // 父RDD分区个数 必定 大于等于 子RDD个数, 涉及到了 分区的合并 或 1对1, 允许流水线执行
        PruneDependency (org.apache.spark.rdd)
        OneToOneDependency (org.apache.spark)   // 父RDD 分区个数 = 子RDD个数 分区个数  (1 对 1)
        RangeDependency (org.apache.spark)      // 父RDD 分区个数 >= 子RDD个数 分区个数 (多 对 1) 分区合并

3.构造器

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ // 依赖的 父RDD (必须为 key-value型RDD)
    val partitioner: Partitioner,                                     // 指定 分区器(shuffle write 时,根据key分区)
    val serializer: Serializer = SparkEnv.get.serializer,             // 指定 序列化方式
    val keyOrdering: Option[Ordering[K]] = None,                      // 指定 key 排序方式
    val aggregator: Option[Aggregator[K, V, C]] = None,               // 指定 合并规则
    val mapSideCombine: Boolean = false,                              // 指定 是否开启 map端合并,默认不开启
    val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor)   // 指定 ShuffleMapTask 中 shuffle write的处理器
  extends Dependency[Product2[K, V]] {

}

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

class RangeDependency[T]( rdd: RDD[T]       // 父RDD
                        , inStart: Int      // 范围开始
                        , outStart: Int     // 范围结束
                        , length: Int)      // 指定范围长度(分区数)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

4.通过 RDD实现类 查看 依赖关系

案例1 HadoopRDD

//1.案例1 HadoopRDD
class HadoopRDD[K, V](
    sc: SparkContext,
    broadcastedConf: Broadcast[SerializableConfiguration],
    initLocalJobConfFuncOpt: Option[JobConf => Unit],
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int)
  extends RDD[(K, V)](sc, Nil)

说明 :
    1.实现类 HadoopRDD并没有重写 getDependencies方法,说明他使用父类Rdd的 getDependencies方法
    2.HadoopRDD 继承的是 下面这种否则方式,并且指定的依赖关系为 Nil
        //主构造器
        abstract class RDD[T: ClassTag](
            @transient private var _sc: SparkContext,
            @transient private var deps: Seq[Dependency[_]]
        )
    3.在查看父类RDD getDependencies方法定义,直接过去构造器中的deps
    protected def getDependencies: Seq[Dependency[_]] = deps
    4.综上所述 HadoopRDD 的依赖关系为 Nil

案例2 MapPartitionsRDD

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev)

说明 :
    1.实现类 MapPartitionsRDD并未重写 getDependencies方法,说明他使用父类Rdd的 getDependencies方法
    2.MapPartitionsRDD 继承的是 下面这种否则方式,并且指定的依赖关系为 Nil
    //辅助构造器
    def this(@transient oneParent: RDD[_]) =
        this(oneParent.context, List(new OneToOneDependency(oneParent)))
    3.综上所述  MapPartitionsRDD 的依赖关系为 OneToOneDependency

案例3 ShuffledRDD

class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil)

说明 :
    1.实现类 ShuffledRDD 重写了 getDependencies方法
     override def getDependencies: Seq[Dependency[_]]
         List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
    2.综上所述 ShuffledRDD 的依赖关系为 ShuffleDependency

Original: https://www.cnblogs.com/bajiaotai/p/16693971.html
Author: 学而不思则罔!
Title: Dependency 源码

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/683779/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球