1. 思考 : 依赖的本质是什么呢?作用是什么?
1. 在我们初始化RDD的时候, 需要指定 RDD的依赖关系
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]] => 指定依赖关系
) extends Serializable with Logging
RDD的依赖 是一个Dependency的实现类, 用来表示 子RDD 和 父RDD 间的关系(分区与分区间对应关系)
2.class hierarchy
Dependency (org.apache.spark) // base class, 定义了rdd方法,表示 依赖的父RDD
ShuffleDependency (org.apache.spark) // 父RDD类型为 RDD[k,v]型, 涉及到了 根据key shuffleWrite和 和 shuffleRead
NarrowDependency (org.apache.spark) // 父RDD分区个数 必定 大于等于 子RDD个数, 涉及到了 分区的合并 或 1对1, 允许流水线执行
PruneDependency (org.apache.spark.rdd)
OneToOneDependency (org.apache.spark) // 父RDD 分区个数 = 子RDD个数 分区个数 (1 对 1)
RangeDependency (org.apache.spark) // 父RDD 分区个数 >= 子RDD个数 分区个数 (多 对 1) 分区合并
3.构造器
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
@transient private val _rdd: RDD[_ // 依赖的 父RDD (必须为 key-value型RDD)
val partitioner: Partitioner, // 指定 分区器(shuffle write 时,根据key分区)
val serializer: Serializer = SparkEnv.get.serializer, // 指定 序列化方式
val keyOrdering: Option[Ordering[K]] = None, // 指定 key 排序方式
val aggregator: Option[Aggregator[K, V, C]] = None, // 指定 合并规则
val mapSideCombine: Boolean = false, // 指定 是否开启 map端合并,默认不开启
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor) // 指定 ShuffleMapTask 中 shuffle write的处理器
extends Dependency[Product2[K, V]] {
}
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
class RangeDependency[T]( rdd: RDD[T] // 父RDD
, inStart: Int // 范围开始
, outStart: Int // 范围结束
, length: Int) // 指定范围长度(分区数)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
if (partitionId >= outStart && partitionId < outStart + length) {
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
4.通过 RDD实现类 查看 依赖关系
案例1 HadoopRDD
//1.案例1 HadoopRDD
class HadoopRDD[K, V](
sc: SparkContext,
broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int)
extends RDD[(K, V)](sc, Nil)
说明 :
1.实现类 HadoopRDD并没有重写 getDependencies方法,说明他使用父类Rdd的 getDependencies方法
2.HadoopRDD 继承的是 下面这种否则方式,并且指定的依赖关系为 Nil
//主构造器
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
)
3.在查看父类RDD getDependencies方法定义,直接过去构造器中的deps
protected def getDependencies: Seq[Dependency[_]] = deps
4.综上所述 HadoopRDD 的依赖关系为 Nil
案例2 MapPartitionsRDD
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false,
isFromBarrier: Boolean = false,
isOrderSensitive: Boolean = false)
extends RDD[U](prev)
说明 :
1.实现类 MapPartitionsRDD并未重写 getDependencies方法,说明他使用父类Rdd的 getDependencies方法
2.MapPartitionsRDD 继承的是 下面这种否则方式,并且指定的依赖关系为 Nil
//辅助构造器
def this(@transient oneParent: RDD[_]) =
this(oneParent.context, List(new OneToOneDependency(oneParent)))
3.综上所述 MapPartitionsRDD 的依赖关系为 OneToOneDependency
案例3 ShuffledRDD
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
@transient var prev: RDD[_ <: Product2[K, V]],
part: Partitioner)
extends RDD[(K, C)](prev.context, Nil)
说明 :
1.实现类 ShuffledRDD 重写了 getDependencies方法
override def getDependencies: Seq[Dependency[_]]
List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
2.综上所述 ShuffledRDD 的依赖关系为 ShuffleDependency
Original: https://www.cnblogs.com/bajiaotai/p/16693971.html
Author: 学而不思则罔!
Title: Dependency 源码
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/683779/
转载文章受原作者版权保护。转载请注明原作者出处!