(6) FlinkSQL: Writing Kafka Data to MySQL, Approach One


Zookeeper and Kafka installation and configuration are not covered here.

(1) First, start Zookeeper and Kafka


(2) Define a Kafka producer

package com.producers;

import com.alibaba.fastjson.JSONObject;
import com.pojo.WaterSensor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.Random;

/**
 * Created by lj on 2022-07-09.
 */
public class Kafaka_Producer {

    public final static String bootstrapServers = "127.0.0.1:9092";

    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", bootstrapServers);
        // Serializer class for record keys
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer class for record values
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            int i = 0;
            Random r = new Random();   // no seed: a different sequence on every run
            String[] lang = {"flink", "spark", "hadoop", "hive", "hbase", "impala", "presto", "superset", "nbi"};
            while (true) {
                Thread.sleep(2000);
                WaterSensor waterSensor = new WaterSensor(lang[r.nextInt(lang.length)], i, i);
                i++;
                String msg = JSONObject.toJSONString(waterSensor);
                System.out.println(msg);
                // Send synchronously; get() blocks until the broker acknowledges the record
                RecordMetadata recordMetadata = producer.send(
                        new ProducerRecord<>("kafka_data_waterSensor", null, null, msg)).get();
//                System.out.println("recordMetadata: {" + recordMetadata + "}");
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
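Each record is a WaterSensor serialized with fastjson, so the console output (which is also the Kafka message payload) looks roughly like the following; the actual values depend on the random generator and the counter:

{"id":"spark","ts":0,"vc":0}
{"id":"nbi","ts":1,"vc":1}
{"id":"flink","ts":2,"vc":2}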

(3) Define a message POJO

package com.pojo;

import java.io.Serializable;

/**
 * Created by lj on 2022-07-05.
 */
public class WaterSensor implements Serializable {

    private String id;
    private long ts;
    private int vc;

    public WaterSensor() {
    }

    public WaterSensor(String id, long ts, int vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public int getVc() {
        return vc;
    }

    public void setVc(int vc) {
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public long getTs() {
        return ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }
}

(4) Consume the data from Kafka and write it to MySQL

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Read the data from Kafka
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
    properties.setProperty("group.id", "consumer-group");
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.setProperty("auto.offset.reset", "latest");

    DataStreamSource<String> streamSource = env.addSource(
            new FlinkKafkaConsumer<>(
                    "kafka_data_waterSensor",   // must match the topic the producer writes to
                    new SimpleStringSchema(),
                    properties)
    );

    // Parse each JSON payload back into a WaterSensor
    SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
        @Override
        public WaterSensor map(String s) throws Exception {
            JSONObject json = (JSONObject) JSONObject.parse(s);
            return new WaterSensor(json.getString("id"), json.getLong("ts"), json.getInteger("vc"));
        }
    });

    // Convert the stream into a table, appending a processing-time attribute
    Table table = tableEnv.fromDataStream(waterDS,
            $("id"),
            $("ts"),
            $("vc"),
            $("pt").proctime());
    tableEnv.createTemporaryView("EventTable", table);

    // Register the MySQL sink table (legacy JDBC connector options)
    tableEnv.executeSql("CREATE TABLE flinksink (" +
            "componentname STRING," +
            "componentcount BIGINT NOT NULL," +
            "componentsum BIGINT" +
            ") WITH (" +
            "'connector.type' = 'jdbc'," +
            "'connector.url' = 'jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai'," +
            "'connector.table' = 'flinksink'," +
            "'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
            "'connector.username' = 'root'," +
            "'connector.password' = 'root'," +
            "'connector.write.flush.max-rows' = '3'" +
            ")"
    );
    Table mysql_user = tableEnv.from("flinksink");
    mysql_user.printSchema();

    // Aggregate per id over 10-second tumbling processing-time windows
    Table result = tableEnv.sqlQuery(
            "SELECT " +
                    "id as componentname, " +          // window_start, window_end,
                    "COUNT(ts) as componentcount, SUM(ts) as componentsum " +
                    "FROM TABLE( " +
                    "TUMBLE( TABLE EventTable, " +
                    "DESCRIPTOR(pt), " +
                    "INTERVAL '10' SECOND)) " +
                    "GROUP BY id, window_start, window_end"
    );

    // Approach one: write to the database
//        result.executeInsert("flinksink").print();

    // Approach two: write to the database
    tableEnv.createTemporaryView("ResultTable", result);
    tableEnv.executeSql("insert into flinksink SELECT * FROM ResultTable").print();

//        tableEnv.toAppendStream(result, Row.class).print("toAppendStream");   // append mode

    env.execute();
}
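Note that the JDBC connector writes into a table that must already exist in MySQL; it does not create one. A minimal sketch of a matching MySQL-side DDL for the schema declared above (the VARCHAR width is an assumption, adjust it to your data):

-- Minimal sketch: MySQL table matching the Flink sink schema above
CREATE TABLE flinksink (
    componentname  VARCHAR(255),
    componentcount BIGINT NOT NULL,
    componentsum   BIGINT
);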

(5) Results
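Once the producer and the Flink job are both running, each 10-second tumbling window appends one row per id into MySQL. A quick sanity check from the MySQL client (assuming the testdb database from the connector URL above):

-- Inspect the rows the Flink job has flushed so far
SELECT componentname, componentcount, componentsum
FROM testdb.flinksink
ORDER BY componentname;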


Previous: (5) FlinkSQL: Writing Socket Data to MySQL, Approach Two

Next: (7) FlinkSQL: Writing Kafka Data to MySQL, Approach Two

Original: https://blog.51cto.com/u_14465598/5573673
Author: wx5d37d5fd4aa62
Title: (6) FlinkSQL: Writing Kafka Data to MySQL, Approach One
