[离线计算-Spark|Hive] HDFS小文件处理

HDFS 小文件过多会对hadoop 扩展性以及稳定性造成影响, 因为要在namenode 上存储维护大量元信息.

大量的小文件也会导致很差的查询分析性能,因为查询引擎执行查询时需要进行太多次文件的打开/读取/关闭.

小文件解决思路

通常能想到的方案就是通过Spark API 对文件目录下的小文件进行读取,然后通过Spark的算子repartition操作进行合并小文件,repartition 分区数通过输入文件的总大小和期望输出文件的大小通过预计算而得。

总体流程如下:

该方案适合针对已发现有小文件问题,然后对其进行处理. 下面介绍下hudi是如何实现在写入时实现对小文件的智能处理.

Hudi小文件处理

Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用

在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小

hudi 小文件处理流程:

每次写入都会遵循此过程,以确保Hudi表中没有小文件。

核心代码:

写入文件分配:

org.apache.hudi.table.action.commit.UpsertPartitioner#assignInserts

 //获取分区路径
 Set<string> partitionPaths = profile.getPartitionPaths();

 //&#x6839;&#x636E;&#x5148;&#x524D;&#x63D0;&#x4EA4;&#x671F;&#x95F4;&#x5199;&#x5165;&#x7684;&#x8BB0;&#x5F55;&#x83B7;&#x53D6;&#x5E73;&#x5747;&#x8BB0;&#x5F55;&#x5927;&#x5C0F;&#x3002;&#x7528;&#x4E8E;&#x4F30;&#x8BA1;&#x6709;&#x591A;&#x5C11;&#x8BB0;&#x5F55;&#x6253;&#x5305;&#x5230;&#x4E00;&#x4E2A;&#x6587;&#x4EF6;&#x4E2D;&#x3002;
 long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),config);

 LOG.info("AvgRecordSize => " + averageRecordSize);

 //&#x83B7;&#x53D6;&#x6BCF;&#x4E2A;&#x5206;&#x533A;&#x6587;&#x4EF6;&#x8DEF;&#x5F84;&#x4E0B;&#x5C0F;&#x6587;&#x4EF6;
 Map<string, list<smallfile>> partitionSmallFilesMap =
        getSmallFilesForPartitions(new ArrayList<string>(partitionPaths), jsc);

for (String partitionPath : partitionPaths) {
     ...

     List<smallfile> smallFiles = partitionSmallFilesMap.get(partitionPath);
    //&#x672A;&#x5206;&#x914D;&#x7684;&#x5199;&#x5165;&#x8BB0;&#x5F55;
    long totalUnassignedInserts = pStat.getNumInserts();

    ...

    for (SmallFile smallFile : smallFiles) {
      //hoodie.parquet.max.file.size &#x6570;&#x636E;&#x6587;&#x4EF6;&#x6700;&#x5927;&#x5927;&#x5C0F;&#xFF0C;Hudi&#x5C06;&#x8BD5;&#x7740;&#x7EF4;&#x62A4;&#x6587;&#x4EF6;&#x5927;&#x5C0F;&#x5230;&#x8BE5;&#x6307;&#x5B9A;&#x503C;
      //&#x7B97;&#x51FA;&#x6570;&#x636E;&#x6587;&#x4EF6;&#x5927;&#x5C0F; - &#x5C0F;&#x6587;&#x4EF6; &#x5C31;&#x662F;&#x5269;&#x4F59;&#x53EF;&#x4EE5;&#x5199;&#x5165;&#x6587;&#x4EF6;&#x5927;&#x5C0F;&#xFF0C; &#x9664;&#x4EE5;&#x5E73;&#x5747;&#x8BB0;&#x5F55;&#x5927;&#x5C0F;&#x5C31;&#x662F;&#x63D2;&#x5165;&#x7684;&#x8BB0;&#x5F55;&#x884C;&#x6570;
      long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts);

        //&#x5206;&#x914D;&#x8BB0;&#x5F55;&#x5230;&#x5C0F;&#x6587;&#x4EF6;&#x4E2D;
        if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
            // create a new bucket or re-use an existing bucket
            int bucket;
            if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
              bucket = updateLocationToBucket.get(smallFile.location.getFileId());
              LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
            } else {
              bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
              LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
            }
            bucketNumbers.add(bucket);
            recordsPerBucket.add(recordsToAppend);
            //&#x51CF;&#x53BB;&#x5DF2;&#x7ECF;&#x5206;&#x914D;&#x7684;&#x8BB0;&#x5F55;&#x6570;
            totalUnassignedInserts -= recordsToAppend;
          }

        //&#x5982;&#x679C;&#x8BB0;&#x5F55;&#x6CA1;&#x6709;&#x5206;&#x914D;&#x5B8C;
        if (totalUnassignedInserts > 0) {
            //hoodie.copyonwrite.insert.split.size &#x6BCF;&#x4E2A;&#x5206;&#x533A;&#x6761;&#x6570;
            long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
            //&#x662F;&#x5426;&#x81EA;&#x52A8;&#x8BA1;&#x7B97;&#x6BCF;&#x4E2A;&#x5206;&#x533A;&#x6761;&#x6570;
            if (config.shouldAutoTuneInsertSplits()) {
                insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
            }

           //&#x8BA1;&#x7B97;&#x8981;&#x521B;&#x5EFA;&#x7684;bucket
           int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);

          ...

          for (int b = 0; b < insertBuckets; b++) {
            bucketNumbers.add(totalBuckets);
            if (b == insertBuckets - 1) {
              //&#x9488;&#x5BF9;&#x6700;&#x540E;&#x4E00;&#x4E2A;buket&#x5904;&#x7406;&#xFF0C;&#x5C31;&#x662F;&#x5199;&#x5B8C;&#x5269;&#x4E0B;&#x7684;&#x8BB0;&#x5F55;
              recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket);
            } else {
              recordsPerBucket.add(insertRecordsPerBucket);
            }
            BucketInfo bucketInfo = new BucketInfo();
            bucketInfo.bucketType = BucketType.INSERT;
            bucketInfo.partitionPath = partitionPath;
            bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
            bucketInfoMap.put(totalBuckets, bucketInfo);
            totalBuckets++;
          }

        }

    }

}

</smallfile></string></string,></string>
 if (!commitTimeline.empty()) { // if we have some commits
      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
      List<hoodiebasefile> allFiles = table.getBaseFileOnlyView()
          .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());

      for (HoodieBaseFile file : allFiles) {

        //&#x83B7;&#x53D6;&#x5C0F;&#x4E8E; hoodie.parquet.small.file.limit &#x53C2;&#x6570;&#x503C;&#x5C31;&#x4E3A;&#x5C0F;&#x6587;&#x4EF6;
        if (file.getFileSize() < config.getParquetSmallFileLimit()) {
          String filename = file.getFileName();
          SmallFile sf = new SmallFile();
          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
          sf.sizeBytes = file.getFileSize();
          smallFileLocations.add(sf);
        }
      }
    }

</hoodiebasefile>

UpsertPartitioner继承spark的Partitioner, hudi在写入的时候会利用spark 自定分区的机制优化记录分配到不同文件的能力, 从而达到在写入时不断优化解决小文件问题.

涉及到的关键配置:

  • hoodie.parquet.max.file.size:数据文件最大大小,Hudi将试着维护文件大小到该指定值;
  • hoodie.parquet.small.file.limit:小于该大小的文件均被视为小文件;
  • hoodie.copyonwrite.insert.split.size:单文件中插入记录条数,此值应与单个文件中的记录数匹配(可以根据最大文件大小和每个记录大小来确定)

在hudi写入时候如何使用、配置参数?

在写入hudi的代码中 .option中配置上述参数大小,如下:

本文主要介绍小文件的处理方法思路,以及通过阅读源码和相关资料学习hudi 如何在写入时智能的处理小文件问题新思路.Hudi利用spark 自定义分区的机制优化记录分配到不同文件的能力,达到小文件的合并处理.

Original: https://www.cnblogs.com/bigdata1024/p/15828375.html
Author: chaplinthink
Title: [离线计算-Spark|Hive] HDFS小文件处理

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/683367/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • crash命令 —— wr

    参考:https://crash-utility.github.io/help_pages/wr.html 用法: 需要具备对 /dev/mem设备节点的写权限。 写某个地址,默认…

    技术杂谈 2023年5月30日
    0109
  • 添加COM类型库ACTIVEX接口

    头文件 public:STDMETHOD(GetEsamVersion)(BSTR pVersion,int nRet); 在项目.idl文件里添加 interface ITool…

    技术杂谈 2023年7月11日
    070
  • webstorm中js文件被识别成txt类型

    问题描述: webstorm中index.js文件被识别成txt格式,如下图。 原因: webstorm中js文件被识别成txt文件,原因在于txt类型识别了以当前js文件名命名的…

    技术杂谈 2023年5月31日
    088
  • 两种树的直径求法

    两遍DFS 优点:方便记录直径的两端点。 缺点:无法除理带负权的树。 树形DP 优点:短,可以处理有负权的树。 缺点:不好记录端点,容易打错。 最初每条路要走两遍,修一条路就可使与…

    技术杂谈 2023年6月21日
    090
  • 使用react全家桶制作博客后台管理系统

    前面的话 笔者在做一个完整的博客上线项目,包括前台、后台、后端接口和服务器配置。本文将详细介绍使用react全家桶制作的博客后台管理系统 概述 该项目是基于react全家桶(Rea…

    技术杂谈 2023年5月31日
    0118
  • 2个函数宏技巧

    1.用宏调用对象函数 #define FOR_EACH_OBSERVER(ObserverType, observer_list, func) \ do{ \ CObserverL…

    技术杂谈 2023年5月31日
    082
  • SOLR查询匹配关键词的方式

    如果在字段后直接写关键词,solr对条件关键词分词后,各分词之间按照OR的关系进行匹配 如果查询条件关键词用半角双引号括叫起来,则分词间用AND关系进行匹配 尽管使用双引号括起来,…

    技术杂谈 2023年5月31日
    0104
  • idea 的Low Memory问题

    今天使用 idea 出现 Low MemoryThe IDE is running low on memory and this might affect performance….

    技术杂谈 2023年5月31日
    0128
  • php实现查询上传文件进度

    参考:http://www.ultramegatech.com/2010/10/create-an-upload-progress-bar-with-php-and-jquery/…

    技术杂谈 2023年6月1日
    0111
  • 深入剖析堆原理与堆排序

    堆的介绍 完全二叉树:完全二叉树是满二叉树去除最后N个节点之后得到的树((N \geq0, N \in N^*)) 大根堆:节点的父亲节点比自身节点大,比如根节点的值为(8),比其…

    技术杂谈 2023年7月23日
    082
  • 面试官:请说出4种不使用第三方变量交换两个变量值的方法

    哈喽,大家好,我是阿Q。前几天有个小伙伴去面试,被面试官的一个问题劝退了:请说出几种不使用第三方变量交换两个变量值的方法。 问题有点绕,好不容易缕清了面试官的问题,却发现答不上来。…

    技术杂谈 2023年7月11日
    081
  • U盘启动笔记本无法安装Win7问题和解决

    用”大白菜”工具制作启动U盘,从U盘启动后进入Win PE环境安装Win7,提示”安装win7系统安装程序无法创建新的系统分区,也无法定位现有系…

    技术杂谈 2023年5月31日
    0103
  • 安卓逆向从0到1学习总结

    PS:该文已经首发于公众号信安之路!!! 初识安卓逆向是在2019年的暑假,到现在也快一年了,这一年来有刚从web渗透转来的迷茫,有成功破解了第一个app的喜悦,也有通宵熬夜逆向的…

    技术杂谈 2023年7月11日
    082
  • 命名实体识别是什么

    命名实体识别(Named Entity Recoginition, NER)旨在将一串文本中的实体识别出来,并标注出它所指代的类型,比如人名、地名等等。具体地,根据MUC会议规定,…

    技术杂谈 2023年7月11日
    0172
  • 电商WMS介绍

    其他竞品 今天先到这儿,希望对云原生,技术领导力, 企业管理,系统架构设计与评估,团队管理, 项目管理, 产品管管,团队建设 有参考作用 , 您可能感兴趣的文章: 如有想了解更多软…

    技术杂谈 2023年6月1日
    096
  • Mysql重复数据查询置为空

    前两天产品有个需求,相同的商品因为价格不同而分开展示,但是明细还是算一条明细,具体区分展示出商品的价格和数量信息,其他重复的商品信息要置空。 需求并不难,用程序代码循环处理就可以了…

    技术杂谈 2023年7月11日
    092
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球