flink dataStream count

背景:

批(有界流)模式下,需要统计处理总数据条数
如果使用 DataSetApi,可通过 .count()直接获取:
本文主要描述 DataStream如何获取总条数

思路:

完整Demo

    @SneakyThrows
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置为批模式
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // BatchSource 为类名称,a.text 位于 resource 目录下
        URL resource = BatchSource.class.getResource("/a.txt");
        DataStreamSource streamSource = env.readTextFile(resource.getPath());

        SingleOutputStreamOperator sumStream = streamSource
                .map((MapFunction) value -> 1)
                .keyBy((KeySelector) value -> value)
                .sum(0);

        sumStream.addSink(new SinkFunction() {
            @Override
            public void invoke(Integer value, Context context) throws Exception {
                // 拿到数据总条数,可以做存储数据库或其他使用
                System.out.println("sum is " + value);
            }
        });
        env.execute();
    }
    // 输出结果:sum is 11
附录: a.txt 内容:
a,1631669734,32
b,1631669734,32
c,1631669734,32
a,1631669735,31
b,1631669736,34
a,1631669737,23
a,1631669738,21
a,1631669741,21
a,1631669742,21
a,1631669746,34
a,1631669799,34

Original: https://www.cnblogs.com/lalala1/p/15754068.html
Author: 蝎子莱莱②号
Title: flink dataStream count

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/620121/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Nginx中的Rewrite的重定向配置与实践

    简介:Rewrite是Nginx服务器提供的一个重要的功能,它可以实现URL重定向功能。回到顶部 一:理解地址重写 与 地址转发的含义。 地址重写与 地址转发是两个不同的概念。 地…

    Java 2023年5月30日
    0105
  • nginx访问控制

    一、基于IP的访问控制 1、http_access_module a、配置语法 Syntax: allow address | CIDR | unix: | all; Defaul…

    Java 2023年5月30日
    086
  • Maven的生命周期

    生命周期的定义 Maven的生命周期( lifecycle)是对构建过程进行的抽象。 它包含了项目的 清理、初始化、 编译、 测试、 打包、集成测试、验证、 部署和 站点生成等几乎…

    Java 2023年6月6日
    0101
  • wordpress固定链接+宝塔nginx配置伪静态访问URL

    一、站点设置 打开站点设置,选择伪静态,选择wordpress 二、wordpress设置 打开wordpress后台,选择 设置 —》固定链接 选择一个你喜欢的格式点…

    Java 2023年6月15日
    089
  • 如何给注册中心锦上添花?

    hello,大家好,我是小楼。 在上一篇文章《如何组装一个注册中心》中,我们看到了如何利用一些现有的技术方案来组装出一个生产可用的注册中心最小集。 有的同学看完表示学到了,也有同学…

    Java 2023年6月6日
    086
  • Java精品视频资源,自学Java人手一套!

    ​ 发现有很多粉丝是学java的,为了大家能够快速成长,所以我今天精心挑选了一些java相关的视频资源分享给大家,大家一定好好利用起来,这些技术学会之后,进大厂指日可待,加油。 一…

    Java 2023年6月7日
    060
  • Mybatis(日志)

    6、日志 6.1、日志工厂 如果一个数据库操作出现了异常,我们需要排错,日志就是最好的助手。 曾经:sout,debug 现在:日志工厂 SLF4J LOG4J(deprecate…

    Java 2023年6月13日
    0100
  • mybatis 报错:Caused by: org.apache.ibatis.binding.BindingException: Parameter ” not found

    问题描述: Caused by: org.apache.ibatis.binding.BindingException: Parameter ” not found 问…

    Java 2023年5月30日
    073
  • 幂等公共组件

    前言 今天想聊一聊幂等相关的知识,以及实现一个幂等公共组件需要重点涉及和思考的点。 概念 首先,什么是幂等,在实际代码生产过程中有什么作用呢? 在编程中一个 _幂等_操作的特点是其…

    Java 2023年6月14日
    083
  • oracle数据库备份+锁表+操作数据库

    package cn.com.threeInOneRoad.task; import java.io.File;import java.io.IOException;import …

    Java 2023年6月16日
    090
  • 230_RabbitMQ-高级-分布式事务

    简述 分布式事务的方式 两阶段提交(2PC)需要数据库厂商的支持,java组件有atomikos等。 + 存在的问题 补偿事务(TCC) 严选,阿里,蚂蚁金服。 本地消息表(异步确…

    Java 2023年6月7日
    078
  • 设计模式 19 备忘录模式

    备忘录模式(Memento Pattern)属于 行为型模式 2021 年 10 月 1 日下午,河南驻马店的一名 13 岁女中学生,因和同学发生不愉快喝下半瓶 百草枯。10 月 …

    Java 2023年6月6日
    081
  • 68.可能否

    sfsd posted @2022-09-28 08:39 随遇而安== 阅读(5 ) 评论() 编辑 Original: https://www.cnblogs.com/55zj…

    Java 2023年6月7日
    079
  • BlockingQueue阻塞队列

    BlockingQueue阻塞队列 BlockingQueue的四组API undefined public static void test01(){ ArrayBlocking…

    Java 2023年6月5日
    076
  • java MD5加密

    /** * MD5加密 * @param message 要进行MD5加密的字符串 * @return 加密结果为32位字符串 */ private static String g…

    Java 2023年6月16日
    082
  • 幂等性问题以及解决方案

    幂等性问题以及解决方案 幂等性问题以及解决方案 1、问题 2、回答 2.1、 说一说什么是幂等? 2.2、 那么,我们对于这幂等性问题,有什么解决方案呢? 2.2.1、唯一索引 2…

    Java 2023年6月7日
    087
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球