Integrating Flink CDC with Hudi

A previous post covered sinking Flink CDC data into Iceberg; this one walks through how to send CDC data into Hudi.

What is Hudi?

Hudi is a rich platform to build streaming data lakes with incremental data pipelines
on a self-managing database layer, while being optimized for lake engines and regular batch processing.

What problems does Hudi mainly solve?

  • Scalability limitations of HDFS
  • The need to surface data in Hadoop more quickly
  • No direct support for updating and deleting existing data
  • Fast ETL and modeling
  • To retrieve all changed records, whether they are new rows added to the latest date partition or updates to older data, Hudi lets consumers read incrementally from the last checkpoint timestamp, without running a query that scans the entire source table (see the sketch after this list).
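A hedged sketch of such an incremental pull with the Hudi Flink connector (Hudi 0.10.x options; the table name, path, and start commit below are made up for illustration):

-- Illustrative only: read just the commits made after a given instant instead
-- of scanning the whole table. Table name, path and start-commit timestamp
-- are placeholders, not from this article's setup.
CREATE TABLE hudi_incr_demo (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'hudi',
    'table.type' = 'MERGE_ON_READ',
    'path' = 'hdfs://ip:8020/hudi/hudi_incr_demo',
    'read.streaming.enabled' = 'true',                 -- keep consuming new commits
    'read.streaming.start-commit' = '20220101000000'   -- "last checkpoint" instant, format yyyyMMddHHmmss
);

SELECT * FROM hudi_incr_demo;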

Hudi's features:

  • Upserts, Deletes with fast, pluggable indexing.

  • Incremental queries, Record level change streams

  • Transactions, Rollbacks, Concurrency Control.

  • SQL Read/Writes from Spark, Presto, Trino, Hive & more

  • Automatic file sizing, data clustering, compactions, cleaning.

  • Streaming ingestion, Built-in CDC sources & tools.

  • Built-in metadata tracking for scalable storage access.

  • Backwards compatible schema evolution and enforcement.

Flink: 1.13.1

Hudi: 0.10.1

The demo runs in a local environment; Hadoop uses the setup previously installed on a virtual machine.

Install a MySQL image with Docker, mainly to simulate data changes and produce binlog data:

docker pull mysql:latest

docker run -itd --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 mysql

Enter the container and verify the connection with the mysql client:

docker exec -it 07e946b1fa9a /bin/bash

mysql -uroot -p123456

Create the MySQL table:

create table users
(
    id bigint auto_increment primary key,
    name varchar(20) null,
    birthday timestamp default CURRENT_TIMESTAMP not null,
    ts timestamp default CURRENT_TIMESTAMP not null,
    sex int
);
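The Flink job further down connects to MySQL as user 'aa'. The article does not show that step, so here is an assumed sketch of creating such a user with the privileges the MySQL CDC connector needs for reading the binlog (MySQL 8 images ship with row-based binlog enabled by default):

-- Assumed setup, not shown in the original article: the user referenced later
-- in the Flink source DDL, with the privileges required for binlog reading.
CREATE USER 'aa'@'%' IDENTIFIED BY 'aa';
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'aa'@'%';
FLUSH PRIVILEGES;

-- Optional check that the binlog is on (it is by default in MySQL 8):
SHOW VARIABLES LIKE 'log_bin';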

pom.xml:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.chaplinthink</groupId>
    <artifactId>flink-hudi</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.1</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>1.13.1</version>
            <type>test-jar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>1.13.1</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hudi</groupId>
            <artifactId>hudi-flink-bundle_2.11</artifactId>
            <version>0.10.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
    </dependencies>
</project>
Use Flink SQL to create the MySQL source table and the Hudi target table, then write the data into Hudi with
INSERT INTO hudi_users2 SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users.

Core code:

        final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        environment.enableCheckpointing(3000);

        final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment, fsSettings);
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        // Source table (MySQL CDC)
        String sourceDDL = "CREATE TABLE mysql_users (\n" +
                "                             id BIGINT PRIMARY KEY NOT ENFORCED ,\n" +
                "                             name STRING,\n" +
                "                             birthday TIMESTAMP(3),\n" +

                "                             ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "      'connector' = 'mysql-cdc',\n" +
                "      'hostname' = '192.168.70.3',\n" +
                "      'port' = '3306',  " +
                "      'username' = 'aa',\n" +
                "      'password' = 'aa',  " +
                "      'server-time-zone' = 'Asia/Shanghai'," +
                "      'database-name' = 'test',\n" +
                "      'table-name' = 'users'\n" +
                "      )";

        /**
         * Compaction trigger strategy: run compaction after five delta commits have completed (the Hudi default).
         */
        // Sink (target) table
        String sinkDDL = "CREATE TABLE hudi_users2\n" +
                "(\n" +
                "    id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "    name STRING,\n" +
                "    birthday TIMESTAMP(3),\n" +
                "    ts TIMESTAMP(3),\n" +
                "    partition VARCHAR(20)\n" +
                ") PARTITIONED BY (partition) WITH (\n" +
                "    'connector' = 'hudi',\n" +
                "    'table.type' = 'MERGE_ON_READ',\n" +
                "    'path' = 'hdfs://ip:8020/hudi/hudi_users2'\n " +
                ")";

        String transformSQL = "INSERT INTO hudi_users2 SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users\n";

        tableEnvironment.executeSql(sourceDDL);
        tableEnvironment.executeSql(sinkDDL);
        tableEnvironment.executeSql(transformSQL);

        environment.execute("mysql-to-hudi");

Start the Flink program locally.

Then run some DML statements in MySQL:

insert into users (name) values ('hello');
insert into users (name) values ('world');
insert into users (name) values ('iceberg');
insert into users (name) values ('hudi');

update users set name = 'hello spark' where id = 4;
delete from users where id = 5;

Check the Hudi data path on HDFS.

By default, Hudi enables compaction for MERGE_ON_READ tables, and the trigger strategy is to run compaction after five commits have completed. Once insert, update, and delete operations have been executed in MySQL, the data can be queried with Hive, Spark SQL, or Presto.
If no parquet files have been generated yet, the parquet table we created will return no data when queried.
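The five-commit behaviour comes from the Hudi Flink writer's default compaction options. If a different cadence is wanted, the trigger can be tuned in the sink DDL; a hedged sketch with the documented defaults spelled out explicitly:

-- Sketch: same sink table as above, with the compaction-related options that
-- are otherwise left at their defaults (which is why compaction runs after
-- five delta commits in this article).
CREATE TABLE hudi_users2 (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
    'connector' = 'hudi',
    'table.type' = 'MERGE_ON_READ',
    'path' = 'hdfs://ip:8020/hudi/hudi_users2',
    'compaction.async.enabled' = 'true',            -- compact asynchronously inside the job
    'compaction.trigger.strategy' = 'num_commits',
    'compaction.delta_commits' = '5'                -- compact after every five delta commits
);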

After five commits the data files become visible.

Stop the Flink CDC program, then write a separate Flink SQL program to read the Hudi data on HDFS:

public static void main(String[] args) throws Exception {
        final EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);
        final StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment, fsSettings);
        tableEnvironment.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        String sourceDDL = "CREATE TABLE hudi_users2\n" +
                "(\n" +
                "    id BIGINT PRIMARY KEY NOT ENFORCED,\n" +
                "    name STRING,\n" +
                "    birthday TIMESTAMP(3),\n" +
                "    ts TIMESTAMP(3),\n" +
                "    `partition` VARCHAR(20)\n" +
                ") PARTITIONED BY (`partition`) WITH (\n" +
                "    'connector' = 'hudi',\n" +
                "    'table.type' = 'MERGE_ON_READ',\n" +
                "    'path' = 'hdfs://ip:8020/hudi/hudi_users2',\n" +
                "    'read.streaming.enabled' = 'true',\n" +
                "    'read.streaming.check-interval' = '1'\n" +
                ")";
        tableEnvironment.executeSql(sourceDDL);
        TableResult result2 = tableEnvironment.executeSql("select * from hudi_users2");
        result2.print();

        environment.execute("read_hudi");
    }

Flink SQL reads and prints the data.

Comparing it against the MySQL table shows that the data is consistent.

At this point the prototype of the Flink + Hudi lakehouse solution is complete.

This post focused on the practice of integrating Flink CDC with Hudi and on exploring a new lakehouse architecture; the lakehouse architecture used by 37 Mobile Games (37手游) in the industry can also serve as a reference.

For the pain point of frequently adding table columns, where downstream systems are expected to pick up the new column automatically during synchronization, there is no perfect solution yet; it remains to be seen whether the Flink CDC community will provide schema evolution support.

For now, when a new column is added in MySQL, the Flink program has to be modified and restarted.

Reference:

Original: https://www.cnblogs.com/bigdata1024/p/16226590.html
Author: chaplinthink
Title: Flink CDC 与Hudi整合
