Handling data skew in Hive


What is data skew?

Data skew means that, in a data set that should be processed in parallel, one portion of the data is significantly larger than the rest, so that processing that portion becomes the bottleneck for the whole job.

Suppose the data is unevenly distributed: one key maps to hundreds of thousands of rows, while other keys map to only a few hundred or a few dozen. When the data is processed, the large number of rows sharing the same key are all assigned (partitioned) to the same partition, producing a situation where "one worker is worked to death while the others sit idle". Concretely: some tasks finish quickly while others drag on and on, so the overall job takes far too long or never finishes at all.

Data skew can be divided into map-side skew and reduce-side skew.

1.1 Operations:

1.2 Causes:

1) Uneven key distribution

2) Characteristics of the business data itself

3) Poor choices at table-creation time

4) Some SQL statements are inherently skew-prone

1.3 Symptoms:

The task progress hangs at 99% (or 100%) for a long time. The task monitoring page shows that only a small number (one or a few) of reduce subtasks remain unfinished, because the amount of data they handle differs too much from that of the other reducers.

The number of records in a single reduce differs from the average by a wide margin, often 3x or more, and the longest-running reduce takes far longer than the average.

To really understand data skew, you need to know how MapReduce works. (The following is taken from: https://www.zhihu.com/question/27593027)

Take word count as an example: the map phase emits pairs of the form ("aaa", 1), and the reduce phase sums the values to get the number of times "aaa" occurs. If there are 100 GB of text to count, of which 80 GB is all "aaa" and the remaining 20 GB is other words, then the 80 GB of data all go to the same reduce, while the remaining 20 GB are spread across the other reducers. This causes data skew; the symptom is that the job runs to 99% and then waits for the 80 GB reduce to finish.


2. Solutions to data skew

2.1 Parameter tuning:

hive.map.aggr=true

Partial aggregation on the map side, equivalent to a Combiner.

hive.groupby.skewindata=true

Performs load balancing when the data is skewed. When this option is set to true, the generated query plan contains two MR jobs. In the first job, the map output is randomly distributed to the reducers; each reducer performs a partial aggregation and emits its result. Since rows with the same group-by key may land on different reducers, the load is balanced. The second job then distributes the pre-aggregated results to the reducers by group-by key (which guarantees that identical group-by keys reach the same reducer) and completes the final aggregation.
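As a hedged illustration of these two settings (the orders table and its columns are hypothetical), they can be enabled per session before running a skew-prone aggregation:

```sql
-- Map-side partial aggregation, equivalent to a Combiner
set hive.map.aggr=true;
-- Split a skewed GROUP BY into two MR jobs for load balancing
set hive.groupby.skewindata=true;

-- If a handful of user_id values account for most of the rows, the
-- first job pre-aggregates them on randomly chosen reducers, and the
-- second job merges the partial counts per key.
select user_id, count(*) as order_cnt
from orders
group by user_id;
```

Note that hive.groupby.skewindata does not support every query shape; for example, multiple distinct aggregates in one query may be rejected.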

2.2 SQL statement tuning:

How to join:

For the choice of driver table, pick the table whose join key is most evenly distributed as the driver table.

Do column pruning and filtering first, so that the data volume is already relatively small by the time the two tables are joined.

When there are too many small files, merge them. Setting set hive.merge.mapfiles=true merges the small output files of map-only jobs.

Small table joining a large table:

Solution: put the small table on the left side of the join and the large table on the right, or use map join to load the small table into memory and then stream the large table through the map tasks.

Large table joining a large table:

If the join field contains nulls — say the id in table 1 must be joined with the id in table 2 — then all the rows with a null key fall on a single reduce node.

Solution 1: filter out the null values in a subquery, so rows with a null id do not participate in the join.

Solution 2: use case when to assign random key values to the nulls (a string plus rand()).

count(distinct) with a large number of identical special values:

If the data volume is very large, data skew occurs when executing SQL of the form select a, count(distinct b) from t group by a;.

Solution: use sum() ... group by instead. For example: select a, sum(1) from (select a, b from t group by a, b) t1 group by a;

If other calculations are also needed, such as a group by, you can first process the records with null values separately, then union the result with the other calculation results.
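As a sketch of this "handle the nulls separately, then union" idea (table t and a string-typed user_id column are assumed for illustration):

```sql
-- Non-null keys: an ordinary group by, no longer dragged down
-- by the oversized null bucket
select user_id, count(*) as cnt
from t
where user_id is not null
group by user_id
union all
-- Null keys: aggregated on their own in a single cheap branch
select 'NULL' as user_id, count(*) as cnt
from t
where user_id is null;
```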

Example:

Scenario: logs often have missing information, such as user_id. If you join the user_id in the log with the user_id in the users table, you run into data skew.

Solution 1: rows with a null user_id do not participate in the join

select a.* from log a
  join users b
  on a.user_id is not null
  and a.user_id = b.user_id
union all
select a.* from log a
  where a.user_id is null;

Solution 2: assign new key values to the nulls

select *
  from log a
  left outer join users b
  on case when a.user_id is null then concat('hive', rand()) else a.user_id end = b.user_id;

Conclusion: method 2 is more efficient than method 1 — less I/O and fewer jobs. In method 1, the log table is read twice and the job count is 2; in method 2 the job count is 1. This optimization suits skew caused by invalid ids (such as -99, '', null, etc.). By turning the null key into a string plus a random number, the skewed data is spread across different reducers, which resolves the skew.

Handle special cases specially:

When business-logic optimization does not help much, the skewed data can sometimes be pulled out and processed separately, then union'ed back in at the end.

Example:

Joins between different data types cause data skew:

Scenario: the user_id field in the users table is int, while the user_id field in the log table contains both string and int values. When the two tables are joined on user_id, the default hash is computed on the int id, so all records whose id is a string are assigned to a single reducer.

Solution: convert the numeric type to string

select * from users a
  left outer join logs b
  on cast(a.user_id as string) = b.user_id;

Here is another way to summarize:

Causes of data skew:

1. Map side: input file sizes are uneven

2. Reduce side: keys are unevenly distributed, which makes the partitions uneven

Solutions to data skew:

1. When there are too many small files: merge the small files

Setting set hive.merge.mapfiles=true merges the small output files of map-only jobs.
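The related merge settings can be sketched as follows (the numeric values shown are illustrative, not recommendations):

```sql
set hive.merge.mapfiles=true;                -- merge small files output by map-only jobs
set hive.merge.mapredfiles=true;             -- also merge outputs of map-reduce jobs
set hive.merge.size.per.task=256000000;      -- target size of a merged file, in bytes
set hive.merge.smallfiles.avgsize=16000000;  -- merge when the average output file is smaller than this
```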

2. When group by has too few grouping dimensions and too many values per dimension: tune parameters

(1) Enable partial aggregation in the map phase

hive.map.aggr=true

More efficient, but requires more memory.

(2) Enable load balancing for skewed data

hive.groupby.skewindata=true

When this option is set to true, the generated query plan contains two MR jobs. In the first job, the map output is randomly distributed to the reducers; each reducer performs a partial aggregation and emits its result. Because rows with the same group-by key may land on different reducers, the load is balanced. The second job then distributes the pre-aggregated results to the reducers by group-by key (guaranteeing that identical group-by keys reach the same reducer) and completes the final aggregation.

Item (2) plays the crucial role. It splits the work into two MapReduce jobs: the first randomly tags keys during partitioning in the shuffle, so the data is spread across different reducers for computation; since that alone cannot complete the aggregation, a second MapReduce job merges the partial results back under the normal shuffle. Because the uneven distribution is smoothed out in the first job, the skew is essentially resolved.

3. Tune the SQL statements

(1) When joining two tables whose join field contains nulls: turn the null key into a string plus a random number, so the skewed data is spread across different reducers. Since the null keys cannot match anything anyway, this does not affect the final result.

(2) When joining a small table with a large table: use map join to load the small table (under about 1000 records) into memory first, completing the "reduce" on the map side.

In Hive, you can specify map join directly in the HQL statement: add /*+ MAPJOIN(tablelist) */ after the SELECT keyword of the query/subquery to hint the optimizer to convert it to a map join (optimizers in early Hive versions could not convert map joins automatically).

select /* +mapjoin(movies) */ a.title, b.rating from movies a join ratings b on a.movieid = b.movieid;

Since Hive 0.11, map join optimization is enabled automatically, controlled by two parameters:

set hive.auto.convert.join=true;             -- enable automatic MapJoin conversion
set hive.mapjoin.smalltable.filesize=25000000; -- max small-table size (bytes) eligible for MapJoin

(3) When joining a large table with a large table: split one of the large tables into small tables, then map join each piece separately.
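One hedged way to sketch this (table and column names are hypothetical): carve one of the big tables into slices small enough for map join, join each slice, and stitch the results back together:

```sql
-- Slice big_b by a key range so each slice fits in memory, map join
-- each slice against big_a, then union the results.
select /*+ MAPJOIN(b1) */ a.id, a.val, b1.info
from big_a a
join (select id, info from big_b where id <= 5000000) b1
  on a.id = b1.id
union all
select /*+ MAPJOIN(b2) */ a.id, a.val, b2.info
from big_a a
join (select id, info from big_b where id > 5000000) b2
  on a.id = b2.id;
```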

(4) When count(distinct xx) hits a large number of identical special values: replace count(distinct) with sum() ... group by. For example, replace select a, count(distinct b) from t group by a with select a, sum(1) from (select a, b from t group by a, b) t1 group by a.

(5) Other cases: if the number of skewed keys is small, pull the skewed data out, process it separately, and union it back at the end. If the number of skewed keys is large, add a random prefix/suffix to the key, so that rows that used to share a key now have different keys, spreading the skewed data across different tasks; on the other side of the join, take the rows matching the skewed keys and form a Cartesian product with the full set of prefixes/suffixes, so that however the skewed side's keys were prefixed, they can still join correctly.
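A minimal sketch of the random-prefix technique (the table names, the salt range of 10, and the columns are all hypothetical, with user_id assumed to be a string): salt the skewed side with a random prefix, expand the other side by every possible prefix, join on the salted key, and strip the salt afterwards:

```sql
select split(s.salted_key, '_')[1] as user_id, s.val, d.info
from (
  -- Skewed side: prepend a random salt in [0, 9] to every key
  select concat(cast(floor(rand() * 10) as string), '_', user_id) as salted_key,
         val
  from big_skewed
) s
join (
  -- Other side: replicate each row once per possible salt value
  -- (the Cartesian product with the salt set)
  select concat(cast(n.i as string), '_', user_id) as salted_key,
         info
  from dim_table
  lateral view explode(array(0,1,2,3,4,5,6,7,8,9)) n as i
) d
on s.salted_key = d.salted_key;
```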

Original: https://www.cnblogs.com/xiaojianblogs/p/14281483.html
Author: 潇湘灬隐者
Title: hive数据倾斜处理
