Flink Time

基础概念

支持三种时间概念:

  • Processing Time 时间递增
  • Ingestion Time : 摄入时间,数据进入Flink框架的时间,在Source Operator中设置,每个事件拿到当前时间作为时间戳,后续的时间窗口基于该时间
  • Event Time 支持一定程度的乱序
    上一个 checkpoint 或者 savepoint 进行重放,是不是希望结果完全相同。如果希望结果完全相同,就只能用 Event Time;如果接受结果不同,则可以用 Processing Time。

watermark

一个watermark 代表了 watermark所包含的timestamp 数值,表示后来的数据已经再也没有小于或等于这个时间的了.

Flink 支持两种 watermark 生成方式:

  • 在SourceFunction中产生

collectWithTimestamp 方法发送一条数据
第一个参数就是我们要发送的数据
第二个参数就是这个数据所对应的时间戳
emitWatermark 去产生一条 watermark: 表示接下来不会再有时间戳小于等于这个数值记录

  • 在使用DataStream API 的时候指定
DataStream.assignTimestampsAndWatermarks

建议生成的工作越靠近 DataSource 越好。这样会方便让程序逻辑里面更多的 operator 去判断某些数据是否乱序。

code demo:

object WaterMakerTest {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)

        val dataStream: DataStream[Order] = env.socketTextStream("localhost", 9999).map(item => {
            val itemArray = item.split(",")
            Order(itemArray(0).toLong, itemArray(1), itemArray(2).toDouble)
        })

        val outputStream: DataStream[Order] = dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Order](Time.seconds(0)) {
            override def extractTimestamp(element: Order): Long = element.timestamp * 1000L
        }).keyBy("category").timeWindow(Time.seconds(5)).apply(new MyWindowFunction)

        dataStream.print("Data")
        outputStream.print("Result")

        env.execute()
    }
}

class MyWindowFunction extends WindowFunction[Order, Order, Tuple, TimeWindow] {
    override def apply(key: Tuple, window: TimeWindow, input: Iterable[Order], out: Collector[Order]): Unit = {
        val timestamp = window.maxTimestamp()
        var sum: Double = 0
        for (elem <- input) { sum +="elem.price" } val category="key.asInstanceOf[Tuple1[String]].f0" out.collect(order(timestamp, category, sum)) case class order(timestamp: long, category: string, price: double) < code></->

主要了解Flink的时间概念以及Watermark的作用,它可以处理乱序数据,通过watermark来定义关窗的时间点. 可以在SourceFunction和DataStream API 指定生成 Watermark.

Original: https://www.cnblogs.com/bigdata1024/p/16271916.html
Author: chaplinthink
Title: Flink Time

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/683953/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 技术管理进阶——一线Leader怎么做?经理的速成宝典

    原创不易,求分享、求一键三连本期培训材料关注公众号后回复: 经理培训,获得 前段时间有个同学问我有没有一线Leader的 速成培训课程,很好的问题,首先我们需要定义一下什么是小Le…

    技术杂谈 2023年6月1日
    0102
  • 获取当前程序的路径

    以下是获取当前程序路的代码:1 //该函数用于获取执行文件的目录,bExit为TRUE时判断获取的目录是否存在,缺省为TRUE 2 BOOL GetModuleFilePath(C…

    技术杂谈 2023年7月23日
    084
  • mysql 存储ipv6

    自定义列 https://groups.google.com/g/sqlalchemy/c/lZw0GipVYFw https://docs.sqlalchemy.org/en/1…

    技术杂谈 2023年7月11日
    063
  • 归并排序算法

    java代码: 1 public static void main(String[] args) { 2 int arr[] = {2,7,1,5,9,6,10}; //要排序的数…

    技术杂谈 2023年7月11日
    064
  • 关于产品UE的胡思乱想

    1、产品的目标是 取悦用户 不能只盯着功能实现,而不考虑用户使用。 我们的目标不应该不过让用户使用我们的产品。而是让用户在使用我们产品过程中感到 “愉悦”。…

    技术杂谈 2023年5月31日
    078
  • 类成员变量的初始化

    1-1 类成员变量初始化的分类 类成员变量的初始化可简单分为两类:非静态成员变量的初始化(以下简称”普通初始化”)和静态成员变量的初始化(”静态…

    技术杂谈 2023年7月23日
    070
  • 哈夫曼树的构建与最小带权路径长度

    注意:哈夫曼树并不唯一,但带权路径长度一定是相同的。 二叉树:每个结点最多含有两个子树的树称为二叉树。 定理:对于具有n个叶子结点的哈夫曼树,共有2n-1个结点。 哈夫曼树介绍 1…

    技术杂谈 2023年6月21日
    0109
  • MySQL安装和配置

    一、关闭防火墙并安装epel源 1、关闭selinux ①修改selinux的配置文件 [root@localhost ~]# vim /etc/selinux/config SE…

    技术杂谈 2023年7月11日
    067
  • WTL中最简单的实现窗口拖动的方法(转)

    目前,很多基于对话框的应用程序中对话框都是不带框架的,也就是说对话框没有标题栏。众所周知,窗口的移动都是通过鼠标拖动窗口的标题栏来实现的,那么现在应用程序中的对话框没有了标题栏,用…

    技术杂谈 2023年5月31日
    085
  • Markdown语法

    Markdown语法 1.标题 示例: 这是一级标题 ## 这是二级标题 ### 这是三级标题 #### 这是四级标题 ##### 这是五级标题 ###### 这是六级标题 渲染结…

    技术杂谈 2023年7月24日
    077
  • DevOps工程师

    DevOps工程师 1. DevOps工程师的任务是什么? 设计、构建、测试和部署可伸缩的分布式系统,实现从开发到部署的自动化 管理代码库(如Git、SVN、BitBucket等)…

    技术杂谈 2023年5月31日
    082
  • 使用 deb 包的方式安装 TensorRT

    使用 deb 包的方式安装 TensorRT 完成日期:2022 年 4 月 17 日最后修改:—-此文使用 markdown 完成 要在 ubuntu 下使用 Ten…

    技术杂谈 2023年7月11日
    091
  • 系统集成

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    技术杂谈 2023年5月31日
    0101
  • 期末考试代码

    import javax.swing.*; import java.awt.*; //import java.awt.event.InputEvent; //import java…

    技术杂谈 2023年6月21日
    082
  • lsmod——显示已载入系统的模块

    lsmod——显示已载入系统的模块 lsmod 其实就是list modules的缩写,即 列出所有模块. 功能说明:显示已载入系统的模块。 语法:lsmod 说明:执行lsmod…

    技术杂谈 2023年5月31日
    083
  • springboot整合swagger3.0配置集UI界面优化

    添加pom依赖 //swagger &#x754C;&#x9762;&#x4F18;&#x5316;&#x4F9D;&#x8D56;…

    技术杂谈 2023年7月25日
    068
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球