背景:
批(有界流)模式下,需要统计处理总数据条数
如果使用DataSet
Api,可通过.count()
直接获取:
本文主要描述DataStream
如何获取总条数
思路:
- 当
DataStream
处于RuntimeExecutionMode.BATCH
模式下,sum
、reduce
等求和相关操作,仅会数据处理完成时,最后触发一次 - 参考:执行模式(流/批) | Apache Flink
完整Demo
@SneakyThrows
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置为批模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// BatchSource 为类名称,a.text 位于 resource 目录下
URL resource = BatchSource.class.getResource("/a.txt");
DataStreamSource streamSource = env.readTextFile(resource.getPath());
SingleOutputStreamOperator sumStream = streamSource
.map((MapFunction) value -> 1)
.keyBy((KeySelector) value -> value)
.sum(0);
sumStream.addSink(new SinkFunction() {
@Override
public void invoke(Integer value, Context context) throws Exception {
// 拿到数据总条数,可以做存储数据库或其他使用
System.out.println("sum is " + value);
}
});
env.execute();
}
// 输出结果:sum is 11
附录: a.txt
内容:
a,1631669734,32
b,1631669734,32
c,1631669734,32
a,1631669735,31
b,1631669736,34
a,1631669737,23
a,1631669738,21
a,1631669741,21
a,1631669742,21
a,1631669746,34
a,1631669799,34
Original: https://www.cnblogs.com/lalala1/p/15754068.html
Author: 蝎子莱莱②号
Title: flink dataStream count
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/620121/
转载文章受原作者版权保护。转载请注明原作者出处!