大家好!
今天是 Flink 系列的第四篇:《每天5分钟Flink – 自定义 DataSource 案例》。
目标:通过每天一小会儿,熟悉 Flink 的方方面面,为后面算法实现提供工具基础。
本节内容
本节 1 个目的:熟悉 Flink 自定义 Source 的具体使用方法引例,今天用一个自定义 MySQL 的数据源案例进行说明!
自定义Source,实现消费MySQL中的数据
这里,咱们通过实现一个自定义的Source,消费Mysql中数据。
环境版本
JDK:1.8
Flink:1.13.2
Scala:2.11.12
自定义Source – MySQL
添加pom依赖
具体的版本可以根据具体的依赖进行查询
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.15</version> </dependency>
因为要从 MySQL 中消费数据,这里可以自定义一张 MySQL 数据表 person。
创建 person 表
作为读取的数据源。
CREATE TABLEperson
( id int(10) unsigned NOT NULL AUTO_INCREMENT, name varchar(260) NOT NULL DEFAULT '' COMMENT '姓名', age int(11) unsigned NOT NULL DEFAULT '0' COMMENT '年龄', sex tinyint(2) unsigned NOT NULL DEFAULT '2' COMMENT '0:女, 1男', email text COMMENT '邮箱', PRIMARY KEY (id
) ) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8 COMMENT='人员定义';
随后插入一些数据,作为数据源的内容(可自定义插入更多的数据进行消费)
insert into person values (null, 'JohngoFlink12', 12, 1, 'www.johngo689.com'), (null, 'JohngoFlink13', 13, 0, 'www.johngo689.com'), (null, 'JohngoFlink14', 14, 0, 'www.johngo689.com'), (null, 'JohngoFlink15', 15, 0, 'www.johngo689.com'), (null, 'JohngoFlink16', 16, 1, 'www.johngo689.com'), (null, 'JohngoFlink17', 17, 1, 'www.johngo689.com'), (null, 'JohngoFlink18', 18, 0, 'www.johngo689.com'), (null, 'JohngoFlink19', 19, 0, 'www.johngo689.com'), (null, 'JohngoFlink20', 20, 1, 'www.johngo689.com'), (null, 'JohngoFlink21', 21, 0, 'www.johngo689.com');
定义 Person 类
public class Person { public int id; public String name; public int age; public int sex; public String email; public Person() { } public Person(int id, String name, int age, int sex, String email) { this.id = id; this.name = name; this.age = age; this.sex = sex; this.email = email; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public int getSex() { return sex; } public void setSex(int sex) { this.sex = sex; } public String getEmail() { return email; } public void setEmail(String email) { this.email = email; } @Override public String toString() { return "Person{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + ", sex=" + sex + ", email='" + email + '\'' + '}'; } }
自定义 Source 源
根据之前描述的自定义方式 https://mp.weixin.qq.com/s/NxajZYuu_AUWGj6G8qBG7w。下面自定义一个 MySQL 的数据源作为 Flink 消费源。
public class MyRichSourceMysql extends RichSourceFunction<Person> { boolean isRunning = true; PreparedStatement ps = null; Connection conn = null; private static Connection getConnection() { Connection con = null; try { Class.forName("com.mysql.jdbc.Driver"); con = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_prj?useUnicode=true&characterEncoding=UTF-8", "root", "root123456"); } catch (Exception e) { System.out.println("connect error then exit." + e.getMessage()); } return con; } @Override public void run(SourceContext ctx) throws Exception { Person person = new Person(); ResultSet resSet = ps.executeQuery(); while (isRunning && resSet.next()) { person.setId(resSet.getInt("id")); person.setName(resSet.getString("name")); person.setAge(resSet.getInt("age")); person.setSex(resSet.getInt("sex")); person.setEmail(resSet.getString("email")); ctx.collect(person); } } @Override public void cancel() { isRunning = false; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); conn = getConnection(); String sql_execute = "select * from person"; ps = this.conn.prepareStatement(sql_execute); } @Override public void close() throws Exception { if (conn != null) { conn.close(); } if (ps != null) { ps.close(); } } }
在 MySQL 中创建表以及插入数据之后,就可以自定义 Source 消费 MySQL 中的数据了。
- 第一步:继承 RichSourceFunction 类,因为富类 source 提供了open() 方法和 close() 方法,在 open() 方法可初始化 MySQL 的连接信息,并且初始化查询句柄,close() 方法中可中断连接;
- 第二步:额外定义一个 MySQL 数据源连接初始化;
Class.forName("com.mysql.jdbc.Driver");
con = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink_prj?useUnicode=true&characterEncoding=UTF-8", "root", "root123456"); - 第三步: open() 方法可初始化 MySQL 的连接信息,并且初始化查询句柄,close() 方法中可中断连接;
- 第四步:run() 方法中进行数据读取,并封装到 Person 类中。
使用自定义 Source 源读取数据
main 函数使用自定义MySQL数据源进行数据接入:
public class StreamingWithMyRichSourceMysql { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Person> personInfo = env.addSource(new MyRichSourceMysql()).setParallelism(1); personInfo.print("Person Info: "); String jobName = StreamingWithMyRichSourceMysql.class.getSimpleName(); env.execute(jobName); } }
执行可以看看打印结果:

可以看到存储到 MySQL 中的数据就被取出来了!
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/2187/
转载文章受原作者版权保护。转载请注明原作者出处!