Flink Table Api & SQL 初体验,Blink的使用

Flink Table Api & SQL 初体验,Blink的使用

概述

  • Flink具有Table API和SQL-用于统一流和批处理。
  • Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接)的查询。
  • Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批处理输入(DataSet)还是流输入(DataStream),在两个接口中指定的查询都具有相同的语义并指定相同的结果。

Table API和SQL尚未完成所有功能,正在积极开发中,支持程度需查看官方文档

使用

多表连接案例

pom依赖

flink 版本为:1.9.3


            org.apache.flink
            flink-java
            ${flink.version}
            provided

            org.apache.flink
            flink-streaming-java_${scala.binary.version}
            ${flink.version}

            org.apache.flink
            flink-table-api-java-bridge_2.11
            ${flink.version}

            org.apache.flink
            flink-table-planner-blink_2.11
            ${flink.version}

            org.apache.flink
            flink-table-api-java
            ${flink.version}

模拟一个实时流

import lombok.Data;
@Data
public class Product {
    public Integer id;
    public String seasonType;
}

自定义Source

import common.Product;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Random;

public class ProductStremingSource implements SourceFunction {
    private boolean isRunning = true;

    @Override
    public void run(SourceContext ctx) throws Exception {
        while (isRunning){
            // 每一秒钟产生一条数据
            Product product = generateProduct();
            ctx.collect(product);
            Thread.sleep(1000);
        }
    }

    private Product generateProduct(){
        int i = new Random().nextInt(100);
        ArrayList list = new ArrayList();
        list.add("spring");
        list.add("summer");
        list.add("autumn");
        list.add("winter");
        Product product = new Product();
        product.setSeasonType(list.get(new Random().nextInt(4)));
        product.setId(i);
        return product;
    }
    @Override
    public void cancel() {

    }
}

主程序

public class TableStremingDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // 使用Blink
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        SingleOutputStreamOperator source = bsEnv.addSource(new MyStremingSource())
                .map(new MapFunction() {
                    @Override
                    public Item map(Item value) throws Exception {
                        return value;
                    }
                });
        // 分割流
        final OutputTag even = new OutputTag("even") {
        };
        final OutputTag old = new OutputTag("old") {
        };

        SingleOutputStreamOperator sideOutputData = source.process(new ProcessFunction() {
            @Override
            public void processElement(Item value, Context ctx, Collector out) throws Exception {
                if (value.getId() % 2 == 0) {
                    ctx.output(even,value);
                }else{
                    ctx.output(old,value);
                }
            }
        });

        DataStream evenStream = sideOutputData.getSideOutput(even);
        DataStream oldStream = sideOutputData.getSideOutput(old);
        // 注册两个 表 : evenTable,oddTable
        bsTableEnv.registerDataStream("evenTable",evenStream , "name,id");
        bsTableEnv.registerDataStream("oddTable", oldStream, "name,id");

        // 执行sql 输出Table
        Table queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");
        queryTable.printSchema();;
        // 获取流
        DataStream>> dataStream = bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint>(){}));
        dataStream.print();

        bsEnv.execute("demo");
    }
}

结果打印

Flink Table Api & SQL 初体验,Blink的使用
输出name相同的元素。

总结

简单的介绍了Flink Table Api & SQL和实现了两表连接的示例。

更多文章:www.ipooli.com

扫码关注公众号《ipoo》

Flink Table Api & SQL 初体验,Blink的使用

Original: https://www.cnblogs.com/ipoo/p/13168165.html
Author: ipoo
Title: Flink Table Api & SQL 初体验,Blink的使用

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/582206/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Java List和Map遍历的方法,forEach()的使用

    注意: 不要在foreach循环里进行元素的remove/add操作。remove元素请使用Iterator方式,如果并发操作,需要对Iterator对象加锁。 Java 8之前 …

    Linux 2023年6月7日
    0110
  • Django_渲染详解

    Django_render 模板语法 模板引擎是一种可以让开发者把服务端数据填充到html网页中完成渲染效果的技术。它实现了把前端代码和服务端代码分离的作用,让项目中的业务逻辑代码…

    Linux 2023年6月7日
    0138
  • BootstrapTreeView 实现懒加载和点击事件。

    BootstrapTreeView的js下载位置:https://github.com/patternfly/patternfly-bootstrap-treeview。(注意不是…

    Linux 2023年6月7日
    0113
  • 详细记录一次stampstime字段引起pxc集群脑裂

    事故回顾 运维执行导入sql,导入后收到master2和master3节点宕机的报警;检查集群状态发现master1进入初始化模式,无法读写;master2和master3已经下线…

    Linux 2023年6月14日
    086
  • ArchLinux安装-2022-01-12

    这篇教程,是我基于B站up住theCW的视频教程整理的,其中添加了一些我在安装n次之后的经验(虽然失败过几次,但我现在安装不会再出差错,所以请放心的看此教程) 当然,我认为theC…

    Linux 2023年6月13日
    0103
  • grafana+prometheus如何查看tcp连接数量

    最后解决方案 经过和负责监控的大佬了解,获得了一个可行的方案:在每个pod中新增一个sidecar容器,在容器中部署node_exporter,或者在容器中放个自动查看端口连接数并…

    Linux 2023年6月13日
    0110
  • window.parent、window.top、window.self

    在应用有frameset或者iframe的页面时,parent是父窗口,top是最顶级父窗口(有的窗口中套了好几层frameset或者iframe),self是当前窗口。 1.wi…

    Linux 2023年6月7日
    085
  • CentOS/Redflag 7.3安装qemu 5.0记录

    安装实际上相当简单,只需下载源代码并编译即可。 [En] Installation is actually quite simple, just download the sour…

    Linux 2023年5月27日
    0114
  • 【4】2022年6月

    6月15日 今早10点教师资格面试成绩出来了,幸好一次性通过。笔面都一次性通过,比我预想的顺利,谢谢各位考官的照顾。 我知道自己面试的时候既感冒又受伤,状态是不好的,发挥不了真实的…

    Linux 2023年6月13日
    081
  • Docker安装使用及私有仓库搭建

    1 概念 1.1 基本概念 Docker daemon​ 守护进程,运行在宿主机上,用户通过DockerClient客户端Docker命令与Docker daemon交互。Dock…

    Linux 2023年5月27日
    098
  • Django补充

    django配置文件相关操作 django实际上有两个配置文件 一个是提供给用户可以自定义的基本配置 from 项目名 import settings 一个是全局的系统默认的配置 …

    Linux 2023年6月7日
    0111
  • IIC挂死问题解决过程

    猜测1:认为IIC device程序有问题 检查1:查看程序发现有可能溢出的部分,使用IIC 工具刷过量数据到slave,未出问题。 猜测2:认为IIC device寄存器进入异常…

    Linux 2023年6月6日
    0123
  • Mysql数据库服务端的安装

    一般提到Mysql数据库的安装在工作当中是说的安装数据库管理软件的服务端,服务端的安装可以安装在Windows环境,也可以安装在Linux环境。 Windows环境安装:目前安装比…

    Linux 2023年6月14日
    081
  • 任意精度计算器 bc (arbitrary precision calculator)

    2019/06/18 bc 学习之 https://www.runoob.com/linux/linux-comm-bc.html Original: https://www.cn…

    Linux 2023年6月6日
    0131
  • docker安装redis

    首先考虑需要安装的redis版本,我这里是安装的redis 6.0.16,如果宿主机没有,那么就docker pull redis:6.0.16 一、指定redis配置文件 我的宿…

    Linux 2023年5月28日
    0103
  • 在Linux下的文件IO操作

    系统调用 为什么用户程序不能直接访问系统内核提供的服务,为了更好地保护内核空间,程序的运行空间被划分为内核空间和用户空间(俗称内核状态和用户模式),它们在不同的级别上逻辑上是相互隔…

    Linux 2023年5月27日
    091
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球