kafka-connect-hive
是基于 kafka-connect
平台实现的 hive
数据读取和写入插件,主要由 source
、 sink
两部分组成, source
部分完成 hive
表数据的读取任务, kafka-connect
将这些数据写入到其他数据存储层中,比如 hive
到 ES
数据的流入。 sink
部分完成向 hive
表写数据的任务, kafka-connect
将第三方数据源(如 MySQL
)里的数据读取并写入到 hive
表中。
环境准备
- Apache Kafka 2.11-2.1.0
- Confluent-5.1.0
- Apache Hadoop 2.6.3
- Apache Hive 1.2.1
-
Java 1.8
-
支持
KCQL
路由查询,允许将kafka
主题中的所有字段或部分字段写入hive
表中 - 支持根据某一字段动态分区
- 支持全量和增量同步数据,不支持部分更新
开始使用
1、启动 kafka
:
<span class="hljs-attr">cd <span class="hljs-string">kafka_2.11-2.1.0
</span></span>
2、启动 schema-registry
:
cd confluent-5.1.0
bin/schema-registry-<span class="hljs-keyword">start etc/<span class="hljs-keyword">schema-registry/<span class="hljs-keyword">schema-registry.properties &
</span></span></span>
schema-registry
组件提供了 kafka topic
的 schema
管理功能,保存了 schema
的各个演变版本,帮助我们解决新旧数据 schema
兼容问题。这里我们使用 apache avro
库来序列化 kafka
的 key
和 value
,因此需要依赖 schema-registry
组件, schema-registry
使用默认的配置。
3、启动 kafka-connect
:
修改 confluent-5.1.0/etc/schema-registry
目录下 connect-avro-distributed.properties
文件的配置,修改后内容如下:
这里需要设置 plugin.path
参数,该参数指定了 kafka-connect
插件包的保存地址,必须得设置。
cd confluent-<span class="hljs-number">5.1.<span class="hljs-number">0
bin/<span class="hljs-keyword">connect-distributed etc/schema-registry/<span class="hljs-keyword">connect-avro-distributed.properties
</span></span></span></span>
1、在 hive
服务器上使用 beeline
执行如下命令:
2、使用 postman
添加 kafka-connect-hive sink
的配置到 kafka-connect
:
URL:localhost:8083/connectors/
请求类型: POST
请求体如下:
{
<span class="hljs-attr">"name": <span class="hljs-string">"hive-sink-example",
<span class="hljs-attr">"config": {
<span class="hljs-attr">"name": <span class="hljs-string">"hive-sink-example",
<span class="hljs-attr">"connector.class": <span class="hljs-string">"com.landoop.streamreactor.connect.hive.sink.hiveSinkConnector",
<span class="hljs-attr">"tasks.max": <span class="hljs-number">1,
<span class="hljs-attr">"topics": <span class="hljs-string">"hive_sink_orc",
<span class="hljs-attr">"connect.hive.kcql": <span class="hljs-string">"insert into cities_orc select * from hive_sink_orc AUTOCREATE PARTITIONBY state STOREAS ORC WITH_FLUSH_INTERVAL = 10 WITH_PARTITIONING = DYNAMIC",
<span class="hljs-attr">"connect.hive.database.name": <span class="hljs-string">"hive_connect",
<span class="hljs-attr">"connect.hive.hive.metastore": <span class="hljs-string">"thrift",
<span class="hljs-attr">"connect.hive.hive.metastore.uris": <span class="hljs-string">"thrift://quickstart.cloudera:9083",
<span class="hljs-attr">"connect.hive.fs.defaultFS": <span class="hljs-string">"hdfs://quickstart.cloudera:9001",
<span class="hljs-attr">"connect.hive.error.policy": <span class="hljs-string">"NOOP",
<span class="hljs-attr">"connect.progress.enabled": <span class="hljs-literal">true
}
}
</span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span>
启动 kafka producer
,写入测试数据, scala
测试代码如下:
<span class="hljs-class"><span class="hljs-keyword">class <span class="hljs-title">AvroTest {
</span></span></span>
4、使用 beeline
查询 hive
数据:
<span class="hljs-keyword">use hive_connect;
<span class="hljs-keyword">select * <span class="hljs-keyword">from cities_orc;
</span></span></span>
输出部分结果如下:
+------------------+------------------------+---------------------+-------------------+--+
<span class="hljs-params">| cities_orc.city | cities_orc.population <span class="hljs-params">| cities_orc.country | cities_orc.state <span class="hljs-params">|
+------------------+------------------------+---------------------+-------------------+--+
| Chicago <span class="hljs-params">| 2705000 | USA <span class="hljs-params">| IL |
<span class="hljs-params">| Chicago | <span class="hljs-number">2705000 <span class="hljs-params">| USA | IL <span class="hljs-params">|
| Chicago <span class="hljs-params">| 2705000 | USA <span class="hljs-params">| IL |
<span class="hljs-params">| Chicago | <span class="hljs-number">2705000 <span class="hljs-params">| USA | IL <span class="hljs-params">|
| Chicago <span class="hljs-params">| 2705000 | USA <span class="hljs-params">| IL |
<span class="hljs-params">| Chicago | <span class="hljs-number">2705000 <span class="hljs-params">| USA | IL <span class="hljs-params">|
| Chicago <span class="hljs-params">| 2705000 | USA <span class="hljs-params">| IL |
<span class="hljs-params">| Chicago | <span class="hljs-number">2705000 <span class="hljs-params">| USA | IL <span class="hljs-params">|
| Chicago <span class="hljs-params">| 2705000 | USA <span class="hljs-params">| IL |
<span class="hljs-params">| Chicago | <span class="hljs-number">2705000 <span class="hljs-params">| USA | IL <span class="hljs-params">|
| Chicago <span class="hljs-params">| 2705000 | USA <span class="hljs-params">| IL |
<span class="hljs-params">| Chicago | <span class="hljs-number">2705000 <span class="hljs-params">| USA | IL <span class="hljs-params">|
| Chicago <span class="hljs-params">| 2705000 | USA <span class="hljs-params">| IL |
<span class="hljs-params">| Chicago | <span class="hljs-number">2705000 <span class="hljs-params">| USA | IL <span class="hljs-params">|
| Chicago <span class="hljs-params">| 2705000 | USA <span class="hljs-params">| IL |
<span class="hljs-params">| Chicago | <span class="hljs-number">2705000 <span class="hljs-params">| USA | IL <span class="hljs-params">|
| Philadelphia <span class="hljs-params">| 1568000 | USA <span class="hljs-params">| PA |
<span class="hljs-params">| Philadelphia | <span class="hljs-number">1568000 <span class="hljs-params">| USA | PA <span class="hljs-params">|
| Philadelphia <span class="hljs-params">| 1568000 | USA <span class="hljs-params">| PA |
<span class="hljs-params">| Philadelphia | <span class="hljs-number">1568000 <span class="hljs-params">| USA | PA <span class="hljs-params">|
| Philadelphia <span class="hljs-params">| 1568000 | USA <span class="hljs-params">| PA |
<span class="hljs-params">| Philadelphia | <span class="hljs-number">1568000 <span class="hljs-params">| USA | PA <span class="hljs-params">|
| Philadelphia <span class="hljs-params">| 1568000 | USA <span class="hljs-params">| PA |
<span class="hljs-params">| Philadelphia | <span class="hljs-number">1568000 <span class="hljs-params">| USA | PA <span class="hljs-params">|
| Philadelphia <span class="hljs-params">| 1568000 | USA <span class="hljs-params">| PA |
<span class="hljs-params">| Philadelphia | <span class="hljs-number">1568000 <span class="hljs-params">| USA | PA <span class="hljs-params">|
</span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span></span>
配置说明
connect.hive.kcql
中的配置项说明如下:
WITH_FLUSH_INTERVAL
:long
类型,表示文件提交的时间间隔,单位是毫秒WITH_FLUSH_SIZE
:long
类型,表示执行提交操作之前,已提交到HDFS
的文件长度WITH_FLUSH_COUNT
:long
类型,表示执行提交操作之前,未提交到HDFS
的记录数WITH_SCHEMA_EVOLUTION
:string
类型,默认值是MATCH
,表示hive schema
和kafka topic record
的schema
的兼容策略,hive connector
会使用该策略来添加或移除字段WITH_TABLE_LOCATION
:string
类型,表示hive
表在HDFS
中的存储位置,如果不指定的话,将使用hive
中默认的配置WITH_OVERWRITE
:boolean
类型,表示是否覆盖hive
表中已存在的记录,使用该策略时,会先删除已有的表,再新建PARTITIONBY
:List<string></string>
类型,保存分区字段。指定后,将从指定的列中获取分区字段的值WITH_PARTITIONING
:string
类型,默认值是STRICT
,表示分区创建方式。主要有DYNAMIC
和STRICT
两种方式。DYNAMIC
方式将根据PARTITIONBY
指定的分区字段创建分区,STRICT
方式要求必须已经创建了所有分区AUTOCREATE
:boolean
类型,表示是否自动创建表
Kafka connect
的配置项说明如下:
name
:string
类型,表示connector
的名称,在整个kafka-connect
集群中唯一topics
:string
类型,表示保存数据的topic
名称,必须与KCQL
语句中的topic
名称一致tasks.max
:int
类型,默认值为1,表示connector
的任务数量connector.class
:string
类型,表示connector
类的名称,值必须是com.landoop.streamreactor.connect.hive.sink.HiveSinkConnector
connect.hive.kcql
:string
类型,表示kafka-connect
查询语句connect.hive.database.name
:string
类型,表示hive
数据库的名称connect.hive.hive.metastore
:string
类型,表示连接hive metastore
所使用的网络协议connect.hive.hive.metastore.uris
:string
类型,表示hive metastore
的连接地址connect.hive.fs.defaultFS
:string
类型,表示HDFS
的地址
Original: https://www.cnblogs.com/dengbangpang/p/12987599.html
Author: 非洲羚羊
Title: kafka-connect-kudu-sink插件
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/574954/
转载文章受原作者版权保护。转载请注明原作者出处!