[离线计算-Spark|Hive] HDFS小文件处理

HDFS 小文件过多会对hadoop 扩展性以及稳定性造成影响, 因为要在namenode 上存储维护大量元信息.

大量的小文件也可能导致查询分析性能不佳,因为查询引擎在执行查询时需要打开/读取/关闭太多的文件。

[En]

A large number of small files can also lead to poor query analysis performance because the query engine needs to open / read / close too many files when executing the query.

小文件解决思路

通常能想到的方案就是通过Spark API 对文件目录下的小文件进行读取,然后通过Spark的算子repartition操作进行合并小文件,repartition 分区数通过输入文件的总大小和期望输出文件的大小通过预计算而得。

总体流程如下:

该方案适合针对已发现有小文件问题,然后对其进行处理. 下面介绍下hudi是如何实现在写入时实现对小文件的智能处理.

Hudi小文件处理

Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用

在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小

hudi 小文件处理流程:

每次写入都会遵循此过程,以确保Hudi表中没有小文件。

核心代码:

写入文件分配:

org.apache.hudi.table.action.commit.UpsertPartitioner#assignInserts

 //获取分区路径
 Set<string> partitionPaths = profile.getPartitionPaths();

 //&#x6839;&#x636E;&#x5148;&#x524D;&#x63D0;&#x4EA4;&#x671F;&#x95F4;&#x5199;&#x5165;&#x7684;&#x8BB0;&#x5F55;&#x83B7;&#x53D6;&#x5E73;&#x5747;&#x8BB0;&#x5F55;&#x5927;&#x5C0F;&#x3002;&#x7528;&#x4E8E;&#x4F30;&#x8BA1;&#x6709;&#x591A;&#x5C11;&#x8BB0;&#x5F55;&#x6253;&#x5305;&#x5230;&#x4E00;&#x4E2A;&#x6587;&#x4EF6;&#x4E2D;&#x3002;
 long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),config);

 LOG.info("AvgRecordSize => " + averageRecordSize);

 //&#x83B7;&#x53D6;&#x6BCF;&#x4E2A;&#x5206;&#x533A;&#x6587;&#x4EF6;&#x8DEF;&#x5F84;&#x4E0B;&#x5C0F;&#x6587;&#x4EF6;
 Map<string, list<smallfile>> partitionSmallFilesMap =
        getSmallFilesForPartitions(new ArrayList<string>(partitionPaths), jsc);

for (String partitionPath : partitionPaths) {
     ...

     List<smallfile> smallFiles = partitionSmallFilesMap.get(partitionPath);
    //&#x672A;&#x5206;&#x914D;&#x7684;&#x5199;&#x5165;&#x8BB0;&#x5F55;
    long totalUnassignedInserts = pStat.getNumInserts();

    ...

    for (SmallFile smallFile : smallFiles) {
      //hoodie.parquet.max.file.size &#x6570;&#x636E;&#x6587;&#x4EF6;&#x6700;&#x5927;&#x5927;&#x5C0F;&#xFF0C;Hudi&#x5C06;&#x8BD5;&#x7740;&#x7EF4;&#x62A4;&#x6587;&#x4EF6;&#x5927;&#x5C0F;&#x5230;&#x8BE5;&#x6307;&#x5B9A;&#x503C;
      //&#x7B97;&#x51FA;&#x6570;&#x636E;&#x6587;&#x4EF6;&#x5927;&#x5C0F; - &#x5C0F;&#x6587;&#x4EF6; &#x5C31;&#x662F;&#x5269;&#x4F59;&#x53EF;&#x4EE5;&#x5199;&#x5165;&#x6587;&#x4EF6;&#x5927;&#x5C0F;&#xFF0C; &#x9664;&#x4EE5;&#x5E73;&#x5747;&#x8BB0;&#x5F55;&#x5927;&#x5C0F;&#x5C31;&#x662F;&#x63D2;&#x5165;&#x7684;&#x8BB0;&#x5F55;&#x884C;&#x6570;
      long recordsToAppend = Math.min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, totalUnassignedInserts);

        //&#x5206;&#x914D;&#x8BB0;&#x5F55;&#x5230;&#x5C0F;&#x6587;&#x4EF6;&#x4E2D;
        if (recordsToAppend > 0 && totalUnassignedInserts > 0) {
            // create a new bucket or re-use an existing bucket
            int bucket;
            if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) {
              bucket = updateLocationToBucket.get(smallFile.location.getFileId());
              LOG.info("Assigning " + recordsToAppend + " inserts to existing update bucket " + bucket);
            } else {
              bucket = addUpdateBucket(partitionPath, smallFile.location.getFileId());
              LOG.info("Assigning " + recordsToAppend + " inserts to new update bucket " + bucket);
            }
            bucketNumbers.add(bucket);
            recordsPerBucket.add(recordsToAppend);
            //&#x51CF;&#x53BB;&#x5DF2;&#x7ECF;&#x5206;&#x914D;&#x7684;&#x8BB0;&#x5F55;&#x6570;
            totalUnassignedInserts -= recordsToAppend;
          }

        //&#x5982;&#x679C;&#x8BB0;&#x5F55;&#x6CA1;&#x6709;&#x5206;&#x914D;&#x5B8C;
        if (totalUnassignedInserts > 0) {
            //hoodie.copyonwrite.insert.split.size &#x6BCF;&#x4E2A;&#x5206;&#x533A;&#x6761;&#x6570;
            long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
            //&#x662F;&#x5426;&#x81EA;&#x52A8;&#x8BA1;&#x7B97;&#x6BCF;&#x4E2A;&#x5206;&#x533A;&#x6761;&#x6570;
            if (config.shouldAutoTuneInsertSplits()) {
                insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize;
            }

           //&#x8BA1;&#x7B97;&#x8981;&#x521B;&#x5EFA;&#x7684;bucket
           int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket);

          ...

          for (int b = 0; b < insertBuckets; b++) {
            bucketNumbers.add(totalBuckets);
            if (b == insertBuckets - 1) {
              //&#x9488;&#x5BF9;&#x6700;&#x540E;&#x4E00;&#x4E2A;buket&#x5904;&#x7406;&#xFF0C;&#x5C31;&#x662F;&#x5199;&#x5B8C;&#x5269;&#x4E0B;&#x7684;&#x8BB0;&#x5F55;
              recordsPerBucket.add(totalUnassignedInserts - (insertBuckets - 1) * insertRecordsPerBucket);
            } else {
              recordsPerBucket.add(insertRecordsPerBucket);
            }
            BucketInfo bucketInfo = new BucketInfo();
            bucketInfo.bucketType = BucketType.INSERT;
            bucketInfo.partitionPath = partitionPath;
            bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
            bucketInfoMap.put(totalBuckets, bucketInfo);
            totalBuckets++;
          }

        }

    }

}

</smallfile></string></string,></string>
 if (!commitTimeline.empty()) { // if we have some commits
      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
      List<hoodiebasefile> allFiles = table.getBaseFileOnlyView()
          .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());

      for (HoodieBaseFile file : allFiles) {

        //&#x83B7;&#x53D6;&#x5C0F;&#x4E8E; hoodie.parquet.small.file.limit &#x53C2;&#x6570;&#x503C;&#x5C31;&#x4E3A;&#x5C0F;&#x6587;&#x4EF6;
        if (file.getFileSize() < config.getParquetSmallFileLimit()) {
          String filename = file.getFileName();
          SmallFile sf = new SmallFile();
          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
          sf.sizeBytes = file.getFileSize();
          smallFileLocations.add(sf);
        }
      }
    }

</hoodiebasefile>

UpsertPartitioner继承spark的Partitioner, hudi在写入的时候会利用spark 自定分区的机制优化记录分配到不同文件的能力, 从而达到在写入时不断优化解决小文件问题.

涉及到的关键配置:

  • hoodie.parquet.max.file.size:数据文件最大大小,Hudi将试着维护文件大小到该指定值;
  • hoodie.parquet.small.file.limit:小于该大小的文件均被视为小文件;
  • hoodie.copyonwrite.insert.split.size:单文件中插入记录条数,此值应与单个文件中的记录数匹配(可以根据最大文件大小和每个记录大小来确定)

在hudi写入时候如何使用、配置参数?

在写入hudi的代码中 .option中配置上述参数大小,如下:

本文主要介绍小文件的处理方法思路,以及通过阅读源码和相关资料学习hudi 如何在写入时智能的处理小文件问题新思路.Hudi利用spark 自定义分区的机制优化记录分配到不同文件的能力,达到小文件的合并处理.

Original: https://www.cnblogs.com/bigdata1024/p/15828375.html
Author: chaplinthink
Title: [离线计算-Spark|Hive] HDFS小文件处理

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/522597/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球