Flume custom interceptor (Flume自定义拦截器)

Requirement: a single Kafka topic carries records belonging to many different tables. Based on fields inside the JSON string, each record must be routed automatically to the HDFS path of its corresponding Hive table.

The outermost layer of the data sent to Kafka originally contained only data and name, with no pkDay or project. Worried about null values inside data, we discussed this with the upstream colleagues and asked them to add the project and pkDay fields to the outermost layer.

The pkDay field drives automatic table partitioning, and project and name are concatenated to form the Hive table name, ods_project_name.

Here I added a check: if project is empty, the record is dropped into a "none" bucket, so the resulting table name becomes ods_none_name.
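The table-name rule above can be sketched as follows. This is a Python illustration only (the real interceptor is Java); the function name is hypothetical, the field names follow the post:

```python
def hive_table_name(project, name):
    """Build the Hive table name ods_<project>_<name>.
    An empty or missing project falls back to 'none',
    yielding ods_none_<name> as described in the post."""
    project = project if project else "none"
    return f"ods_{project}_{name}"
```

For example, `hive_table_name("", "order")` yields `ods_none_order`, which is exactly the fallback name the empty-project check produces.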

Fortunately, empty project values had been anticipated in advance; when some empty-project records did arrive, they were sent back to the upstream team, who were asked to fix the data.

Two properties are available on a Flume event: event.getHeaders() and event.getBody(). After parsing the message, the interceptor overwrites the body with the value of the data field, and stores pk_year, pk_month, pk_day, name, and project in the headers. These header fields are used later in the Flume configuration file.
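The header/body transformation described above can be sketched like this. Note this is a Python mock-up of the logic, not the actual Java Flume Interceptor (which parses with Gson); the event is modeled as a plain dict, and the pkDay format "YYYY-MM-DD" is an assumption, since the post does not show the real format:

```python
import json

def intercept(event):
    """Sketch of the interceptor logic from the post.
    event is modeled as {"headers": dict, "body": str};
    assumes pkDay looks like 'YYYY-MM-DD' (not confirmed by the post)."""
    msg = json.loads(event["body"])
    # empty project falls back to 'none', matching the ods_none_name case
    project = msg.get("project") or "none"
    year, month, day = msg["pkDay"].split("-")
    event["headers"].update({
        "pk_year": year,
        "pk_month": month,
        "pk_day": day,
        "name": msg["name"],
        "project": project,
    })
    # overwrite the body with the inner 'data' payload
    event["body"] = json.dumps(msg["data"], ensure_ascii=False)
    return event
```

A real implementation would live in a class implementing org.apache.flume.interceptor.Interceptor and return null (drop) or the modified event from intercept().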

One detail worth noting: Gson is used to parse the JSON strings. Conveniently, Flume's lib directory already ships with a Gson jar, so no extra dependency needs to be added. Fastjson had caused some problems in the past.

After compiling, place the generated jar in Flume's lib directory.

The test results are as follows:

<u>The part marked in red: test.sinks.k1.hdfs.path and test.sinks.k1.hdfs.filePrefix can use fields placed in the event header.</u>
<u>The part marked in blue: test.channels.c1.checkpointDir and test.channels.c1.dataDirs cannot use fields placed in the header. With the red settings alone, different HDFS storage paths can already be selected according to fields in the string. Flume does ship with a regex-extraction interceptor, but its extraction rules are fiddly; with fields repeated across multiple nesting levels it is easy to get wrong, and I am not very familiar with regular expressions. In short, the custom interceptor solves my requirement.</u>
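A minimal sketch of the sink and channel settings being described (the property names test.sinks.k1.hdfs.path, test.sinks.k1.hdfs.filePrefix, test.channels.c1.checkpointDir, and test.channels.c1.dataDirs come from the post; the path layout and directory values are illustrative assumptions):

```properties
# Header fields set by the interceptor can be referenced as %{field}
# in the HDFS sink (the "red" part):
test.sinks.k1.hdfs.path = /warehouse/ods_%{project}_%{name}/pk_year=%{pk_year}/pk_month=%{pk_month}/pk_day=%{pk_day}
test.sinks.k1.hdfs.filePrefix = %{name}

# Channel directories (the "blue" part) CANNOT use header fields;
# they must be fixed paths:
test.channels.c1.checkpointDir = /data/flume/checkpoint
test.channels.c1.dataDirs = /data/flume/data
```

With this layout, a record with project=shop, name=order, pkDay split into 2022/01/27 lands under /warehouse/ods_shop_order/pk_year=2022/pk_month=01/pk_day=27/.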

Original: https://www.cnblogs.com/30go/p/16217225.html
Author: 硅谷工具人
Title: Flume自定义拦截器

Reposted with attribution at: https://www.johngo689.com/6008/
