每天5分钟Flink – DataSource和自定义Source

hello,大家好!

今天是 Flink 系列的第三篇。

目标:通过每天一小会儿,熟悉 Flink 的方方面面,为后面算法实现提供工具基础。

第三篇:《每天5分钟Flink – DataSource和自定义Source》

JDK:1.8

Flink:1.13.2

Scala:2.11.12

简说:Source 源

Flink 默认提供了大量已经实现好的 source API,同时,也可以通过 Flink 提供的接口实现自定义 Source 源。

如果在生产环境中,恰好是 Flink 已经实现的 Source 源,那么可以很容易的进行相应功能实现。

比如:

  • 基于socket:socketTextStream()
  • 基于文件:readTextFile(source_path)
  • 基于集合:fromCollection()
  • 基于其他内置connectorsApache Kafka (source/sink) Apache Cassandra (sink) Elasticsearch (sink) Hadoop FileSystem (sink) RabbitMQ (source/sink) Apache ActiveMQ (source/sink) Redis (sink)

如果是Flink没有默认的 Source源,那可通过自定义 Source源进行数据读取。

  • 非并行度 Source 源:实现 SourceFunction接口;
  • 并行度 Source 源:实现 ParallelSourceFunction接口或 RichParallelSourceFunction接口。

有趣的案例:

以下提供的案例均为自定义 Source!

以下有三个案例,均可根据代码直接进行跑通

  1. 自定义Source,实现自定义&并行度为1的source
  2. 自定义Source,实现一个支持并行度的source
  3. 自定义Source,实现一个支持并行度的富类source

No1.自定义Source,实现自定义&并行度为1的source

通过实现SourceFunction接口,自定义每秒产生一个数字。

需要注意,SourceFunction 和 SourceContext 都需要指定数据类型,如果不指定,代码运行的时候会报错!

public class MyNoParalleSource implements SourceFunction<Long> {
    private long count = 1L;
    private boolean isRunning = true;
  
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while(isRunning) {
            ctx.collect(count);
            count++;
            // 每秒产生一个数据
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

下面在主类中进行该自定义接口的使用:

public class StreamingWithMyNoPralalleSource {
    public static void main(String[] args) throws Exception {
        // 获取 flink 的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获取数据源
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                // 打印产出的数据
                System.out.println(value);
                return value;
            }
        });
        DataStream<Long> sum = num.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2))).sum(0);
        sum.print().setParallelism(1); // 打印窗口数据

        String jobName = StreamingWithMyNoPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}

上面代码中的 setParallelism(1),可写可不写,因为默认就是单并行度的数据流。

首先进行 Flink 执行环境的设定:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

然后进行指定自定义Source 源:

DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

然后窗口设定为滚动窗口,每 2s 进行求和打印。

DataStream<Long> sum = num.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2))).sum(0);

执行主函数,就可以看到将自定义 Source 中的数据进行打印了出来。

每天5分钟Flink - DataSource和自定义Source

下面再看一个可以支持并行度的自定义Source源。

No2.自定义Source,实现一个支持并行度的source

自定义实现一个支持并行度的 source,通过实现ParallelSourceFunction进行代码编写。

实现其中的 run()方法,每秒进行数字的生成。

public class MyParalleSource implements ParallelSourceFunction<Long> {

    private boolean isRunning = true;
    private long count = 0;

    // 自定义数据源的方法
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            Thread.sleep(1000); // 1s 产生一次数据
        }
    }

    // 停止任务的时候会调用
    @Override
    public void cancel() {
        isRunning = false;
    }
}

下面在主类中进行自定义 Source 的调用

public class StreamingWithMyPralalleSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> text = env.addSource(new MyParalleSource()).setParallelism(2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("接收的数据" + value);
                return value;
            }
        });
        DataStream<Long> sum = num.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2))).sum(0);
        sum.print().setParallelism(2);

        String jobName = StreamingWithMyPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}

设定了2个并行度:

DataStream<Long> text = env.addSource(new MyParalleSource()).setParallelism(2);

所以窗口计算每 2s 计算的就是前 2s 产出的 2 个并行产出的数据,看执行结果:

每天5分钟Flink - DataSource和自定义Source

以上就是设置了 2 个并行度后,然后利用自定义 Source 源,进行数据采集和打印。

有没有感觉上面自定义 Source 会有些单调,功能不够丰富,例如想要获取其他数据连接资源,是没有实现的地方的。

下面,进行自定义实现一个富类接口,来解决上面遇到的问题。

No3.自定义Source,实现一个支持并行度的富类source

所以 Flink 方面还可以实现一个接口类 RichParallelSourceFunction,可以在需要实现方法open() 中获取资源链接,在close中关闭资源链接。

public class MyRichParalleSource extends RichParallelSourceFunction<Long> {

    private long count = 1L;
    private boolean isRunning = true;

    /**
     * 在这个run方法中实现一个循环,循环产生数据
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            Thread.sleep(1000);
        }
    }

    /**
     * 取消一个cancel的时候会调用的方法
     */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * 这个方法只会在最开始的时候被调用一次, 实现获取链接的代码
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("open打开连接,获取相关数据...");
        super.open(parameters);
    }

    /**
     * 实现关闭链接的代码
     */
    @Override
    public void close() throws Exception {
        System.out.println("close关闭连接,中断数据获取...");
        super.close();
    }
}

在多了两个方法 open()和 close() 的情况下,可进行更多的操作。

还是用主类进行调用实现:

public class StreamingWithMyRichPralalleSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Long> text = env.addSource(new MyRichParalleSource()).setParallelism(2);
        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {

            @Override
            public Long map(Long value) throws Exception {
                System.out.println("接收到的数据" + value);
                return value;
            }
        });
        DataStream<Long> sum = num.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1))).sum(0);
        sum.print().setParallelism(2);

        String jobName = StreamingWithMyRichPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}

执行看下效果:

每天5分钟Flink - DataSource和自定义Source

可以看到,在数据接收的开始,进行了其他资源连接的打开。

基于以上的例子,下一期 Flink 专题来说说Kafka Source 以及 MySQL自定义Source案例。

发布者:Johngo学长。文章已受到原创版权保护。
转载请注明出处:https://www.johngo689.com/2173/

(1)
上一篇 2021年9月21日 下午9:25
下一篇 2021年9月24日 下午3:01

相关推荐

发表评论

登录后才能评论

评论列表(1条)

免费咨询
免费咨询
扫码关注
扫码关注
联系站长

站长Johngo!

大数据和算法重度研究者!

持续产出大数据、算法、LeetCode干货,以及业界好资源!

2022012703491714

微信来撩,免费咨询:xiaozhu_tec

分享本页
返回顶部