kafka-connect-kudu-sink插件

kafka-connect-hive是基于 kafka-connect平台实现的 hive数据读取和写入插件,主要由 sourcesink两部分组成, source部分完成 hive表数据的读取任务, kafka-connect将这些数据写入到其他数据存储层中,比如 hiveES数据的流入。 sink部分完成向 hive表写数据的任务, kafka-connect将第三方数据源(如 MySQL)里的数据读取并写入到 hive表中。

环境准备

  • Apache Kafka 2.11-2.1.0
  • Confluent-5.1.0
  • Apache Hadoop 2.6.3
  • Apache Hive 1.2.1
  • Java 1.8

  • 支持 KCQL路由查询,允许将 kafka主题中的所有字段或部分字段写入 hive表中

  • 支持根据某一字段动态分区
  • 支持全量和增量同步数据,不支持部分更新

开始使用

1、启动 kafka

<span class="hljs-attr">cd <span class="hljs-string">kafka_2.11-2.1.0
</span></span>

2、启动 schema-registry

cd confluent-5.1.0
bin/schema-registry-<span class="hljs-keyword">start etc/<span class="hljs-keyword">schema-registry/<span class="hljs-keyword">schema-registry.properties &
</span></span></span>

schema-registry组件提供了 kafka topicschema管理功能,保存了 schema的各个演变版本,帮助我们解决新旧数据 schema兼容问题。这里我们使用 apache avro库来序列化 kafkakeyvalue,因此需要依赖 schema-registry组件, schema-registry使用默认的配置。

3、启动 kafka-connect

修改 confluent-5.1.0/etc/schema-registry目录下 connect-avro-distributed.properties文件的配置,修改后内容如下:

这里需要设置 plugin.path参数,该参数指定了 kafka-connect插件包的保存地址,必须得设置。

cd confluent-<span class="hljs-number">5.1.<span class="hljs-number">0
bin/<span class="hljs-keyword">connect-distributed etc/schema-registry/<span class="hljs-keyword">connect-avro-distributed.properties
</span></span></span></span>

1、在 hive服务器上使用 beeline执行如下命令:

2、使用 postman添加 kafka-connect-hive sink的配置到 kafka-connect

URL&#xFF1A;localhost:8083/connectors/

请求类型: POST

请求体如下:

{
    <span class="hljs-attr">"name": <span class="hljs-string">"hive-sink-example",
    <span class="hljs-attr">"config": {
        <span class="hljs-attr">"name": <span class="hljs-string">"hive-sink-example",
        <span class="hljs-attr">"connector.class": <span class="hljs-string">"com.landoop.streamreactor.connect.hive.sink.hiveSinkConnector",
        <span class="hljs-attr">"tasks.max": <span class="hljs-number">1,
        <span class="hljs-attr">"topics": <span class="hljs-string">"hive_sink_orc",
        <span class="hljs-attr">"connect.hive.kcql": <span class="hljs-string">"insert into cities_orc select * from hive_sink_orc AUTOCREATE PARTITIONBY state STOREAS ORC WITH_FLUSH_INTERVAL = 10 WITH_PARTITIONING = DYNAMIC",
        <span class="hljs-attr">"connect.hive.database.name": <span class="hljs-string">"hive_connect",
        <span class="hljs-attr">"connect.hive.hive.metastore": <span class="hljs-string">"thrift",
        <span class="hljs-attr">"connect.hive.hive.metastore.uris": <span class="hljs-string">"thrift://quickstart.cloudera:9083",
        <span class="hljs-attr">"connect.hive.fs.defaultFS": <span class="hljs-string">"hdfs://quickstart.cloudera:9001",
        <span class="hljs-attr">"connect.hive.error.policy": <span class="hljs-string">"NOOP",
        <span class="hljs-attr">"connect.progress.enabled": <span class="hljs-literal">true
    }
}
</span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span>

启动 kafka producer,写入测试数据, scala测试代码如下:

<span class="hljs-class"><span class="hljs-keyword">class <span class="hljs-title">AvroTest {

 </span></span></span>

4、使用 beeline查询 hive数据:

<span class="hljs-keyword">use hive_connect;
<span class="hljs-keyword">select * <span class="hljs-keyword">from cities_orc;
</span></span></span>

输出部分结果如下:

+------------------+------------------------+---------------------+-------------------+--+
<span class="hljs-params">| cities_orc.city  | cities_orc.population  <span class="hljs-params">| cities_orc.country  | cities_orc.state  <span class="hljs-params">|
+------------------+------------------------+---------------------+-------------------+--+
| Chicago          <span class="hljs-params">| 2705000                | USA                 <span class="hljs-params">| IL                |
<span class="hljs-params">| Chicago          | <span class="hljs-number">2705000                <span class="hljs-params">| USA                 | IL                <span class="hljs-params">|
| Chicago          <span class="hljs-params">| 2705000                | USA                 <span class="hljs-params">| IL                |
<span class="hljs-params">| Chicago          | <span class="hljs-number">2705000                <span class="hljs-params">| USA                 | IL                <span class="hljs-params">|
| Chicago          <span class="hljs-params">| 2705000                | USA                 <span class="hljs-params">| IL                |
<span class="hljs-params">| Chicago          | <span class="hljs-number">2705000                <span class="hljs-params">| USA                 | IL                <span class="hljs-params">|
| Chicago          <span class="hljs-params">| 2705000                | USA                 <span class="hljs-params">| IL                |
<span class="hljs-params">| Chicago          | <span class="hljs-number">2705000                <span class="hljs-params">| USA                 | IL                <span class="hljs-params">|
| Chicago          <span class="hljs-params">| 2705000                | USA                 <span class="hljs-params">| IL                |
<span class="hljs-params">| Chicago          | <span class="hljs-number">2705000                <span class="hljs-params">| USA                 | IL                <span class="hljs-params">|
| Chicago          <span class="hljs-params">| 2705000                | USA                 <span class="hljs-params">| IL                |
<span class="hljs-params">| Chicago          | <span class="hljs-number">2705000                <span class="hljs-params">| USA                 | IL                <span class="hljs-params">|
| Chicago          <span class="hljs-params">| 2705000                | USA                 <span class="hljs-params">| IL                |
<span class="hljs-params">| Chicago          | <span class="hljs-number">2705000                <span class="hljs-params">| USA                 | IL                <span class="hljs-params">|
| Chicago          <span class="hljs-params">| 2705000                | USA                 <span class="hljs-params">| IL                |
<span class="hljs-params">| Chicago          | <span class="hljs-number">2705000                <span class="hljs-params">| USA                 | IL                <span class="hljs-params">|
| Philadelphia     <span class="hljs-params">| 1568000                | USA                 <span class="hljs-params">| PA                |
<span class="hljs-params">| Philadelphia     | <span class="hljs-number">1568000                <span class="hljs-params">| USA                 | PA                <span class="hljs-params">|
| Philadelphia     <span class="hljs-params">| 1568000                | USA                 <span class="hljs-params">| PA                |
<span class="hljs-params">| Philadelphia     | <span class="hljs-number">1568000                <span class="hljs-params">| USA                 | PA                <span class="hljs-params">|
| Philadelphia     <span class="hljs-params">| 1568000                | USA                 <span class="hljs-params">| PA                |
<span class="hljs-params">| Philadelphia     | <span class="hljs-number">1568000                <span class="hljs-params">| USA                 | PA                <span class="hljs-params">|
| Philadelphia     <span class="hljs-params">| 1568000                | USA                 <span class="hljs-params">| PA                |
<span class="hljs-params">| Philadelphia     | <span class="hljs-number">1568000                <span class="hljs-params">| USA                 | PA                <span class="hljs-params">|
| Philadelphia     <span class="hljs-params">| 1568000                | USA                 <span class="hljs-params">| PA                |
<span class="hljs-params">| Philadelphia     | <span class="hljs-number">1568000                <span class="hljs-params">| USA                 | PA                <span class="hljs-params">|
</span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span>

配置说明

connect.hive.kcql中的配置项说明如下:

  • WITH_FLUSH_INTERVALlong类型,表示文件提交的时间间隔,单位是毫秒
  • WITH_FLUSH_SIZElong类型,表示执行提交操作之前,已提交到 HDFS的文件长度
  • WITH_FLUSH_COUNTlong类型,表示执行提交操作之前,未提交到 HDFS的记录数
  • WITH_SCHEMA_EVOLUTIONstring类型,默认值是 MATCH,表示 hive schemakafka topic recordschema的兼容策略, hive connector会使用该策略来添加或移除字段
  • WITH_TABLE_LOCATIONstring类型,表示 hive表在 HDFS中的存储位置,如果不指定的话,将使用 hive中默认的配置
  • WITH_OVERWRITEboolean类型,表示是否覆盖 hive表中已存在的记录,使用该策略时,会先删除已有的表,再新建
  • PARTITIONBYList<string></string>类型,保存分区字段。指定后,将从指定的列中获取分区字段的值
  • WITH_PARTITIONINGstring类型,默认值是 STRICT,表示分区创建方式。主要有 DYNAMICSTRICT两种方式。 DYNAMIC方式将根据 PARTITIONBY指定的分区字段创建分区, STRICT方式要求必须已经创建了所有分区
  • AUTOCREATEboolean类型,表示是否自动创建表

Kafka connect的配置项说明如下:

  • namestring类型,表示 connector的名称,在整个 kafka-connect集群中唯一
  • topicsstring类型,表示保存数据的 topic名称,必须与 KCQL语句中的 topic名称一致
  • tasks.maxint类型,默认值为1,表示 connector的任务数量
  • connector.classstring类型,表示 connector类的名称,值必须是 com.landoop.streamreactor.connect.hive.sink.HiveSinkConnector
  • connect.hive.kcqlstring类型,表示 kafka-connect查询语句
  • connect.hive.database.namestring类型,表示 hive数据库的名称
  • connect.hive.hive.metastorestring类型,表示连接 hive metastore所使用的网络协议
  • connect.hive.hive.metastore.urisstring类型,表示 hive metastore的连接地址
  • connect.hive.fs.defaultFSstring类型,表示 HDFS的地址

Original: https://www.cnblogs.com/dengbangpang/p/12987599.html
Author: 非洲羚羊
Title: kafka-connect-kudu-sink插件

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/574954/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • SpringCloud 学习总结

    SpringCloud 学习总结 学习回顾 1.Maven依赖管理 ​ 在微服务项目中,我们一般是先创建一个父项目模块对整个项目的依赖进行版本限定和依赖控制,子模块继承父模块后,不…

    Java 2023年6月8日
    091
  • Springboot 链接Oracle 配置

    1.在配置文件中配置Oracle配置 spring.datasource.url=jdbc:oracle:thin:@ip:端口:XE spring.datasource.user…

    Java 2023年5月30日
    061
  • Spring注解简析

    JAVA 元注解 @Documented @Inherited @Retention @Target @Repeatable @Native 在java.lang.annotati…

    Java 2023年6月13日
    088
  • Docker学习

    Docker简介 Docker是一个开源的应用容器引擎,是一个轻量级容器技术。Docker支持将软件编译成一个镜像,然后在镜像中各种软件做好配置,将镜像发布出去,其他使用者可以直接…

    Java 2023年6月5日
    0144
  • 为Windows Service 2019 使用 Docker

    引言最近收到领导通知,甲方需要将原来的服务器迁移到新的服务器。原来的服务器上安装了很多的服务,每次重启之后总是有很多的问题需要人工大量的进行干预。这次迁移的还是Windows服务器…

    Java 2023年6月15日
    075
  • 批量转换文件字符集

    操作步骤 先设置输入路径与输出路径 输入路径:需要被转换的文件路径 输出路径:转换后的文件储存路径 我没有写这个属性的交互操作,只是在第一行用字面量进行设置 如果输出路径的目录不存…

    Java 2023年6月15日
    079
  • Java基础随笔

    1.一些简单的dos命令: – d: 回车 盘符切换 – dir(directory):列出当前目录下的文件以及文件夹 – del:删除文件 – md:创建文件夹 – rd:删除文…

    Java 2023年6月5日
    077
  • nginx上传文件大小限制

    posted @2019-12-27 10:22 _小豪豪 阅读(1310 ) 评论() 编辑 Original: https://www.cnblogs.com/remember…

    Java 2023年5月30日
    088
  • (九)、SpringBoot整合Swagger2、Swagger3

    (九)、SpringBoot整合Swagger2、Swagger3 一、整合 Swagger2 1、maven 依赖: io.springfox springfox-swagger…

    Java 2023年5月29日
    0124
  • 浅谈Spring Data ElasticSearch

    Spring Data Spring Data 帮助我们避免了一些样板式代码,比如我们要定义一个接口,可以直接继承接口ElasticSearchRepository接口,这样Spr…

    Java 2023年5月30日
    069
  • String与StringBuilder相互转换以及获取字符串中第一个中文汉字

    String与StringBuilder相互转换 1. StringBuilder转为String StringBuilder sb = new StringBuilder(); …

    Java 2023年6月14日
    073
  • java调用python脚本,生成excel

    java: 1 /** 2 * 使用python创建excel并且输出 3 * @throws Exception 4 */ 5 public void pyExportExcel…

    Java 2023年6月16日
    072
  • 简单记录一次远古版本dubbo发生的PermGen space异常

    环境介绍: dubbo的版本是比较旧的版本, 肯定是小于2.5的, jdk版本是1.7, 默认使用的是HotSpot虚拟机 前提说明: dubbo版本应该就是最原始的2.x的版本,…

    Java 2023年6月6日
    090
  • IDEA开启并配置services窗口

    services窗口是一个管理所有服务的地方 开启方法:1.点击菜单栏:Views -> Tool Windows -> Services;或者使用快捷键Alt + F…

    Java 2023年6月7日
    087
  • JDBC概述

    JDBC概述 JDBC(Java DataBase Connectivity),它是一种用于执行 SQL语句的 JavaAPI。通过使用JDBC就可以使用 相同的API访问 不同的…

    Java 2023年6月9日
    074
  • 历时2月,动态线程池 DynamicTp 发布里程碑版本 V1.0.8

    关于 DynamicTp DynamicTp 是一个基于配置中心实现的轻量级动态线程池管理工具,主要功能可以总结为动态调参、通知报警、运行监控、三方包线程池管理等几大类。 经过多个…

    Java 2023年6月14日
    090
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球