(5)Flink CEP SQL四种匹配模式效果演示

(5)Flink CEP SQL四种匹配模式效果演示

Flink CEP SQL中提供了四种匹配策略:

(1)skip to next row

从成功匹配的事件序列中第一个事件的下一个事件开始下一个匹配

[En]

Start the next match from the next event of the first event in the successfully matched event sequence

(2)skip past last row

从成功匹配的事件序列中最后一个事件的下一个事件开始下一个匹配

[En]

Start the next match from the next event of the last event in the successfully matched event sequence

(3)skip to first pattern Item

从匹配成功的事件序列中第一个对应于patternItem的事件开始进行下一次匹配

(4)skip to last pattern Item

从匹配成功的事件序列中最后一个对应于patternItem的事件开始进行下一次匹配

接下来,让我们编写代码来演示每个策略模式表达式的效果:

[En]

Next, let’s code to demonstrate the effect of each policy pattern expression:

(1)skip to next row

package com.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
* Created by lj on 2022-08-08.
*/
public class CEPSQLExampleAfterMatch {
private static final Logger LOG = LoggerFactory.getLogger(CEPSQLExampleAfterMatch.class);

public static void main(String[] args) {
EnvironmentSettings settings = null;
StreamTableEnvironment tEnv = null;
try {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
DataStream<Ticker> dataStream =
env.fromElements(
new Ticker(2, "Apple", 11, 2, LocalDateTime.parse("2021-12-10 10:00:01", dateTimeFormatter)),
new Ticker(3, "Apple", 16, 2, LocalDateTime.parse("2021-12-10 10:00:02", dateTimeFormatter)),
new Ticker(4, "Apple", 13, 2, LocalDateTime.parse("2021-12-10 10:00:03", dateTimeFormatter)),
new Ticker(5, "Apple", 15, 2, LocalDateTime.parse("2021-12-10 10:00:04", dateTimeFormatter)),
new Ticker(6, "Apple", 14, 1, LocalDateTime.parse("2021-12-10 10:00:05", dateTimeFormatter)),
new Ticker(7, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:06", dateTimeFormatter)),
new Ticker(8, "Apple", 23, 2, LocalDateTime.parse("2021-12-10 10:00:07", dateTimeFormatter)),
new Ticker(9, "Apple", 22, 2, LocalDateTime.parse("2021-12-10 10:00:08", dateTimeFormatter)),
new Ticker(10, "Apple", 25, 2, LocalDateTime.parse("2021-12-10 10:00:09", dateTimeFormatter)),
new Ticker(11, "Apple", 11, 1, LocalDateTime.parse("2021-12-10 10:00:11", dateTimeFormatter)),
new Ticker(12, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:12", dateTimeFormatter)),
new Ticker(13, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:13", dateTimeFormatter)),
new Ticker(14, "Apple", 25, 1, LocalDateTime.parse("2021-12-10 10:00:14", dateTimeFormatter)),
new Ticker(15, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:15", dateTimeFormatter)),
new Ticker(16, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:16", dateTimeFormatter)),
new Ticker(17, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:17", dateTimeFormatter)),
new Ticker(18, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:18", dateTimeFormatter)));

Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("symbol", DataTypes.STRING())
.column("price", DataTypes.BIGINT())
.column("tax", DataTypes.BIGINT())
.column("rowtime", DataTypes.TIMESTAMP(3))
.watermark("rowtime", "rowtime - INTERVAL '1' SECOND")
.build());
tEnv.createTemporaryView("CEP_SQL_1", table);

String sql = "SELECT * " +
"FROM CEP_SQL_1 " +
" MATCH_RECOGNIZE ( " +
" PARTITION BY symbol " + //按symbol分区,将相同卡号的数据分到同一个计算节点上。
" ORDER BY rowtime " + //在窗口内,对事件时间进行排序。
" MEASURES " + //定义如何根据匹配成功的输入事件构造输出事件
" FIRST(e1.id) as id,"+
" AVG(e1.price) as avgPrice,"+
" FIRST(e1.rowtime) AS e1_start_tstamp, " +
" LAST(e2.rowtime) AS e2_fast_tstamp " +
" ONE ROW PER MATCH " + //匹配成功输出一条
" AFTER MATCH skip to next row " +
" PATTERN ( e1+ e2) WITHIN INTERVAL '2' MINUTE" +
" DEFINE " + //定义各事件的匹配条件
" e1 AS " +
" e1.price < 19 , " +
" e2 AS " +
" e2.price >= 19 " +
" ) MR";


TableResult res = tEnv.executeSql(sql);
res.print();
tEnv.dropTemporaryView("CEP_SQL_1");

} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}


public static class Ticker {
public long id;
public String symbol;
public long price;
public long tax;
public LocalDateTime rowtime;

public Ticker() {
}

public Ticker(long id, String symbol, long price, long item, LocalDateTime rowtime) {
this.id = id;
this.symbol = symbol;
this.price = price;
this.tax = tax;
this.rowtime = rowtime;
}
}
}

(5)Flink CEP SQL四种匹配模式效果演示

(2)skip past last row

package com.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
* Created by lj on 2022-08-08.
*/
public class CEPSQLExampleAfterMatch {
private static final Logger LOG = LoggerFactory.getLogger(CEPSQLExampleAfterMatch.class);

public static void main(String[] args) {
EnvironmentSettings settings = null;
StreamTableEnvironment tEnv = null;
try {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
DataStream<Ticker> dataStream =
env.fromElements(
new Ticker(2, "Apple", 11, 2, LocalDateTime.parse("2021-12-10 10:00:01", dateTimeFormatter)),
new Ticker(3, "Apple", 16, 2, LocalDateTime.parse("2021-12-10 10:00:02", dateTimeFormatter)),
new Ticker(4, "Apple", 13, 2, LocalDateTime.parse("2021-12-10 10:00:03", dateTimeFormatter)),
new Ticker(5, "Apple", 15, 2, LocalDateTime.parse("2021-12-10 10:00:04", dateTimeFormatter)),
new Ticker(6, "Apple", 14, 1, LocalDateTime.parse("2021-12-10 10:00:05", dateTimeFormatter)),
new Ticker(7, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:06", dateTimeFormatter)),
new Ticker(8, "Apple", 23, 2, LocalDateTime.parse("2021-12-10 10:00:07", dateTimeFormatter)),
new Ticker(9, "Apple", 22, 2, LocalDateTime.parse("2021-12-10 10:00:08", dateTimeFormatter)),
new Ticker(10, "Apple", 25, 2, LocalDateTime.parse("2021-12-10 10:00:09", dateTimeFormatter)),
new Ticker(11, "Apple", 11, 1, LocalDateTime.parse("2021-12-10 10:00:11", dateTimeFormatter)),
new Ticker(12, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:12", dateTimeFormatter)),
new Ticker(13, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:13", dateTimeFormatter)),
new Ticker(14, "Apple", 25, 1, LocalDateTime.parse("2021-12-10 10:00:14", dateTimeFormatter)),
new Ticker(15, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:15", dateTimeFormatter)),
new Ticker(16, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:16", dateTimeFormatter)),
new Ticker(17, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:17", dateTimeFormatter)),
new Ticker(18, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:18", dateTimeFormatter)));

Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("symbol", DataTypes.STRING())
.column("price", DataTypes.BIGINT())
.column("tax", DataTypes.BIGINT())
.column("rowtime", DataTypes.TIMESTAMP(3))
.watermark("rowtime", "rowtime - INTERVAL '1' SECOND")
.build());
tEnv.createTemporaryView("CEP_SQL_2", table);

String sql = "SELECT * " +
"FROM CEP_SQL_2 " +
" MATCH_RECOGNIZE ( " +
" PARTITION BY symbol " + //按symbol分区,将相同卡号的数据分到同一个计算节点上。
" ORDER BY rowtime " + //在窗口内,对事件时间进行排序。
" MEASURES " + //定义如何根据匹配成功的输入事件构造输出事件
" e1.id as id,"+
" AVG(e1.price) as avgPrice,"+
" FIRST(e1.rowtime) AS e1_start_tstamp, " +
" LAST(e1.rowtime) AS e1_fast_tstamp, " +
" e2.rowtime AS end_tstamp " +
" ONE ROW PER MATCH " + //匹配成功输出一条
" AFTER MATCH skip past last row " +
" PATTERN (e1+ e2) WITHIN INTERVAL '2' MINUTE" +
" DEFINE " + //定义各事件的匹配条件
" e1 AS " +
" e1.price < 19 , " +
" e2 AS " +
" e2.price >= 19 " +
" ) MR";


TableResult res = tEnv.executeSql(sql);
res.print();
tEnv.dropTemporaryView("CEP_SQL_2");

} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}


public static class Ticker {
public long id;
public String symbol;
public long price;
public long tax;
public LocalDateTime rowtime;

public Ticker() {
}

public Ticker(long id, String symbol, long price, long item, LocalDateTime rowtime) {
this.id = id;
this.symbol = symbol;
this.price = price;
this.tax = tax;
this.rowtime = rowtime;
}
}
}

(5)Flink CEP SQL四种匹配模式效果演示

(3)skip to first pattern Item

package com.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
* Created by lj on 2022-08-08.
*/
public class CEPSQLExampleAfterMatch {
private static final Logger LOG = LoggerFactory.getLogger(CEPSQLExampleAfterMatch.class);

public static void main(String[] args) {
EnvironmentSettings settings = null;
StreamTableEnvironment tEnv = null;
try {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
DataStream<Ticker> dataStream =
env.fromElements(
new Ticker(2, "Apple", 11, 2, LocalDateTime.parse("2021-12-10 10:00:01", dateTimeFormatter)),
new Ticker(3, "Apple", 16, 2, LocalDateTime.parse("2021-12-10 10:00:02", dateTimeFormatter)),
new Ticker(4, "Apple", 13, 2, LocalDateTime.parse("2021-12-10 10:00:03", dateTimeFormatter)),
new Ticker(5, "Apple", 15, 2, LocalDateTime.parse("2021-12-10 10:00:04", dateTimeFormatter)),
new Ticker(6, "Apple", 14, 1, LocalDateTime.parse("2021-12-10 10:00:05", dateTimeFormatter)),
new Ticker(7, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:06", dateTimeFormatter)),
new Ticker(8, "Apple", 23, 2, LocalDateTime.parse("2021-12-10 10:00:07", dateTimeFormatter)),
new Ticker(9, "Apple", 22, 2, LocalDateTime.parse("2021-12-10 10:00:08", dateTimeFormatter)),
new Ticker(10, "Apple", 25, 2, LocalDateTime.parse("2021-12-10 10:00:09", dateTimeFormatter)),
new Ticker(11, "Apple", 11, 1, LocalDateTime.parse("2021-12-10 10:00:11", dateTimeFormatter)),
new Ticker(12, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:12", dateTimeFormatter)),
new Ticker(13, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:13", dateTimeFormatter)),
new Ticker(14, "Apple", 25, 1, LocalDateTime.parse("2021-12-10 10:00:14", dateTimeFormatter)),
new Ticker(15, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:15", dateTimeFormatter)),
new Ticker(16, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:16", dateTimeFormatter)),
new Ticker(17, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:17", dateTimeFormatter)),
new Ticker(18, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:18", dateTimeFormatter)));

Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("symbol", DataTypes.STRING())
.column("price", DataTypes.BIGINT())
.column("tax", DataTypes.BIGINT())
.column("rowtime", DataTypes.TIMESTAMP(3))
.watermark("rowtime", "rowtime - INTERVAL '1' SECOND")
.build());
tEnv.createTemporaryView("CEP_SQL_3", table);

String sql = "SELECT * " +
"FROM CEP_SQL_3 " +
" MATCH_RECOGNIZE ( " +
" PARTITION BY symbol " + //按symbol分区,将相同卡号的数据分到同一个计算节点上。
" ORDER BY rowtime " + //在窗口内,对事件时间进行排序。
" MEASURES " + //定义如何根据匹配成功的输入事件构造输出事件
" FIRST(e1.id) as id,"+
" AVG(e1.price) as avgPrice,"+
" FIRST(e1.rowtime) AS e1_start_tstamp, " +
" LAST(e1.rowtime) AS e1_fast_tstamp, " +
" e2.rowtime AS end_tstamp " +
" ONE ROW PER MATCH " + //匹配成功输出一条
" AFTER MATCH skip to first e1 " +
" PATTERN (e0 e1+ e2) WITHIN INTERVAL '2' MINUTE" +
" DEFINE " + //定义各事件的匹配条件
" e1 AS " +
" e1.price < 19 , " +
" e2 AS " +
" e2.price >= 19 " +
" ) MR";


TableResult res = tEnv.executeSql(sql);
res.print();
tEnv.dropTemporaryView("CEP_SQL_3");

} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}


public static class Ticker {
public long id;
public String symbol;
public long price;
public long tax;
public LocalDateTime rowtime;

public Ticker() {
}

public Ticker(long id, String symbol, long price, long item, LocalDateTime rowtime) {
this.id = id;
this.symbol = symbol;
this.price = price;
this.tax = tax;
this.rowtime = rowtime;
}
}
}

(5)Flink CEP SQL四种匹配模式效果演示

(4)skip to last pattern Item

package com.examples;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
* Created by lj on 2022-08-08.
*/
public class CEPSQLExampleAfterMatch {
private static final Logger LOG = LoggerFactory.getLogger(CEPSQLExampleAfterMatch.class);

public static void main(String[] args) {
EnvironmentSettings settings = null;
StreamTableEnvironment tEnv = null;
try {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
DataStream<Ticker> dataStream =
env.fromElements(
new Ticker(2, "Apple", 11, 2, LocalDateTime.parse("2021-12-10 10:00:01", dateTimeFormatter)),
new Ticker(3, "Apple", 16, 2, LocalDateTime.parse("2021-12-10 10:00:02", dateTimeFormatter)),
new Ticker(4, "Apple", 13, 2, LocalDateTime.parse("2021-12-10 10:00:03", dateTimeFormatter)),
new Ticker(5, "Apple", 15, 2, LocalDateTime.parse("2021-12-10 10:00:04", dateTimeFormatter)),
new Ticker(6, "Apple", 14, 1, LocalDateTime.parse("2021-12-10 10:00:05", dateTimeFormatter)),
new Ticker(7, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:06", dateTimeFormatter)),
new Ticker(8, "Apple", 23, 2, LocalDateTime.parse("2021-12-10 10:00:07", dateTimeFormatter)),
new Ticker(9, "Apple", 22, 2, LocalDateTime.parse("2021-12-10 10:00:08", dateTimeFormatter)),
new Ticker(10, "Apple", 25, 2, LocalDateTime.parse("2021-12-10 10:00:09", dateTimeFormatter)),
new Ticker(11, "Apple", 11, 1, LocalDateTime.parse("2021-12-10 10:00:11", dateTimeFormatter)),
new Ticker(12, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:12", dateTimeFormatter)),
new Ticker(13, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:13", dateTimeFormatter)),
new Ticker(14, "Apple", 25, 1, LocalDateTime.parse("2021-12-10 10:00:14", dateTimeFormatter)),
new Ticker(15, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:15", dateTimeFormatter)),
new Ticker(16, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:16", dateTimeFormatter)),
new Ticker(17, "Apple", 19, 1, LocalDateTime.parse("2021-12-10 10:00:17", dateTimeFormatter)),
new Ticker(18, "Apple", 15, 1, LocalDateTime.parse("2021-12-10 10:00:18", dateTimeFormatter)));

Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("symbol", DataTypes.STRING())
.column("price", DataTypes.BIGINT())
.column("tax", DataTypes.BIGINT())
.column("rowtime", DataTypes.TIMESTAMP(3))
.watermark("rowtime", "rowtime - INTERVAL '1' SECOND")
.build());
tEnv.createTemporaryView("CEP_SQL_4", table);

String sql = "SELECT * " +
"FROM CEP_SQL_4 " +
" MATCH_RECOGNIZE ( " +
" PARTITION BY symbol " + //按symbol分区,将相同卡号的数据分到同一个计算节点上。
" ORDER BY rowtime " + //在窗口内,对事件时间进行排序。
" MEASURES " + //定义如何根据匹配成功的输入事件构造输出事件
" e1.id as id,"+
" AVG(e1.price) as avgPrice,"+
" FIRST(e1.rowtime) AS e1_start_tstamp, " +
" LAST(e1.rowtime) AS e1_fast_tstamp, " +
" e2.rowtime AS end_tstamp " +
" ONE ROW PER MATCH " + //匹配成功输出一条
" AFTER MATCH skip to last e1 " +
" PATTERN (e0 e1+ e2) WITHIN INTERVAL '2' MINUTE" +
" DEFINE " + //定义各事件的匹配条件
" e1 AS " +
" e1.price < 19 , " +
" e2 AS " +
" e2.price >= 19 " +
" ) MR";


TableResult res = tEnv.executeSql(sql);
res.print();
tEnv.dropTemporaryView("CEP_SQL_4");

} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}


public static class Ticker {
public long id;
public String symbol;
public long price;
public long tax;
public LocalDateTime rowtime;

public Ticker() {
}

public Ticker(long id, String symbol, long price, long item, LocalDateTime rowtime) {
this.id = id;
this.symbol = symbol;
this.price = price;
this.tax = tax;
this.rowtime = rowtime;
}
}
}

(5)Flink CEP SQL四种匹配模式效果演示

Original: https://blog.51cto.com/u_14465598/5615923
Author: wx5d37d5fd4aa62
Title: (5)Flink CEP SQL四种匹配模式效果演示

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/508209/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球