Flink1.13-java版教程(扩展)

第 11 章 Table API 和 SQL

Flink1.13-java版教程(扩展)

11.1 快速上手

如果我们对关系型数据库和 SQL 非常熟悉,那么 Table API 和 SQL 的使用其实非常简单: 只要得到一个”表”(Table),然后对它调用 Table API,或者直接写 SQL 就可以了。接下来我 们就以一个非常简单的例子上手,初步了解一下这种高层级 API 的使用方法。

11.1.1 需要引入的依赖

我们想要在代码中使用 Table API,必须引入相关的依赖。


    org.apache.flink
    flink-table-api-java-bridge_${scala.binary.version}
    ${flink.version}

这里的依赖是一个 Java 的”桥接器”(bridge),主要就是负责 Table API 和下层 DataStream API 的连接支持,按照不同的语言分为 Java 版和 Scala 版。

如果我们希望在本地的集成开发环境(IDE)里运行 Table API 和 SQL,还需要引入以下依赖:


    org.apache.flink
    flink-table-planner-blink_${scala.binary.version}
    ${flink.version}

    org.apache.flink
    flink-streaming-scala_${scala.binary.version}
    ${flink.version}

这里主要添加的依赖是一个”计划器”(planner),它是 Table API 的核心组件,负责提供运行时环境,并生成程序的执行计划。这里我们用到的是新版的 blink planner。由于 Flink 安装包的 lib 目录下会自带 planner,所以在生产集群环境中提交的作业不需要打包这个依赖。而在 Table API 的内部实现上,部分相关的代码是用 Scala 实现的,所以还需要额外添加一个 Scala 版流处理的相关依赖。另外,如果想实现自定义的数据格式来做序列化,可以引入下面的依赖:

11.1.2 一个简单示例

Flink1.13-java版教程(扩展)
package com.scy.chapter11;

import com.scy.chapter01.ClickSource;
import com.scy.chapter01.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

public class SimpleTableExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //1.读取数据得到DataStream
        SingleOutputStreamOperator eventStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner() {
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }));

        //2.创建一个表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //3.将DataStream转换成Table
        Table eventTable = tableEnv.fromDataStream(eventStream);

        //4.直接写 SQL进行转换
        Table resultTable1 = tableEnv.sqlQuery("select user, url, timestamp from " + eventTable); //关键字加号

        //5.基于Table直接转换
        Table resultTable2 = eventTable.select($("user"), $("url"))
                .where($("user").isEqual("mary"));

        tableEnv.toDataStream(resultTable1).print("result1");
        tableEnv.toDataStream(resultTable2).print("result2");

        env.execute();
    }
}

Flink1.13-java版教程(扩展)

11.2 基本 API

通过上节中的简单示例,我们已经对 Table API 和 SQL 的用法有了大致的了解;本节就继 续展开,对 API 的相关用法做一个详细的说明。

11.2.1 程序架构

Flink1.13-java版教程(扩展)

11.2.2 创建表环境

Flink1.13-java版教程(扩展)
package com.scy.chapter11;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CommonApiTest {
    public static void main(String[] args) {
        //0.这里我们引入了一个"流式表环境"(StreamTableEnvironment),它是继承自TableEnvironment 的子接口。调用它的 create()方法,只需要直接将当前的流执行环境
        //(StreamExecutionEnvironment)传入,就可以创建出对应的流式表环境了。这也正是我们在上一节简单示例中使用的方式。
        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);
        //StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //与1.0等效

        //1.定义环境配置来创建表执行环境
        //1.0基于blink版本planner进行流处理
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        //1.1基于老版本planner进行流处理 .useOldPlanner()

        //1.2基于老版本planner进行批处理
        //ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        //BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(batchEnv);

        //1.3基于blink版本planner进行批处理
        EnvironmentSettings settings3 = EnvironmentSettings.newInstance()
                .inBatchMode()
                .useBlinkPlanner()
                .build();
        TableEnvironment tableEnv3 = TableEnvironment.create(settings3);
    }
}

11.2.3 创建表

Flink1.13-java版教程(扩展)

1. 连接器表(Connector Tables)

Flink1.13-java版教程(扩展)
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        .useBlinkPlanner()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

//2.创建表
String createDDL = "CREATE TABLE clickTable (" +
        " user_name STRING," +
        " url STRING," +
        " ts BIGINT " +
        ") WITH (" +
        " 'connector' = 'filesystem'," +
        " 'path' = 'input/clicks.txt'" +
        " 'format' = 'csv'" +
        ")";

tableEnv.executeSql(createDDL);

//创建用于输出的表
String createOutDDL = "CREATE TABLE outTable (" +
        " url STRING," +
        " user_name STRING," +
        ") WITH (" +
        " 'connector' = 'filesystem'," +
        " 'path' = 'output'" +
        " 'format' = 'csv'" +
        ")";
tableEnv.executeSql(createOutDDL);

tableEnv.useCatalog("custom_catalog");
tableEnv.useDatabase("custom_database");

2. 虚拟表(Virtual Tables)

Flink1.13-java版教程(扩展)

11.2.4 表的查询

创建好了表,接下来自然就是对表进行查询转换了。对一个表的查询(Query)操作,就对应着流数据的转换(Transform)处理。 Flink 为我们提供了两种查询方式:SQL 和 Table API。

第 12 章 Flink CEP

Original: https://blog.51cto.com/u_12633461/5643880
Author: bigbangsheldon
Title: Flink1.13-java版教程(扩展)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/508517/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球