(5)FlinkSQL将socket数据写入到mysql方式二
原创
wx5d37d5fd4aa62©著作权
文章标签 Flink FlinkSQL FlinkStreaming NBI大数据 NBI可视化 文章分类 Hadoop 大数据
©著作权归作者所有:来自51CTO博客作者wx5d37d5fd4aa62的原创作品,请联系作者获取转载授权,否则将追究法律责任
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStreamSource streamSource = env.socketTextStream("127.0.0.1", 9999,"\n"); SingleOutputStreamOperator waterDS = streamSource.map(new MapFunction() { @Override public WaterSensor map(String s) throws Exception { String[] split = s.split(","); return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2])); } }); // 将流转化为表 Table table = tableEnv.fromDataStream(waterDS, $("id"), $("ts"), $("vc"), $("pt").proctime()); tableEnv.createTemporaryView("EventTable", table); tableEnv.executeSql("CREATE TABLE flinksink (" + "componentname STRING," + "componentcount BIGINT NOT NULL," + "componentsum BIGINT" + ") WITH (" + "'connector.type' = 'jdbc'," + "'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," + "'connector.table' = 'flinksink'," + "'connector.driver' = 'com.mysql.cj.jdbc.Driver'," + "'connector.username' = 'root'," + "'connector.password' = 'root'," + "'connector.write.flush.max-rows'='3'\r\n" + ")" ); Table mysql_user = tableEnv.from("flinksink"); mysql_user.printSchema(); Table result = tableEnv.sqlQuery( "SELECT " + "id as componentname, " + //window_start, window_end, "COUNT(ts) as componentcount ,SUM(ts) as componentsum " + "FROM TABLE( " + "TUMBLE( TABLE EventTable , " + "DESCRIPTOR(pt), " + "INTERVAL '10' SECOND)) " + "GROUP BY id , window_start, window_end" ); //方式一:写入数据库// result.executeInsert("flinksink").print(); //;.insertInto("flinksink"); //方式二:写入数据库 tableEnv.createTemporaryView("ResultTable", result); tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();// tableEnv.toAppendStream(result, Row.class).print("toAppendStream"); //追加模式 env.execute(); }
- 打赏
- 赞
- 收藏
- 评论
- *举报
下一篇:(6)FlinkSQL将kafka数据写入到mysql方式一
Original: https://blog.51cto.com/u_14465598/5573672
Author: wx5d37d5fd4aa62
Title: (5)FlinkSQL将socket数据写入到mysql方式二
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/53688/
转载文章受原作者版权保护。转载请注明原作者出处!