Spark Quick Start (4): Spark Core Programming - Spark Partitioners (Partitioner) @(RDD-K_V)

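HashPartitioner works as follows: for a given key, it computes the key's hashCode and takes the remainder after dividing by the number of partitions. If the remainder is negative, the number of partitions is added to it. The resulting value is the ID of the partition the key belongs to. When the key is null, 0 is returned.
The source is in the org.apache.spark package:
Source code: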

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions
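  // Decide which partition a key belongs to based on the key's value
  def getPartition(key: Any): Int = key match {
    case null => 0   // a null key always goes to partition 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) // non-null key: computed from the key's hashCode and the number of partitions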
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

…………
}
// Under the hood: a modulo operation (defined in Utils)
def nonNegativeMod(x: Int, mod: Int): Int = {
   val rawMod = x % mod
   rawMod + (if (rawMod < 0) mod else 0)
}
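
To make the modulo logic concrete, here is a minimal, Spark-free sketch. HashPartitionSketch and partitionFor are hypothetical names introduced only for illustration; nonNegativeMod repeats the helper shown above. Int keys are used because an Int hashes to itself, which makes the results easy to check by hand.

```scala
object HashPartitionSketch {
  // Same arithmetic as the nonNegativeMod helper shown above.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Hypothetical stand-in for HashPartitioner.getPartition.
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }

  def main(args: Array[String]): Unit = {
    println(partitionFor(7, 4))    // 7 % 4 = 3
    println(partitionFor(-7, 4))   // -7 % 4 = -3, shifted by 4 to 1
    println(partitionFor(null, 4)) // null keys go to partition 0
  }
}
```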

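HashPartitioner can lead to data skew; in the extreme case, a few partitions end up holding all of the RDD's data. RangePartitioner, in contrast, tries to keep the data evenly distributed across partitions, and the partitions are ordered relative to one another: every element in one partition is smaller (or larger) than every element in another partition. Order within a partition is not guaranteed. Put simply, it maps the keys in a given range to a single partition.
sortByKey uses RangePartitioner under the hood. The partitioner works in two steps:
① Sample the whole RDD, sort the sample, and compute the largest key of each partition, producing an array of keys (Array[K]) named rangeBounds;
② Determine which range of rangeBounds a key falls into, which gives the key's partition ID in the next RDD. This partitioner requires the RDD's key type to be sortable.
Source code: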

class RangePartitioner[K : Ordering : ClassTag, V](
    partitions: Int,
    rdd: RDD[_ <: Product2[K, V]],
    private var ascending: Boolean = true,
    val samplePointsPerPartitionHint: Int = 20)
  extends Partitioner {

  // A constructor declared in order to maintain backward compatibility for Java, when we add the
  // 4th parameter samplePointsPerPartitionHint. See SPARK-22160.
  // This is added to make sure from a bytecode point of view, there is still a 3-arg ctor.
  def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
    this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
  }

  // We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
  require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
  require(samplePointsPerPartitionHint > 0,
    s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")

  // Get the Ordering for the RDD's key type
  private var ordering = implicitly[Ordering[K]]

  // An array of upper bounds for the first (partitions - 1) partitions
  private var rangeBounds: Array[K] = {
    if (partitions <= 1) {
      // With at most one partition requested, return an empty array: the data is not range-partitioned
      Array.empty
    } else {
      // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
      // Cast to double to avoid overflowing ints or longs
      // Total sample size: samplePointsPerPartitionHint (20 by default) per output partition, at most 1M (10^6)
      val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
      // Assume the input partitions are roughly balanced and over-sample a little bit.
      // Sample size per input partition. For very large datasets (more than 50,000 partitions)
      // multiplying by 3 enlarges the sample slightly; for datasets with fewer than 50,000
      // partitions, sampling 60 items per partition is not much either.
      val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
      // Sample the RDD. Returns (total item count,
      // Array[(partition id, item count of that partition, samples taken from that partition)])
      val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
      if (numItems == 0L) {
        // The total item count is 0 (the RDD is empty): return an empty array
        Array.empty
      } else {
        // If a partition contains much more than the average number of items, we re-sample from it
        // to ensure that enough items are collected from that partition.
        // Ratio of the total sample size to the total item count, at most 1.0
        val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
        // Buffer that holds the sampled keys and their weights
        val candidates = ArrayBuffer.empty[(K, Float)]
        // IDs of imbalanced partitions (partitions whose item count exceeds what fraction allows)
        val imbalancedPartitions = mutable.Set.empty[Int]
        // Process the sampled data
        sketched.foreach { case (idx, n, sample) =>
          if (fraction * n > sampleSizePerPartition) {
            // If fraction times the item count of this partition exceeds the per-partition sample
            // size computed earlier, too few items were sampled from this partition. The partition
            // is imbalanced and must be re-sampled.
            imbalancedPartitions += idx
          } else {
            // This partition is not imbalanced: compute the weight and add its samples to candidates
            // The weight is 1 over the sampling probability.
            val weight = (n.toDouble / sample.length).toFloat
            for (key <- sample) {
              candidates += ((key, weight))
            }
          }
        }
        // Re-sample the imbalanced RDD partitions
        if (imbalancedPartitions.nonEmpty) {
          // Re-sample imbalanced partitions with the desired sampling probability.
          // Build an RDD that contains only the imbalanced partitions
          val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
          // Random seed
          val seed = byteswap32(-rdd.id - 1)
          // Sample with the RDD sample API
          val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
          val weight = (1.0 / fraction).toFloat
          candidates ++= reSampled.map(x => (x, weight))
        }
        // Compute rangeBounds from the final samples
        RangePartitioner.determineBounds(candidates, math.min(partitions, candidates.size))
      }
    }
  }

  // The next RDD has (number of elements in rangeBounds) + 1 partitions
  def numPartitions: Int = rangeBounds.length + 1

  // Binary search function; internally it uses the binary search provided by Java's Arrays
  private var binarySearch: ((Array[K], K) => Int) = CollectionsUtils.makeBinarySearch[K]

  // Return the partition ID (starting from 0) for the given key
  def getPartition(key: Any): Int = {
    // Cast the key to the RDD's original key type
    val k = key.asInstanceOf[K]
    var partition = 0
    if (rangeBounds.length <= 128) {
      // If we have less than 128 partitions naive search
      // With at most 128 bounds, loop locally to find the partition index for k
      while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
        partition += 1
      }
    } else {
      // Determine which binary search method to use only once.
      // With more than 128 bounds, use binary search to find the index for k.
      // If k does not appear in rangeBounds, the result is either a negative number (encoding the
      // insertion point) or a number beyond the size of rangeBounds (the last partition, for keys
      // larger than every bound)
      partition = binarySearch(rangeBounds, k)
      // binarySearch either returns the match location or -[insertion point]-1
      if (partition < 0) {
        partition = -partition-1
      }
      if (partition > rangeBounds.length) {
        partition = rangeBounds.length
      }
    }
    // Return the index according to the sort direction (ascending by default)
    if (ascending) {
      partition
    } else {
      rangeBounds.length - partition
    }
  }

  override def equals(other: Any): Boolean = other match {
    case r: RangePartitioner[_, _] =>
      r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending
    case _ =>
      false
  }

  override def hashCode(): Int = {
    val prime = 31
    var result = 1
    var i = 0
    while (i < rangeBounds.length) {
      result = prime * result + rangeBounds(i).hashCode
      i += 1
    }
    result = prime * result + ascending.hashCode
    result
  }

  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case js: JavaSerializer => out.defaultWriteObject()
      case _ =>
        out.writeBoolean(ascending)
        out.writeObject(ordering)
        out.writeObject(binarySearch)

        val ser = sfactory.newInstance()
        Utils.serializeViaNestedStream(out, ser) { stream =>
          stream.writeObject(scala.reflect.classTag[Array[K]])
          stream.writeObject(rangeBounds)
        }
    }
  }

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    val sfactory = SparkEnv.get.serializer
    sfactory match {
      case js: JavaSerializer => in.defaultReadObject()
      case _ =>
        ascending = in.readBoolean()
        ordering = in.readObject().asInstanceOf[Ordering[K]]
        binarySearch = in.readObject().asInstanceOf[(Array[K], K) => Int]

        val ser = sfactory.newInstance()
        Utils.deserializeViaNestedStream(in, ser) { ds =>
          implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
          rangeBounds = ds.readObject[Array[K]]()
        }
    }
  }
}
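
The lookup performed by getPartition can be tried without Spark. The sketch below mirrors only the linear-scan branch (the one used for at most 128 bounds); RangeLookupSketch, partitionFor, and the sample bounds are hypothetical and chosen for illustration.

```scala
object RangeLookupSketch {
  // Hypothetical helper mirroring the linear-scan branch of getPartition.
  def partitionFor[K](key: K, rangeBounds: Array[K], ascending: Boolean = true)
                     (implicit ordering: Ordering[K]): Int = {
    var partition = 0
    // Advance while the key is strictly greater than the current upper bound.
    while (partition < rangeBounds.length && ordering.gt(key, rangeBounds(partition))) {
      partition += 1
    }
    if (ascending) partition else rangeBounds.length - partition
  }

  def main(args: Array[String]): Unit = {
    val bounds = Array(10, 20, 30) // 3 upper bounds => 4 partitions
    println(partitionFor(5, bounds))   // 0: 5 <= 10
    println(partitionFor(10, bounds))  // 0: a key equal to a bound stays in that bound's partition
    println(partitionFor(25, bounds))  // 2: 20 < 25 <= 30
    println(partitionFor(99, bounds))  // 3: greater than every bound
    println(partitionFor(5, bounds, ascending = false)) // 3: partition order is reversed
  }
}
```

Note that three bounds yield four partitions, which matches numPartitions = rangeBounds.length + 1 in the listing above.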

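RangePartitioner maps keys within a given range to a single partition. In its implementation, the algorithm that computes the boundaries (rangeBounds) uses reservoir sampling. The core of RangePartitioner is building the rangeBounds array, and the main steps are as follows.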

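The sketch function of RangePartitioner samples the RDD's data according to the required sample size. It mainly calls the reservoirSampleAndCount method of the SamplingUtils class to sample each partition, and then computes the total number of items across all partitions. reservoirSampleAndCount first takes the required number of items from the iterator in order, then examines each remaining item and uses it to randomly replace one of the samples already taken, which yields the final sample.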
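
The two phases of reservoir sampling described above can be sketched as follows. This is a simplified illustration written for this article, not Spark's actual reservoirSampleAndCount; the object name, method name, and seed value are assumptions.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object ReservoirSketch {
  // Hypothetical simplification: returns (sample, total number of items seen).
  def sampleAndCount[T](input: Iterator[T], k: Int, seed: Long): (Vector[T], Long) = {
    val rand = new Random(seed)
    val reservoir = ArrayBuffer.empty[T]
    var count = 0L
    while (input.hasNext) {
      val item = input.next()
      count += 1
      if (reservoir.size < k) {
        // Phase 1: take the first k items in order.
        reservoir += item
      } else {
        // Phase 2: item number `count` replaces a random slot with probability k / count.
        val j = (rand.nextDouble() * count).toLong
        if (j < k) reservoir(j.toInt) = item
      }
    }
    (reservoir.toVector, count)
  }

  def main(args: Array[String]): Unit = {
    val (sample, total) = sampleAndCount((1 to 1000).iterator, k = 5, seed = 42L)
    println(s"sampled ${sample.size} of $total items: $sample")
  }
}
```

Returning the count together with the sample is what lets the caller compute a weight (items per sample) for each partition, as the rangeBounds code does with n.toDouble / sample.length.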

The determineBounds function of RangePartitioner determines the partition boundaries from the sampled data and their weights. Its source is as follows:
Source code:

  /**
   * Determines the bounds for range partitioning from candidates with weights indicating how many
   * items each represents. Usually this is 1 over the probability used to sample this candidate.
   *
   * @param candidates unordered candidates with weights
   * @param partitions number of partitions
   * @return selected bounds
   */
  def determineBounds[K : Ordering : ClassTag](
      candidates: ArrayBuffer[(K, Float)],
      partitions: Int): Array[K] = {
    val ordering = implicitly[Ordering[K]]
    // Sort by key, ascending by default
    val ordered = candidates.sortBy(_._1)
    // Total number of sampled items
    val numCandidates = ordered.size
    // Total weight
    val sumWeights = ordered.map(_._2.toDouble).sum
    // Step size
    val step = sumWeights / partitions
    var cumWeight = 0.0
    var target = step
    val bounds = ArrayBuffer.empty[K]
    var i = 0
    var j = 0
    var previousBound = Option.empty[K]
    while ((i < numCandidates) && (j < partitions - 1)) {
      // Get the i-th item and its weight after sorting
      val (key, weight) = ordered(i)
      // Accumulate the weight
      cumWeight += weight
      if (cumWeight >= target) {
        // Skip duplicate values.
        // The cumulative weight has reached one step, so a bound can be emitted
        if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
          // There is no previous bound, or the current key is greater than the previous bound,
          // so the current key is a valid bound
          // Add the current key to the bounds
          bounds += key
          // Advance the target by one step
          target += step
          // Increment the number of bounds found
          j += 1
          // Remember the current bound as the previous bound
          previousBound = Some(key)
        }
      }
      i += 1
    }
    // Return the result
    bounds.toArray
  }
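
A small worked example may help when reading determineBounds. The sketch below repeats the same walk over sorted, weighted candidates on a plain Seq, without Spark and without the ClassTag requirement; BoundsSketch and bounds are hypothetical names, and the input data is made up for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

object BoundsSketch {
  // Same walk as determineBounds above, returning a Vector instead of an Array.
  def bounds[K](candidates: Seq[(K, Float)], partitions: Int)
               (implicit ordering: Ordering[K]): Vector[K] = {
    val ordered = candidates.sortBy(_._1)
    val step = ordered.map(_._2.toDouble).sum / partitions
    var cumWeight = 0.0
    var target = step
    val result = ArrayBuffer.empty[K]
    var previous = Option.empty[K]
    var i = 0
    while (i < ordered.size && result.size < partitions - 1) {
      val (key, weight) = ordered(i)
      cumWeight += weight
      // Emit a bound once the cumulative weight reaches the target, skipping duplicate keys.
      if (cumWeight >= target && previous.forall(p => ordering.gt(key, p))) {
        result += key
        target += step
        previous = Some(key)
      }
      i += 1
    }
    result.toVector
  }

  def main(args: Array[String]): Unit = {
    // Six sampled keys, each standing for 10 records: total weight 60, step 20 for 3 partitions.
    val candidates = Seq(5, 1, 3, 2, 6, 4).map(k => (k, 10.0f))
    println(bounds(candidates, 3)) // Vector(2, 4)
  }
}
```

With bounds 2 and 4, keys up to 2 go to the first partition, keys 3 and 4 to the second, and larger keys to the third, so each partition represents roughly 20 of the 60 records.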

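A custom partitioner must extend the org.apache.spark.Partitioner class and implement the following three methods:

numPartitions: Int, which returns the number of partitions.
getPartition(key: Any): Int, which returns the partition ID (from 0 to numPartitions - 1) for the given key.
equals(), the standard Java equality method, which Spark uses to check whether two partitioners are the same.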

e.g.1

// CustomPartitioner
import org.apache.spark.Partitioner

/**
 * @param numPartition number of partitions
 */
class CustomPartitioner(numPartition: Int) extends Partitioner{
    // Return the total number of partitions
    override def numPartitions: Int = numPartition

    // Return the partition index for the given key (assumes non-negative integer keys)
    override def getPartition(key: Any): Int = {
        key.toString.toInt % numPartition
    }
}

// CustomPartitionerDemo
import com.work.util.SparkUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object CustomPartitionerDemo {
    def main(args: Array[String]): Unit = {
        val sc: SparkContext = SparkUtil.getSparkContext()
        println("=================== Original data =====================")
        // zipWithIndex pairs each element of the RDD with its ID (index) in the RDD, forming key-value pairs
        val data: RDD[(Int, Long)] = sc.parallelize(0 to 10, 1).zipWithIndex()
        println(data.collect().toBuffer)

        println("=================== Partition and data combined into a Map =====================")
        val func: (Int, Iterator[(Int, Long)]) => Iterator[String] = (index: Int, iter: Iterator[(Int, Long)]) => {
            iter.map(x => "[partID:" + index + ", value:" + x + "]")
        }
        val array: Array[String] = data.mapPartitionsWithIndex(func).collect()
        for (i <- array) {
            println(i)
        }

        println("=================== 5 custom partitions and data combined into a Map =====================")
        val rdd1: RDD[(Int, Long)] = data.partitionBy(new CustomPartitioner(5))
        val array1: Array[String] = rdd1.mapPartitionsWithIndex(func).collect()
        for (i <- array1) {
            println(i)
        }
    }
}
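
One caveat about the modulo in CustomPartitioner.getPartition: on the JVM, % keeps the sign of the dividend, so a negative key would produce a negative index, which is not a valid partition ID. The sketch below is not part of the original example; it shows the difference and one possible fix using java.lang.Math.floorMod. The object and method names are hypothetical.

```scala
object ModuloSketch {
  // The expression used by CustomPartitioner.getPartition above.
  def rawIndex(key: Any, numPartition: Int): Int =
    key.toString.toInt % numPartition

  // A possible alternative that always lands in [0, numPartition).
  def safeIndex(key: Any, numPartition: Int): Int =
    Math.floorMod(key.toString.toInt, numPartition)

  def main(args: Array[String]): Unit = {
    println(rawIndex(7, 5))   // 2
    println(rawIndex(-7, 5))  // -2
    println(safeIndex(-7, 5)) // 3
  }
}
```

For the demo data (keys 0 to 10) both expressions give the same result, so the original code behaves as intended there.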

e.g.2

// SubjectPartitioner
import org.apache.spark.Partitioner
import scala.collection.mutable

/**
 *
 * @param subjects array of subjects
 */
class SubjectPartitioner(subjects: Array[String]) extends Partitioner {
    // Create a map that stores each subject and its partition number
    val subject: mutable.HashMap[String, Int] = new mutable.HashMap[String, Int]()
    // Define a counter used to generate the custom partition numbers
    var i = 0
    for (s <- subjects) {
        // Store the subject and its partition number
        subject += (s -> i)
        // Increment the partition number
        i += 1
    }

    // Get the number of partitions
    override def numPartitions: Int = subjects.size

    // Get the partition number (if the given key does not exist, the data goes to partition 0 by default)
    override def getPartition(key: Any): Int = subject.getOrElse(key.toString, 0)
}

// SubjectPartitionerDemo
import java.net.URL

import com.work.util.SparkUtil
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object SubjectPartitionerDemo {
    def main(args: Array[String]): Unit = {
        // Get the context object
        val sc: SparkContext = SparkUtil.getSparkContext()
        val tuples: RDD[(String, Int)] = sc.textFile("src/main/data/project.txt").map(line => {
            val fields: Array[String] = line.split("\t")
            for (i <- fields) {
                println(i)
            }
            // Extract the url
            val url: String = fields(1)
            (url, 1)
        })
        // Aggregate identical urls to get the visit count of each one
        val sumed: RDD[(String, Int)] = tuples.reduceByKey(_ + _).cache()
        // Extract the subject field from the url; record layout: subject, url, count
        val subjectAndUC: RDD[(String, (String, Int))] = sumed.map(tup => {
            // The url
            val url: String = tup._1
            // The visit count
            val count: Int = tup._2
            // The subject
            val subject: String = new URL(url).getHost
            (subject, (url, count))
        })
        // Collect all subjects
        val subjects: Array[String] = subjectAndUC.keys.distinct.collect
        // Create the custom partitioner
        val partitioner: SubjectPartitioner = new SubjectPartitioner(subjects)
        // Partition the data
        val partitioned: RDD[(String, (String, Int))] = subjectAndUC.partitionBy(partitioner)
        // Take the top 3
        val result: RDD[(String, (String, Int))] = partitioned.mapPartitions(it => {
            val list: List[(String, (String, Int))] = it.toList
            val sorted: List[(String, (String, Int))] = list.sortBy(_._2._2).reverse
            val top3: List[(String, (String, Int))] = sorted.take(3)
            // The function must return an iterator
            top3.iterator
        })
        // Save the data
        result.saveAsTextFile("src/main/data/out/")
        // Release resources
        sc.stop()
    }
}

Original: https://www.cnblogs.com/unknownshangke/p/16443710.html
Author: Unknown尚可
Title: Spark快速上手(4)Spark核心编程-Spark分区器(Partitioner)@(RDD-K_V)
