flink水位线

1. 时间语义

​ 有两个非常重要的时间概念:数据的产生时间(数据自带的创建时间)和处理时间(执行处理操作的机器的系统时间)。

​ 另外还有一个摄入时间,指的是数据进入flink数据流的时间,也就是source 算子读入数据的时间。

​ 一般以事件时间为基准,比如我们统计PV、UV 等指标,我们就需要以事件时间为基准。且flink的时间处理默认就是事件时间。

2. 水位线

1. 什么是水位线

​ 水位线可以看做是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间(实际就是用来度量事件时间的)。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

​ 水位线是是数据流中的一部分,随着数据一起流动,在不同任务之间传输。

​ 并不是每条数据都会生成水位线。水位线也是一条数据,是流数据的一部分,watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加。

1. 有序流中的水位线

​ 在有序数据流的情况下,如果每条数据都插入一个水位线会浪费大量的无用功,所以一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳。

flink水位线

2. 乱序流中的水位线

​ 乱序是指数据的先后顺序不一致,主要就是基于数据的产生时间而言。插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线。也就是只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线。

​ 考虑大量数据的情况,也可以周期性插入水位线。这时要保存一下之前所有数据中的最大时间戳。

flink水位线

关于迟到的数据:

[En]

Data on lateness:

问题:上面的操作可以定义一个事件时钟,但有一个问题是它不能处理延迟数据。例如,当9S数据到达时,我们直接将时钟推到9S;如果有结束时间为0-9S的窗口,则应该关闭该窗口,并计算并输出采集的数据。但实际上,由于数据的无序,时间戳为7s和8s的数据可能要到9s以后才会到达,这会导致按照上面的逻辑丢失这部分数据。

[En]

Problem: the above operation can define an event clock, but one problem is that it cannot handle late data. For example, when the 9s data arrives, we push the clock directly to 9s; if there is a window with an end time of 0-9s, the window should be closed and the collected data should be calculated and output. But in fact, due to the disordered data, the data with a timestamp of 7s and 8s may not arrive until after 9s, which will lead to the loss of this part of the data according to the above logic.

解决方案:等待2s,即从已有数据的最大时间戳中减去2s(这个时间需要根据数据动态设置)。因此,当9S数据到达时,时钟前进到7S;在事件时钟前进到9S之前,您必须等待11S数据到达。修改后的水印如下:

[En]

Solution: wait for 2s, that is, subtract 2s from the maximum timestamp of the existing data (this time needs to be dynamically set according to the data). So when the 9s data arrives, the time clock advances to 7s; you have to wait for the 11s data to arrive before the event clock progresses to 9s. The modified water mark is as follows:

flink水位线

3. 水位线的特点

​ 水位线代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要。水位线用来保证窗口处理结果的正确性。

  • 水位线是插入数据流的标记,可视为一条特殊的数据
    [En]

    the water level line is a mark inserted into the data stream and can be thought of as a special piece of data*

  • 水印的主要内容是根据数据的时间戳生成的时间戳,用来表示当前事件时间的进度
    [En]

    the main content of the watermark is a timestamp generated based on the timestamp of the data, which is used to indicate the progress of the current event time*

  • 水印的时间戳必须单调递增,以确保任务的事件时间时钟不断向前移动。
    [En]

    the timestamp of the watermark must be monotonously incremented to ensure that the task’s event time clock keeps moving forward.*

  • 水印通过设置延迟,可确保无序数据得到正确处理
    [En]

    the watermark can ensure that out-of-order data is handled correctly by setting delay*

  • 一个水位线Watermark(t), 表示在当前流中事件事件已经达到了事件戳t,这代表t之前的所有数据都已经到齐了,之后流中不会出现时间戳

2. 如何生成水位线

​ 在flink的dataStream 中,有一个单独用于生成水位线的方法,assignTimestampsAndWatermarks 用于为流中的数据分配时间戳,并生成水位线来指示事件时间。

    public SingleOutputStreamOperator assignTimestampsAndWatermarks(
            WatermarkStrategy watermarkStrategy) {

​ 该方法接收的是一个水位策略WatermarkStrategy。WatermarkStrategy 包含一个时间戳分配器 TimestampAssigner 和 一个水位线生成器 WatermarkGenerator。

  1. 时间戳分配器

org.apache.flink.api.common.eventtime.TimestampAssigner

@Public
@FunctionalInterface
public interface TimestampAssigner {

    /**
     * The value that is passed to {@link #extractTimestamp} when there is no previous timestamp
     * attached to the record.

     */
    long NO_TIMESTAMP = Long.MIN_VALUE;

    /**
     * Assigns a timestamp to an element, in milliseconds since the Epoch. This is independent of
     * any particular time zone or calendar.

     *
     * The method is passed the previously assigned timestamp of the element. That previous
     * timestamp may have been assigned from a previous assigner. If the element did not carry a
     * timestamp before, this value is {@link #NO_TIMESTAMP} (= {@code Long.MIN_VALUE}: {@value
     * Long#MIN_VALUE}).

     *
     * @param element The element that the timestamp will be assigned to.

     * @param recordTimestamp The current internal timestamp of the element, or a negative value, if
     *     no timestamp has been assigned yet.

     * @return The new timestamp.

     */
    long extractTimestamp(T element, long recordTimestamp);
}

​ 主要负责从流数据元素的某个字段提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。

  1. 水位线生成器

org.apache.flink.api.common.eventtime.WatermarkGenerator

public interface WatermarkGenerator {

    /**
     * 每个数据到来都会调用
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * 周期性调用的方法,可以由WatermarkOutput 发出水位线,周期时间为处理时间,默认为200ms。 可以通过executionEnvironment.getConfig().setAutoWatermarkInterval()设置
     */
    void onPeriodicEmit(WatermarkOutput output);
}

​ 主要负责按照既定的方式,基于时间戳生成水位线。

1. 内置水位线

​ 使用内置的有序和无序水位线。

package cn.qz.time;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class WatermarkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource dataStreamSource = executionEnvironment.fromElements(
                new MyEvent("zs", "/user", 1000L),
                new MyEvent("zs", "/order", 1100L),
                new MyEvent("zs", "/product?id=1", 1200L),
                new MyEvent("ls", "/user", 1200L),
                new MyEvent("ls", "/product", 2000L),
                new MyEvent("ww", "/product", 4000L),
                new MyEvent("ww", "/order", 6000L),
                new MyEvent("zl", "/order", 10000L)
        );

        // 有序流
        /*dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner() {
                    @Override
                    public long extractTimestamp(MyEvent element, long recordTimestamp) {
                        *//**
         * 时间戳和水位线的单位都必须是毫秒
         *//*
                        return element.getTimestamp();
                    }
                }));*/
        // 无序流(且延迟时间是5s)
        dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(5))
                .withTimestampAssigner(new SerializableTimestampAssigner() {
                    @Override
                    public long extractTimestamp(MyEvent element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }))
                // 根据user分组,开窗统计
                .keyBy(data -> data.user)
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .process(new WatermarkTestResult())
                .print();
        /**
         * 事实上。 有序流的水位线生成器本质和乱序流是一样的,相当于延迟设为0的乱序流水线生成器
         * forMonotonousTimestamps
         * forBoundedOutOfOrderness
         * @see AscendingTimestampsWatermarks
         */

        executionEnvironment.execute();
    }

    // 自定义处理窗口函数,输出当前的水位线和窗口信息以及每个窗口的数据信息
    public static class WatermarkTestResult extends ProcessWindowFunction {
        @Override
        public void process(String s, Context context, Iterable elements, Collector out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            Long currentWatermark = context.currentWatermark();
            Long count = elements.spliterator().getExactSizeIfKnown();

            // 收集元素, 然后汇总到结果集
            List result = new ArrayList<>();
            Iterator iterator = elements.iterator();
            while (iterator.hasNext()) {
                result.add(iterator.next().toString());
            }
            out.collect("窗口" + start + " ~ " + end + "中共有" + count + "个元素,窗口闭合计算时,水位线处于:" + currentWatermark + " result: " + result);
        }
    }
}

结果:

3> 窗口4000 ~ 5000中共有1个元素,窗口闭合计算时,水位线处于:9223372036854775807 result: [MyEvent(user=ww, url=/product, timestamp=4000)]
6> 窗口10000 ~ 11000中共有1个元素,窗口闭合计算时,水位线处于:9223372036854775807 result: [MyEvent(user=zl, url=/order, timestamp=10000)]
1> 窗口1000 ~ 2000中共有1个元素,窗口闭合计算时,水位线处于:9223372036854775807 result: [MyEvent(user=ls, url=/user, timestamp=1200)]
3> 窗口6000 ~ 7000中共有1个元素,窗口闭合计算时,水位线处于:9223372036854775807 result: [MyEvent(user=ww, url=/order, timestamp=6000)]
7> 窗口1000 ~ 2000中共有3个元素,窗口闭合计算时,水位线处于:9223372036854775807 result: [MyEvent(user=zs, url=/user, timestamp=1000), MyEvent(user=zs, url=/order, timestamp=1100), MyEvent(user=zs, url=/product?id=1, timestamp=1200)]
1> 窗口2000 ~ 3000中共有1个元素,窗口闭合计算时,水位线处于:9223372036854775807 result: [MyEvent(user=ls, url=/product, timestamp=2000)]

也可以强制将所有数据分组:(key By可以直接返回固定值)

[En]

You can also forcibly divide all data into a group: (keyBy can directly return a fixed value)

package cn.qz.time;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class WatermarkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource dataStreamSource = executionEnvironment.fromElements(
                new MyEvent("zs", "/user", 1000L),
                new MyEvent("zs", "/order", 1100L),
                new MyEvent("zs", "/product?id=1", 1200L),
                new MyEvent("ls", "/user", 1200L),
                new MyEvent("ls", "/product", 2000L),
                new MyEvent("ww", "/product", 4000L),
                new MyEvent("ww", "/order", 6000L),
                new MyEvent("zl", "/order", 10000L)
        );

        // 有序流
        /*dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner() {
                    @Override
                    public long extractTimestamp(MyEvent element, long recordTimestamp) {
                        *//**
         * 时间戳和水位线的单位都必须是毫秒
         *//*
                        return element.getTimestamp();
                    }
                }));*/
        // 无序流(且延迟时间是5s)
        dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(5))
                .withTimestampAssigner(new SerializableTimestampAssigner() {
                    @Override
                    public long extractTimestamp(MyEvent element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }))
                // 根据user分组,开窗统计
                .keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .process(new WatermarkTestResult())
                .print();
        /**
         * 事实上。 有序流的水位线生成器本质和乱序流是一样的,相当于延迟设为0的乱序流水线生成器
         * forMonotonousTimestamps
         * forBoundedOutOfOrderness
         * @see AscendingTimestampsWatermarks
         */

        executionEnvironment.execute();
    }

    // 自定义处理窗口函数,输出当前的水位线和窗口信息以及每个窗口的数据信息
    public static class WatermarkTestResult extends ProcessWindowFunction {
        @Override
        public void process(Boolean s, Context context, Iterable elements, Collector out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            Long currentWatermark = context.currentWatermark();
            Long count = elements.spliterator().getExactSizeIfKnown();

            // 收集元素, 然后汇总到结果集
            List result = new ArrayList<>();
            Iterator iterator = elements.iterator();
            while (iterator.hasNext()) {
                result.add(iterator.next().toString());
            }
            out.collect("窗口" + start + " ~ " + end + "中共有" + count + "个元素,窗口闭合计算时,水位线处于:" + currentWatermark + " result: " + result);
        }
    }
}

​ 这里注意: 乱序流中的水位线真正的时间戳,其实是 当前最大时间戳-延迟时间-1,单位毫秒。 减一的原因:

考虑时间戳为t的水位线,标识时间戳

Original: https://www.cnblogs.com/qlqwjy/p/16488771.html
Author: QiaoZhi
Title: flink水位线

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/7172/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

免费咨询
免费咨询
扫码关注
扫码关注
联系站长

站长Johngo!

大数据和算法重度研究者!

持续产出大数据、算法、LeetCode干货,以及业界好资源!

2022012703491714

微信来撩,免费咨询:xiaozhu_tec

分享本页
返回顶部