(1)通过FlinkSQL将数据写入mysql demo
原创
wx5d37d5fd4aa62©著作权
文章标签 Flink 大数据 FlinkSQL 流式计算 flink 文章分类 Hadoop 大数据
©著作权归作者所有:来自51CTO博客作者wx5d37d5fd4aa62的原创作品,请联系作者获取转载授权,否则将追究法律责任
FlinkSQL的出现,极大程度上降低了Flink的编程门槛,更加容易理解和掌握使用。今天将自己的笔记分享出来,希望能帮助在这方面有需要的朋友。
(1)首先引入POM依赖:
1.13.1 2.12 1.7.30 org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-table-api-java-bridge_${scala.binary.version} ${flink.version} org.apache.flink flink-connector-jdbc_${scala.binary.version} ${flink.version} org.apache.flink flink-table-planner-blink_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} org.apache.flink flink-table-common ${flink.version} org.apache.flink flink-json ${flink.version} com.fasterxml.jackson.core jackson-databind 2.12.0
(2)编写代码
public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode() //.useOldPlanner() // flink .useBlinkPlanner() // blink .build(); StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings); String ddl = "CREATE TABLE flinksinksds(\r\n" + "componentname STRING,\r\n" + "componentcount INT,\r\n" + "componentsum INT\r\n" + ") WITH(\r\n" + "'connector.type'='jdbc',\r\n" + "'connector.driver' = 'com.mysql.cj.jdbc.Driver'," + "'connector.url'='jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',\r\n" + "'connector.table'='flinksink',\r\n" + "'connector.username'='root',\r\n" + "'connector.password'='root',\r\n" + "'connector.write.flush.max-rows'='1'\r\n" + ")"; System.err.println(ddl); ste.executeSql(ddl); String insert = "insert into flinksinksds(componentname,componentcount,componentsum)" + "values('1024', 1 , 2 )"; ste.executeSql(insert); env.execute(); System.exit(0);}
(3)执行结果:
- 打赏
- 赞
- 收藏
- 评论
- *举报
Original: https://blog.51cto.com/u_14465598/5573669
Author: wx5d37d5fd4aa62
Title: (1)通过FlinkSQL将数据写入mysql demo
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/508201/
转载文章受原作者版权保护。转载请注明原作者出处!