hello,大家好!
今天是 Flink 系列的第三篇。
目标:通过每天一小会儿,熟悉 Flink 的方方面面,为后面算法实现提供工具基础。
第三篇:《每天5分钟Flink – DataSource和自定义Source》
JDK:1.8
Flink:1.13.2
Scala:2.11.12
简说:Source 源
Flink 默认提供了大量已经实现好的 source API,同时,也可以通过 Flink 提供的接口实现自定义 Source 源。
如果在生产环境中,恰好是 Flink 已经实现的 Source 源,那么可以很容易的进行相应功能实现。
比如:
- 基于socket:
socketTextStream()
- 基于文件:
readTextFile(source_path)
- 基于集合:
fromCollection()
- 基于其他内置
connectors
Apache Kafka (source/sink) Apache Cassandra (sink) Elasticsearch (sink) Hadoop FileSystem (sink) RabbitMQ (source/sink) Apache ActiveMQ (source/sink) Redis (sink)
如果是Flink没有默认的 Source
源,那可通过自定义 Source
源进行数据读取。
- 非并行度 Source 源:实现
SourceFunction
接口; - 并行度 Source 源:实现
ParallelSourceFunction
接口或RichParallelSourceFunction
接口。
有趣的案例:
以下提供的案例均为自定义 Source!
以下有三个案例,均可根据代码直接进行跑通
- 自定义Source,实现自定义&并行度为1的source
- 自定义Source,实现一个支持并行度的source
- 自定义Source,实现一个支持并行度的富类source
No1.自定义Source,实现自定义&并行度为1的source
通过实现SourceFunction
接口,自定义每秒产生一个数字。
需要注意,SourceFunction 和 SourceContext 都需要指定数据类型,如果不指定,代码运行的时候会报错!
public class MyNoParalleSource implements SourceFunction<Long> { private long count = 1L; private boolean isRunning = true; @Override public void run(SourceContext<Long> ctx) throws Exception { while(isRunning) { ctx.collect(count); count++; // 每秒产生一个数据 Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } }
下面在主类中进行该自定义接口的使用:
public class StreamingWithMyNoPralalleSource { public static void main(String[] args) throws Exception { // 获取 flink 的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 获取数据源 DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1); DataStream<Long> num = text.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { // 打印产出的数据 System.out.println(value); return value; } }); DataStream<Long> sum = num.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2))).sum(0); sum.print().setParallelism(1); // 打印窗口数据 String jobName = StreamingWithMyNoPralalleSource.class.getSimpleName(); env.execute(jobName); } }
上面代码中的 setParallelism(1)
,可写可不写,因为默认就是单并行度的数据流。
首先进行 Flink 执行环境的设定:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
然后进行指定自定义Source 源:
DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);
然后窗口设定为滚动窗口,每 2s 进行求和打印。
DataStream<Long> sum = num.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2))).sum(0);
执行主函数,就可以看到将自定义 Source 中的数据进行打印了出来。

下面再看一个可以支持并行度的自定义Source源。
No2.自定义Source,实现一个支持并行度的source
自定义实现一个支持并行度的 source,通过实现ParallelSourceFunction
进行代码编写。
实现其中的 run()
方法,每秒进行数字的生成。
public class MyParalleSource implements ParallelSourceFunction<Long> { private boolean isRunning = true; private long count = 0; // 自定义数据源的方法 @Override public void run(SourceContext<Long> ctx) throws Exception { while (isRunning) { ctx.collect(count); count++; Thread.sleep(1000); // 1s 产生一次数据 } } // 停止任务的时候会调用 @Override public void cancel() { isRunning = false; } }
下面在主类中进行自定义 Source 的调用
public class StreamingWithMyPralalleSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Long> text = env.addSource(new MyParalleSource()).setParallelism(2); DataStream<Long> num = text.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("接收的数据" + value); return value; } }); DataStream<Long> sum = num.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2))).sum(0); sum.print().setParallelism(2); String jobName = StreamingWithMyPralalleSource.class.getSimpleName(); env.execute(jobName); } }
设定了2个并行度:
DataStream<Long> text = env.addSource(new MyParalleSource()).setParallelism(2);
所以窗口计算每 2s 计算的就是前 2s 产出的 2 个并行产出的数据,看执行结果:

以上就是设置了 2 个并行度后,然后利用自定义 Source 源,进行数据采集和打印。
有没有感觉上面自定义 Source 会有些单调,功能不够丰富,例如想要获取其他数据连接资源,是没有实现的地方的。
下面,进行自定义实现一个富类接口,来解决上面遇到的问题。
No3.自定义Source,实现一个支持并行度的富类source
所以 Flink 方面还可以实现一个接口类 RichParallelSourceFunction
,可以在需要实现方法open()
中获取资源链接,在close中关闭资源链接。
public class MyRichParalleSource extends RichParallelSourceFunction<Long> { private long count = 1L; private boolean isRunning = true; /** * 在这个run方法中实现一个循环,循环产生数据 */ @Override public void run(SourceContext<Long> ctx) throws Exception { while (isRunning) { ctx.collect(count); count++; Thread.sleep(1000); } } /** * 取消一个cancel的时候会调用的方法 */ @Override public void cancel() { isRunning = false; } /** * 这个方法只会在最开始的时候被调用一次, 实现获取链接的代码 */ @Override public void open(Configuration parameters) throws Exception { System.out.println("open打开连接,获取相关数据..."); super.open(parameters); } /** * 实现关闭链接的代码 */ @Override public void close() throws Exception { System.out.println("close关闭连接,中断数据获取..."); super.close(); } }
在多了两个方法 open()
和 close()
的情况下,可进行更多的操作。
还是用主类进行调用实现:
public class StreamingWithMyRichPralalleSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Long> text = env.addSource(new MyRichParalleSource()).setParallelism(2); DataStream<Long> num = text.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println("接收到的数据" + value); return value; } }); DataStream<Long> sum = num.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1))).sum(0); sum.print().setParallelism(2); String jobName = StreamingWithMyRichPralalleSource.class.getSimpleName(); env.execute(jobName); } }
执行看下效果:

可以看到,在数据接收的开始,进行了其他资源连接的打开。
基于以上的例子,下一期 Flink 专题来说说Kafka Source 以及 MySQL自定义Source案例。
发布者:Johngo学长。文章已受到原创版权保护。
转载请注明出处:https://www.johngo689.com/2173/
评论列表(1条)
Flink第三篇