@[toc]
第 11 章 Table API 和 SQL


11.1 快速上手
如果我们对关系型数据库和 SQL 非常熟悉,那么 Table API 和 SQL 的使用其实非常简单:
只要得到一个”表”(Table),然后对它调用 Table API,或者直接写 SQL 就可以了。接下来我
们就以一个非常简单的例子上手,初步了解一下这种高层级 API 的使用方法。
11.1.1 需要引入的依赖
我们想要在代码中使用 Table API,必须引入相关的依赖。
org.apache.flink
flink-table-api-java-bridge_${scala.binary.version}
${flink.version}
这里的依赖是一个 Java 的”桥接器”(bridge),主要就是负责 Table API 和下层 DataStream
API 的连接支持,按照不同的语言分为 Java 版和 Scala 版。
如果我们希望在本地的集成开发环境(IDE)里运行 Table API 和 SQL,还需要引入以下依赖:
org.apache.flink
flink-table-planner-blink_${scala.binary.version}
${flink.version}
org.apache.flink
flink-streaming-scala_${scala.binary.version}
${flink.version}
这里主要添加的依赖是一个”计划器”(planner),它是 Table API 的核心组件,负责提供运行时环境,并生成程序的执行计划。这里我们用到的是新版的 blink planner。由于 Flink 安装包的 lib 目录下会自带 planner,所以在生产集群环境中提交的作业不需要打包这个依赖。而在 Table API 的内部实现上,部分相关的代码是用 Scala 实现的,所以还需要额外添加一个 Scala 版流处理的相关依赖。另外,如果想实现自定义的数据格式来做序列化,可以引入下面的依赖:
11.1.2 一个简单示例

package com.scy.chapter11;
import com.scy.chapter01.ClickSource;
import com.scy.chapter01.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import static org.apache.flink.table.api.Expressions.$;
public class SimpleTableExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//1.读取数据得到DataStream
SingleOutputStreamOperator eventStream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner() {
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}));
//2.创建一个表执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//3.将DataStream转换成Table
Table eventTable = tableEnv.fromDataStream(eventStream);
//4.直接写 SQL进行转换
Table resultTable1 = tableEnv.sqlQuery("select user, url, timestamp
from " + eventTable); //关键字加
号
//5.基于Table直接转换
Table resultTable2 = eventTable.select($("user"), $("url"))
.where($("user").isEqual("mary"));
tableEnv.toDataStream(resultTable1).print("result1");
tableEnv.toDataStream(resultTable2).print("result2");
env.execute();
}
}

11.2 基本 API
通过上节中的简单示例,我们已经对 Table API 和 SQL 的用法有了大致的了解;本节就继
续展开,对 API 的相关用法做一个详细的说明。
11.2.1 程序架构


11.2.2 创建表环境

package com.scy.chapter11;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class CommonApiTest {
public static void main(String[] args) {
//0.这里我们引入了一个"流式表环境"(StreamTableEnvironment),它是继承自TableEnvironment 的子接口。调用它的 create()方法,只需要直接将当前的流执行环境
//(StreamExecutionEnvironment)传入,就可以创建出对应的流式表环境了。这也正是我们在上一节简单示例中使用的方式。
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//env.setParallelism(1);
//StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//与1.0等效
//1.定义环境配置来创建表执行环境
//1.0基于blink版本planner进行流处理
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
//1.1基于老版本planner进行流处理 .useOldPlanner()
//1.2基于老版本planner进行批处理
//ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
//BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(batchEnv);
//1.3基于blink版本planner进行批处理
EnvironmentSettings settings3 = EnvironmentSettings.newInstance()
.inBatchMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv3 = TableEnvironment.create(settings3);
}
}
11.2.3 创建表

1. 连接器表(Connector Tables)


EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
//2.创建表
String createDDL = "CREATE TABLE clickTable (" +
" user_name STRING," +
" url STRING," +
" ts BIGINT " +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'input/clicks.txt'" +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createDDL);
//创建用于输出的表
String createOutDDL = "CREATE TABLE outTable (" +
" url STRING," +
" user_name STRING," +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'output'" +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createOutDDL);
tableEnv.useCatalog("custom_catalog");
tableEnv.useDatabase("custom_database");
2. 虚拟表(Virtual Tables)

11.2.4 表的查询
创建好了表,接下来自然就是对表进行查询转换了。对一个表的查询(Query)操作,就对应着流数据的转换(Transform)处理。
Flink 为我们提供了两种查询方式:SQL 和 Table API。
第 12 章 Flink CEP
Original: https://blog.51cto.com/u_12633461/5643880
Author: bigbangsheldon
Title: Flink1.13-java版教程(扩展)
原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/54296/
转载文章受原作者版权保护。转载请注明原作者出处!