FlinkSql之TableAPI详解

Flink 的 Table APISQL是流批统一的 API。 这意味着 Table API & SQL 在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。 因为传统的关系代数以及 SQL 最开始都是为了批式处理而设计的, 关系型查询在流式场景下不如在批式场景下容易理解.

动态表(Dynamic Tables)是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。

与表示批处理数据的静态表不同,动态表是 随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个 连续查询(Continuous Query)。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。

TableAPI

首先需要导入依赖

 <dependency>
     <groupId>org.apache.flinkgroupId>
     <artifactId>flink-table-planner-blink_${scala.binary.version}artifactId>
     <version>${flink.version}version>
     <scope>providedscope>
 dependency>
 <dependency>
     <groupId>org.apache.flinkgroupId>
     <artifactId>flink-streaming-scala_${scala.binary.version}artifactId>
     <version>${flink.version}version>
     <scope>providedscope>
 dependency>
 <dependency>
     <groupId>org.apache.flinkgroupId>
     <artifactId>flink-csvartifactId>
     <version>${flink.version}version>
 dependency>
 <dependency>
     <groupId>org.apache.flinkgroupId>
     <artifactId>flink-jsonartifactId>
     <version>${flink.version}version>
 dependency>
 
 

这里需要注意的问题:

1.TableAPI 中将动态表转换为流时有两种方法

 DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class);

toAppendStream方法只能在查询时使用,不能使用包含聚合函数等更新语句

 DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(select, Row.class);

toRetractStream则可以使用

2.上述两种方法内传入的参数Row.class,表示将表中查询出的数据封装为行类型,也就是对每行进行封装,解决查询出的数据列少于或者多于原表。如何能够确保所查询的数据与之前封装的Bean有完全一致的结构则也可以封装为原Bean.class

代码实现:

package net.cyan.FlinkSql; ​ import net.cyan.POJO.WaterSensor; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; ​ import static org.apache.flink.table.api.Expressions.$; ​

使用TableAPI读取文件时,我们首先需要知道去哪里读取也就是文件路径、读取文件的格式、读取出来的数据的结构也就是结果表的表结构及表名

package net.cyan.FlinkSql; ​ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ​ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.table.types.DataType; ​ import static org.apache.flink.table.api.Expressions.$; ​ public class Demo2_readWriteText {     public static void main(String[] args) {

基本流程

1>需要创建表的运行环境

2>创建查询出的数据写出结构

3> 创建kafka连接

4> 进行查询

5> 创建写入kafka连接

6> 写入

完整代码如下

package net.cyan.FlinkSql; ​ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; ​ public class Demo5_readWriteKafka {     public static void main(String[] args) {

Original: https://www.cnblogs.com/CYan521/p/16845742.html
Author: 再美不及姑娘你
Title: FlinkSql之TableAPI详解

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/713564/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 20212218林思凡实验三《Python程序设计》实验报告

    20212218 2021-2022-2 《Python程序设计》实验三报告 课程:《Python程序设计》 班级: 2122 姓名: 林思凡 学号:20212218 实验教师:王…

    技术杂谈 2023年7月24日
    071
  • 周热点回顾(6.13-6.19)

    热点随笔: · React Suspense 尝鲜,处理前后端IO异步操作 (葡萄城技术团队)· 2 万字 + 20张图| 细说 Redis 九种数据类型和应用场景 (小林codi…

    技术杂谈 2023年5月31日
    092
  • Open Physics

    1、开放物理计划。 开放物理计划,英文Open Physics。是AMD公司为自己的3A平台打造的物理模拟计算平台,以OpenCL为基础,由CPU+GPU联合计算完成。所谓&#82…

    技术杂谈 2023年5月31日
    0100
  • 关于维度信息维护和字典表的一些看法

    在不同的公司的不同项目场景下,绝大多数情况下都需要维护一些基本的维度信息(也称为字典信息,下面全部使用维度信息代替描述),比如旅游相关的网站,可能会维护: 货币类型:美元,人民币,…

    技术杂谈 2023年6月1日
    084
  • FileReader error: The object is already busy reading Blobs

    解决方案: reader读取时候,先判断一下readyState值,如果不是loading的话再执行读取。 readyState&#xA0; FileReader <…

    技术杂谈 2023年6月1日
    073
  • Visual Studio for Mac 中的默认键盘快捷键

    可以通过使用相应的键盘快捷键来访问 Visual Studio for Mac 中的各种命令。 本文档列出了 Visual Studio for Mac 键绑定方案的默认快捷键。 …

    技术杂谈 2023年5月31日
    085
  • Django中六个常用的自定义装饰器

    装饰器作用 decorator是当今最流行的设计模式之一,很多使用它的人并不知道它是一种设计模式。这种模式有什么特别之处? 有兴趣可以看看Python Wiki上例子,使用它可以很…

    技术杂谈 2023年7月10日
    068
  • 使用matlab进行机械学习(1)

    机械学习可粗略分为两大类:监督学习(数据有标签)和无监督学习(无标签)。监督学习包括线性回归,逻辑回归,神经网络,SVM等。无监督学习包括聚类算法以及降维算法等(提取目标特征)。这…

    技术杂谈 2023年7月11日
    050
  • Django的ModelAdmin自带

    需要自定义数据表中哪些字段可以显示,哪些字段可以编辑,并对数据表中的条目进行排序,同时定义过滤选项。Django的ModelAdmin自带的list_display, list_f…

    技术杂谈 2023年7月10日
    072
  • i++和++i

    ++ 是 自增运算符 不给变量赋值 最后 i 的值都是一样的 给变量赋值 i++先赋值 后自增 ++i 先自增 后赋值 不能理解请 一条++操作配合一条输出语句 其他6条注释掉 执…

    技术杂谈 2023年6月21日
    069
  • Podman基础用法

    Podman基础 1、什么是Podman? Podman是一种开源的Linux原生工具,旨在根据开放容器倡议(Open Container Initiative,OCI)标准开发、…

    技术杂谈 2023年6月21日
    0106
  • Javaweb学习-JSP

    ; ; 从JSP开始学习创建web项目 posted @2022-03-24 21:21 HelloHui 阅读(9 ) 评论() 编辑 Original: https://www…

    技术杂谈 2023年6月21日
    0131
  • idea 的Low Memory问题

    今天使用 idea 出现 Low MemoryThe IDE is running low on memory and this might affect performance….

    技术杂谈 2023年5月31日
    0126
  • 秒杀项目总结

    seckill-demo 秒杀练习项目:主要目的是通过本项目练习SpingBoot学习过程中所学知识,逐渐将学习到的技能融合至项目中. github地址: seckill-demo…

    技术杂谈 2023年7月25日
    088
  • Merge into的使用详解-你Merge了没有

    Merge是一个非常有用的功能,类似于Mysql里的insert into on duplicate key. Oracle在9i引入了merge命令,通过这个merge你能够在一…

    技术杂谈 2023年6月1日
    092
  • go源码解读

    https://www.cnblogs.com/ricklz/category/1217225.html?page=1 Original: https://www.cnblogs….

    技术杂谈 2023年5月30日
    088
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球