(1)通过FlinkSQL将数据写入mysql demo

(1)通过FlinkSQL将数据写入mysql demo

原创

wx5d37d5fd4aa62©著作权

文章标签 Flink 大数据 FlinkSQL 流式计算 flink 文章分类 Hadoop 大数据

©著作权归作者所有:来自51CTO博客作者wx5d37d5fd4aa62的原创作品,请联系作者获取转载授权,否则将追究法律责任

(1)通过FlinkSQL将数据写入mysql demo

FlinkSQL的出现,极大程度上降低了Flink的编程门槛,更加容易理解和掌握使用。今天将自己的笔记分享出来,希望能帮助在这方面有需要的朋友。

(1)首先引入POM依赖:

1.13.1    2.12    1.7.30            org.apache.flink        flink-java        ${flink.version}                org.apache.flink        flink-streaming-java_${scala.binary.version}        ${flink.version}                org.apache.flink        flink-clients_${scala.binary.version}        ${flink.version}                org.apache.flink        flink-table-api-java-bridge_${scala.binary.version}        ${flink.version}                    org.apache.flink        flink-connector-jdbc_${scala.binary.version}        ${flink.version}                        org.apache.flink        flink-table-planner-blink_${scala.binary.version}        ${flink.version}                org.apache.flink        flink-streaming-scala_${scala.binary.version}        ${flink.version}                org.apache.flink        flink-table-common        ${flink.version}                org.apache.flink        flink-json        ${flink.version}                    com.fasterxml.jackson.core        jackson-databind        2.12.0

(2)编写代码

public static void main(String[] args) throws Exception {    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    EnvironmentSettings settings = EnvironmentSettings.newInstance()            .inStreamingMode()            //.useOldPlanner() // flink            .useBlinkPlanner() // blink            .build();    StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);    String ddl = "CREATE TABLE flinksinksds(\r\n" +            "componentname STRING,\r\n" +            "componentcount INT,\r\n" +            "componentsum INT\r\n" +            ") WITH(\r\n" +            "'connector.type'='jdbc',\r\n" +            "'connector.driver' =  'com.mysql.cj.jdbc.Driver'," +            "'connector.url'='jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',\r\n" +            "'connector.table'='flinksink',\r\n" +            "'connector.username'='root',\r\n" +            "'connector.password'='root',\r\n" +            "'connector.write.flush.max-rows'='1'\r\n" +            ")";    System.err.println(ddl);    ste.executeSql(ddl);    String insert = "insert into flinksinksds(componentname,componentcount,componentsum)" +            "values('1024', 1 , 2 )";    ste.executeSql(insert);    env.execute();    System.exit(0);}

(3)执行结果:

(1)通过FlinkSQL将数据写入mysql demo
  • 打赏
  • 收藏
  • 评论
  • *举报

上一篇:(3)FlinkSQL滑动窗口demo演示

下一篇:(2)FlinkSQL滚动窗口demo演示

Original: https://blog.51cto.com/u_14465598/5573669
Author: wx5d37d5fd4aa62
Title: (1)通过FlinkSQL将数据写入mysql demo

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/508201/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球