3: Reading Kafka Data and Writing It to MySQL

For the Kafka source part, see the previous post: https://www.cnblogs.com/liufei1983/p/15801848.html

1: First, download the two JDBC- and MySQL-related jar packages. Mind the versions: my Flink is 1.13.1, so flink-connector-jdbc_2.11 must also be version 1.13.1, otherwise errors will be thrown.


2: Create a table in MySQL:

-- sql-demo.cdn_access_statistic definition (run this in MySQL)

CREATE TABLE `cdn_access_statistic` (
  `province` varchar(100) DEFAULT NULL,
  `access_count` bigint DEFAULT NULL,
  `total_download` bigint DEFAULT NULL,
  `download_speed` bigint DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

Then create the sink table in Zeppelin. Because Zeppelin runs in Docker, the MySQL URL must not use localhost; use the host machine's IP instead.
%flink.ssql

DROP table if exists cdn_access_statistic;

-- Please create this mysql table first in your mysql instance. Flink won't create the mysql table for you.

CREATE TABLE cdn_access_statistic (
province VARCHAR,
access_count BIGINT,
total_download BIGINT,
download_speed DOUBLE
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://192.168.3.XXX:3306/sql-demo',
'connector.table' = 'cdn_access_statistic',
'connector.username' = 'sql-demo',
'connector.password' = 'demo-sql',
'connector.write.flush.interval' = '1s'
);

3: Confirm that both the Kafka source table and the MySQL sink table have been created.
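To double-check, you can list the tables registered in the current Flink catalog from a Zeppelin paragraph (a minimal sketch; `cdn_access_log` is the Kafka source table from the previous post):

%flink.ssql

-- Both cdn_access_log (Kafka source) and cdn_access_statistic (JDBC sink)
-- should appear in the output before the insert job is started.
SHOW TABLES;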


4: Consume the data from Kafka and store it in MySQL. You can watch the data changing in the MySQL database.

%flink.ssql

insert into cdn_access_statistic
select client_ip, request_time, request_time, request_time
from cdn_access_log
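To verify that rows are actually arriving, a quick check can be run directly in MySQL while the job is running (a sketch; the count should keep growing as the Kafka topic is consumed):

-- Run in the MySQL client against the sql-demo database
SELECT COUNT(*) FROM cdn_access_statistic;

SELECT province, access_count, total_download, download_speed
FROM cdn_access_statistic
LIMIT 10;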

Original: https://www.cnblogs.com/liufei1983/p/15802576.html
Author: 刘大飞
Title: 3: Reading Kafka Data and Writing It to MySQL
