Flink SQL 批模式下 ClickHouse 批量写入

Flink SQL 批模式下 ClickHouse 批量写入

内置使用 JdbcBatchingOutputFormat 批量处理类

pom依赖

  ru.yandex.clickhouse
  clickhouse-jdbc
  0.3.1-patch

  org.apache.flink
  flink-connector-jdbc_2.11
  ${flink.version}

  cn.hutool
  hutool-all
  ${hutool.version}

  mysql
  mysql-connector-java
  ${mysql.version}

clickHouse数据源需要的扩展类:
工厂类
public class ClickHouseDynamicTableFactory implements DynamicTableSinkFactory {
    public static final String IDENTIFIER = "clickhouse";

    private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver";

    public static final ConfigOption URL = ConfigOptions
            .key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc database url.");

    public static final ConfigOption TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");

    public static final ConfigOption USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");

    public static final ConfigOption PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");

    public static final ConfigOption FORMAT = ConfigOptions
            .key("format")
            .stringType()
            .noDefaultValue()
            .withDescription("the format.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    @Override
    public Set> requiredOptions() {
        Set> requiredOptions = new HashSet<>();
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(URL);
        return requiredOptions;
    }

    @Override
    public Set> optionalOptions() {
        return new HashSet<>();
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {

        // either implement your custom validation logic here ...

        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        final ReadableConfig config = helper.getOptions();

        // validate all options
        helper.validate();

        // get the validated options
        JdbcOptions jdbcOptions = getJdbcOptions(config);

        // derive the produced data type (excluding computed columns) from the catalog table
        final DataType dataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

        // table sink
        return new ClickHouseDynamicTableSink(jdbcOptions, dataType);
    }

    private JdbcOptions getJdbcOptions(ReadableConfig readableConfig) {
        final String url = readableConfig.get(URL);
        final JdbcOptions.Builder builder = JdbcOptions.builder()
                .setDriverName(DRIVER_NAME)
                .setDBUrl(url)
                .setTableName(readableConfig.get(TABLE_NAME))
                .setDialect(new ClickHouseDialect());

        readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
        readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
        return builder.build();
    }
}

方言类
public class ClickHouseDialect implements JdbcDialect {

    private static final long serialVersionUID = 1L;

    @Override
    public String dialectName() {
        return "ClickHouse";
    }

    @Override
    public boolean canHandle(String url) {
        return url.startsWith("jdbc:clickhouse:");
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new ClickHouseRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long l) {
        return "limit num : " + l;
    }

    @Override
    public Optional defaultDriverName() {
        return Optional.of(ClickHouseDriver.class.getName());
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return "" + identifier + "";
    }
}
Sink输出类(重点)
public class ClickHouseDynamicTableSink implements DynamicTableSink {

    private final JdbcOptions jdbcOptions;
    private final DataType dataType;
    private static final JdbcExecutionOptions DEFAULT_EXECUTION_OPTIONS = JdbcExecutionOptions.builder()
            // 写入触发数据量阈值
            .withBatchSize(2000)
            // 写入触发时间阈值
            .withBatchIntervalMs(1000)
            // 重试次数
            .withMaxRetries(3)
            .build();

    public ClickHouseDynamicTableSink(JdbcOptions jdbcOptions, DataType dataType) {
        this.jdbcOptions = jdbcOptions;
        this.dataType = dataType;
    }

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        return requestedMode;
    }

    @SneakyThrows
    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        ClickHouseTableEnum tableEnum = ClickHouseTableEnum.valueOf(jdbcOptions.getTableName());
        TableService tableService = new TableServiceImpl(dataType, tableEnum);
        return SinkFunctionProvider.of(new GenericJdbcSinkFunction<>(
                new JdbcBatchingOutputFormat<>(
                        new SimpleJdbcConnectionProvider(jdbcOptions),
                        DEFAULT_EXECUTION_OPTIONS,
                        thisContext -> JdbcBatchStatementExecutor.simple(
                                tableService.getInsertSql(),
                                tableService.getStatementBuilder(),
                                Function.identity()),
                        // 批模式下,数据对象重复利用,会发生覆盖问题,需要深拷贝对象
                        new RowDataConventFunction())));
    }

    @Override
    public DynamicTableSink copy() {
        return new ClickHouseDynamicTableSink(jdbcOptions, dataType);
    }

    @Override
    public String asSummaryString() {
        return "ClickHouse Table Sink";
    }

    @Slf4j
    static class RowDataConventFunction implements JdbcBatchingOutputFormat.RecordExtractor, Serializable {
        @Override
        public RowData apply(RowData rowData) {
            BoxedWrapperRowData newRowData = null;
            try {
                newRowData = new BoxedWrapperRowData(rowData.getArity());
                // 利用反射拷贝旧对象的值
                Field field = ReflectUtil.getField(BoxedWrapperRowData.class, "fields");
                Object[] fields = (Object[]) ReflectUtil.getFieldValue(rowData, field);
                Object[] newFields = new Object[fields.length];
                for (int i = 0; i < fields.length; i++) {
                    newFields[i] = Objects.isNull(fields[i]) ? null : ReflectUtil.invoke(fields[i], "copy");
                }
                ReflectUtil.setFieldValue(newRowData, "fields", newFields);
            } catch (Exception e) {
                log.error("convert error,data:{},", rowData, e);
            }
            return newRowData;
        }
    }
}
转换类
public class ClickHouseRowConverter extends AbstractJdbcRowConverter {
    private static final long serialVersionUID = 1L;

    public ClickHouseRowConverter(RowType rowType) {
        super(rowType);
    }

    @Override
    public String converterName() {
        return "ClickHouse";
    }
}

支持序列化的 BiFunction

@FunctionalInterface
public interface MyBiFunction extends Serializable {
    R apply(T t, U u);
}
sql 生成类
public class TableServiceImpl {
    private final List logicalTypeList;
    private final String insertSql;

    public TableServiceImpl(DataType dataType, ClickHouseTableEnum tableEnum) {
        this.logicalTypeList = dataType.getLogicalType().getChildren();
        this.insertSql = initInsertSql(tableEnum);
    }

    private static final Map, MyBiFunction> FUNCTION_MAP = Maps.newHashMap();

    static {
        // 我的业务中用到的类型,可根据自己的业务,进行增加
        FUNCTION_MAP.put(IntType.class, RowData::getInt);
        FUNCTION_MAP.put(VarCharType.class, RowData::getString);
        FUNCTION_MAP.put(DoubleType.class, RowData::getDouble);
        FUNCTION_MAP.put(BigIntType.class, RowData::getLong);
        FUNCTION_MAP.put(CharType.class, RowData::getString);
    }

    public String getInsertSql() {
        return insertSql;
    }

    public JdbcStatementBuilder getStatementBuilder() {
        return (statement, value) -> {
            for (int i = 0; i < logicalTypeList.size(); i++) {
                LogicalType logicalType = logicalTypeList.get(i);
                Object realValue = FUNCTION_MAP.get(logicalType.getClass()).apply(value, i);
                statement.setObject(i + 1, realValue);
            }
        };
    }

    // 根据枚举字段配置,生成 insert sql
    public static String initInsertSql(ClickHouseTableEnum tableEnum) {
        List columns = tableEnum.getColumns().stream().map(ClickHouseTableEnum.ColumnObj::getColumnName).collect(Collectors.toList());
        return String.format("insert into %s (%s) values (%s)"
                , tableEnum.name()
                , StrUtil.join(",", columns)
                , StrUtil.repeatAndJoin("?", columns.size(), ","));
    }

    public static void main(String[] args) {
        System.out.println(initInsertSql(ClickHouseTableEnum.attr_order_group));
    }
}
clickHouseTable 枚举类
@Getter
public enum ClickHouseTableEnum {
    /**
     * 测试表,因为业务需要,我定义的 ColumnObj 类,实际用个字符串就ok
     */
    test(Lists.newArrayList(
            ColumnObj.of("name")
            , ColumnObj.of("age")
    )),
    ;
    private final List columns;

    ClickHouseTableEnum(List columns) {
        this.columns = columns;
    }

    @Getter
    @Setter
    @ToString
    public static class ColumnObj {
        /**
         * clickHouse 中字段名称
         */
        private String columnName;
        /**
         * flink sql 中获取字段的key
         */
        private String sqlColumnKey;

        /**
         * 两个值相同的情况,使用此构造函数
         */
        private ColumnObj(String columnName) {
            this.columnName = columnName;
            this.sqlColumnKey = columnName;
        }
    }
}
Spi 配置自定义的工厂

resources 目录下,创建 META-INF/services 目录
创建文件: org.apache.flink.table.factories.Factory

内容如下:指向自己的工厂类全路径

com.xxx.xxx.xxx.ClickHouseDynamicTableFactory

Flink SQL 批模式下 ClickHouse 批量写入
输出测试
public class Test {
    public static void main(String[] args) {
        // 初始化 批模式环境
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
        Configuration configuration = settings.toConfiguration();
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, 5);
        TableEnvironment tableEnv = TableEnvironment.create(configuration);

        // 创建 clickHouse 输出表
        // 注意,WITH 后面的参数,table-name 需要跟 clickHouseTable 枚举类中对应上
        tableEnv.executeSql("CREATE TABLE out_table_test (\n" +
                "  name STRING,\n" +
                "  age INT\n" +
                ")  WITH (\n" +
                "   'connector' = 'clickhouse',\n" +
                "   'url' = 'jdbc:clickhouse://172.23.4.32:8123/test',\n" +
                "   'table-name' = 'test'\n" +
                ")");

        Table table = tableEnv.sqlQuery("select 'alice',18 ");
        table.executeInsert("out_table_test");
        // 打印日志
        printLog(tableEnv, table, "test");
    }

    private static void printLog(TableEnvironment tableEnv, Table endTable, String outTableName) {
        String outPrint = "consolePrint_" + outTableName;
        tableEnv.executeSql("CREATE TABLE " + outPrint + " " + endTable.getResolvedSchema() + " WITH (\n" +
                "  'connector' = 'print'\n" +
                ")");
        endTable.executeInsert(outPrint);

        Table countTable = tableEnv.sqlQuery("select count(*) from " + endTable);
        tableEnv.executeSql("CREATE TABLE " + outPrint + "_count " + countTable.getResolvedSchema() + " WITH (\n" +
                "  'connector' = 'print'\n" +
                ")");
        countTable.executeInsert(outPrint + "_count");
    }
}

Original: https://www.cnblogs.com/lalala1/p/16692511.html
Author: 蝎子莱莱②号
Title: Flink SQL 批模式下 ClickHouse 批量写入

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/620123/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • NoSQL,关系型数据库,行列数据库对比、类比

    数据库对比、类比 关系型数据库、NoSQL 关系型数据库 表与表之间有关系。表存储一些格式化的数据结构,每个元组字段的组成都一样,这样便于表之间的联结操作。不过也限制了其性能瓶颈。…

    Java 2023年6月8日
    080
  • java并发的发布和订阅测试

    现在编码的时候,为了处理消息,大家动不动就上个重器,例如MQ之类的。但很多时候,并不是那么有必要,因为数据量和并发其实远远不够。 可以替代的方案非常多,其中一个是java.util…

    Java 2023年6月9日
    067
  • java数组算法——数组元素的赋值2

    java数组算法——数组元素的赋值2——java经典面试题:创建一个长度为6的int型数组,要求数组元素的值都在1-30之间,且是随机赋值。同时要求元素时的值各不相同 经典面试题:…

    Java 2023年6月5日
    0100
  • 【每天学一点-01】 在SpringBoot项目中使用Swagger2

    今天在做毕设的时候,发现在前后端分离的情况下,去调用接口数据时很不方便,然后回想过去,和同学一起做项目的时候,他负责后端,我负责前端,当时调用他的弄好的接口可以说是非常方便,主要是…

    Java 2023年6月5日
    099
  • 2022最新版SSM源码分析:一套教程助你深入理解底层原理,提高核心竞争力!

    众所周知SSM源码分析教程里面包括Mybatis、Spring以及SpringMVC这三个经典的开源框架的源码分析。我们编程人员技术提升逃不过的一个重要方式就是阅读和理解优秀开源项…

    Java 2023年6月9日
    0102
  • 热部署只知道devtools吗?JRebel不香吗?

    前言 JRebel收费怎么破? 什么是本地热部署? 什么是远程热部署? JRebel和devtools的区别 如何安装JRebel? 如何本地热部署? 如何远程热部署? 多模块开发…

    Java 2023年6月14日
    0114
  • GBase数据库对象相关操作

    create table &#x8868;&#x540D;(&#x5217;&#x540D; &#x6570;&#x636E;&am…

    Java 2023年6月9日
    087
  • m

    MyBatisPlus架构图(盗用官网的,侵,删。) SpringBoot第一个简单应用 8.编写测试类 运行结果: 常用注解 MyBatisPlus提供了一些注解供我们在实体类和…

    Java 2023年5月30日
    077
  • Netty源码分析之ByteBuf(二)—内存分配器ByteBufAllocator

    Netty中的内存分配是基于ByteBufAllocator这个接口实现的,通过对它的具体实现,可以用来分配我们之前描述过的任意类型的BytebBuf实例;我们先看一下ByteBu…

    Java 2023年6月9日
    081
  • arthas学习图文记录

    Arthas 是阿里开源的 Java 诊断工具。在线排查问题,无需重启;动态跟踪 Java 代码;实时监控 JVM 状态。Arthas 支持 JDK 6+,支持 Linux/Mac…

    Java 2023年6月5日
    0137
  • Java9的模块化是什么

    Java9中的一个重大特性是增加了一种新型的程序设计组件 – 模块。 官方对模块的定义为:一个被命名的,代码和数据的自描述集合。( the module, which …

    Java 2023年6月7日
    098
  • MyBatis-Plus–@TableLogic注解

    开发过程中一般会遇到删除场景,但是为了保存数据实际运用时不会真的删除,MyBatis-Plus里可以将某个字段(例:delete_flag)标记为逻辑删除字段,方法是:在字段上加@…

    Java 2023年6月13日
    084
  • leetcode 144. Binary Tree Preorder Traversal 二叉树展开为链表(中等)

    一、题目大意 给你二叉树的根节点 root ,返回它节点值的 前序 遍历。 示例 1: 输入:root = [1,null,2,3]输出:[1,2,3] 示例 2: 输入:root…

    Java 2023年6月14日
    083
  • 设计模式在 Spring 框架中的良好应用

    在开始正文之前,请你先思考几个问题: 你项目中有使用哪些 GOF 设计模式 说一说 GOF 23 种设计模式的设计理念 说说 Spring 框架中如何实现设计模式 假设我是面试官问…

    Java 2023年5月30日
    095
  • 半同步半异步线程池框架代码实现

    cpp;gutter:true; SyncTaskQueue.h</p> <h1>pragma once</h1> <h1>incl…

    Java 2023年5月30日
    082
  • 反射和注解

    1.1 类加载 当程序要使用某个类时,如果该类还未被加载到内存中,则系统会通过类的加载,类的连接,类的初始化这三个步骤来对类进行初始化。如果不出意外情况,JVM将会连续完成这三个步…

    Java 2023年6月6日
    0100
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球