Spark Sql之count(distinct)分析&&学习&&验证

Spark Sql之count distinct

学习内容

  • spark sql count(distinct)
  • 数据膨胀
  • count(distinct)原理
  • grouping sets原理
  • count(distinct)优化

spark 对count(distinct)的优化

先说结论:spark sql和hive不一样,spark对count(distinct)做了group by优化

在hive中count().

hive往往只用一个 reduce 来处理全局聚合函数,最后导致数据倾斜;在不考虑其它因素的情况下,我们的优化方案是先 group by 再 count 。


select count(distinct id) from table_a

select
  count(id)
from
(
    select
        id
    from table_a group by id
) tmp

在使用spark sql 时,不用担心这个问题,因为 spark 对count distinct 做了优化:

explain
select
    count(distinct id),
    count(distinct name)
from table_a
== Physical Plan ==
*(3) HashAggregate(keys=[], functions=[count(if ((gid#147005 = 2)) table_a.id#147007 else null), count(if ((gid#147005 = 1)) table_a.name#147006 else null)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[], functions=[partial_count(if ((gid#147005 = 2)) table_a.id#147007 else null), partial_count(if ((gid#147005 = 1)) table_a.name#147006 else null)])
      +- *(2) HashAggregate(keys=[table_a.name#147006, table_a.id#147007, gid#147005], functions=[])
         +- Exchange(coordinator id: 387101114) hashpartitioning(table_a.name#147006, table_a.id#147007, gid#147005, 4096), coordinator[target post-shuffle partition size: 67108864]
            +- *(1) HashAggregate(keys=[table_a.name#147006, table_a.id#147007, gid#147005], functions=[])
               +- *(1) Expand [List(name#146984, null, 1), List(null, id#146979, 2)], [table_a.name#147006, table_a.id#147007, gid#147005]
                  +- *(1) Project [id#146979, name#146984]
                     +- *(1) FileScan parquet table_a

数据膨胀原理

从上述执行计划可以看到,expand,那为什么为产生数据膨胀呐?

distinct算子在处理过程中是将distinct后的字段和group by字段共同作为key传入reduce,导致shuffle前map阶段没有预聚合,同时shuffle时网络传输数据量过大消耗增加,对reduce处理时负载也增大

distinct算子在处理过程中会将原有数据膨胀,有N个DISTINCT关键字数据就会在map端膨胀N倍,同时对shuffle和reduce的长尾影响(原因1)也会扩大N

Spark Sql之count(distinct)分析&&学习&&验证

expand 之后,再以id、name 为 key 进行HashAggregate 也就是 group by ,这样以来,就相当于去重了。后面直接计算count (id) 、 count(name) 就可以,把数据分而治之。 在一定程度上缓解了数据倾斜。

; distinct数据膨胀

 val sql:String =
    s"""
       |select
       |  count(distinct sha1),
       |  count(distinct task_id),
       |  count(distinct task_type)
       |from tmp
       |""".stripMargin

    val df2: DataFrame = session.sql(sql)
    df2.show()
    df2.explain(true)

Spark Sql之count(distinct)分析&&学习&&验证

grouping sets数据膨胀

    val sql1:String =
      s"""
         |select
         |  count(sha1),
         |  count(task_id),
         |  count(task_type)
         |from (
         |select sha1,task_id,task_type
         |from tmp
         |group by grouping sets(sha1, task_id, task_type)
         |)
         |""".stripMargin

    val df22: DataFrame = session.sql(sql1)
    df22.explain(true)
    df22.show()

Spark Sql之count(distinct)分析&&学习&&验证

开个坑

在spark sql里面小数据量的话,count(distinct)和gruop by的执行时间是差不多的,
但是我看到有篇文章介绍的是大数据量的distinct和group by的对比,说的是大数据量的话无法在内存里HashAggregate也就是group by,两者的执行时间的差距还是很大的。具体的还没测试。。。

distinct源码

def rewrite(a: Aggregate): Aggregate = {

    val aggExpressions = a.aggregateExpressions.flatMap { e =>
      e.collect {
        case ae: AggregateExpression => ae
      }
    }

    val distinctAggGroups = aggExpressions.filter(_.isDistinct).groupBy { e =>
        val unfoldableChildren = e.aggregateFunction.children.filter(!_.foldable).toSet
        if (unfoldableChildren.nonEmpty) {

          unfoldableChildren
        } else {
          e.aggregateFunction.children.take(1).toSet
        }
    }

    if (distinctAggGroups.size > 1) {

      val gid = AttributeReference("gid", IntegerType, nullable = false)()
      val groupByMap = a.groupingExpressions.collect {
        case ne: NamedExpression => ne -> ne.toAttribute
        case e => e -> AttributeReference(e.sql, e.dataType, e.nullable)()
      }
      val groupByAttrs = groupByMap.map(_._2)
      ....

      }

      val expand = Expand(
        regularAggProjection ++ distinctAggProjections,
        groupByAttrs ++ distinctAggChildAttrs ++ Seq(gid) ++ regularAggChildAttrMap.map(_._2),
        a.child)
        .....

  }

重点代码:
//todo 当有多个distinct聚合表达式时,进行expand
if (distinctAggGroups.size > 1) { expand }

spark sql grouping sets

grouping sets 、rollup 、cube 是用来处理多维分析的函数:

grouping sets:对分组集中指定的组表达式的每个子集执行group by,group by A,B grouping sets(A,B)就等价于 group by A union group by B,其中A和B也可以是一个集合,比如group by A,B,C grouping sets((A,B),(A,C))。

rollup:在指定表达式的每个层次级别创建分组集。group by A,B,C with rollup首先会对(A、B、C)进行group by,然后对(A、B)进行group by,然后是(A)进行group by,最后对全表进行group by操作。

cube : 为指定表达式集的每个可能组合创建分组集。首先会对(A、B、C)进行group by,然后依次是(A、B),(A、C),(A),(B、C),(B),©,最后对全表进行group by操作。

前文也说了,grouping sets也是利用expand的方式

优化思路

上文我们基本可以了解到了,是由于expand导致的慢,优化方向可以朝着减少distinct关键的出现的次数,减少数据膨胀方向入手

1、增加 expand的过程中partition 的数量

但是这样有一个弊端:同时启动太多task 会造成集群资源紧张,也会导致其它任务没有资源。并且数据是 逐日增加的,总体上不好控制。

2、缩减expand 的数据量

从sql结构上:
可以把计算的指标拆开,分两次计算,然后再 join。
总体的处理原则就是,让过滤掉的数据尽量的多,expand 时的数据尽量少:

参考

参考博客

Original: https://blog.csdn.net/Lzx116/article/details/126153664
Author: 南风知我意丿
Title: Spark Sql之count(distinct)分析&&学习&&验证

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/817742/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球