Flink Table Api & SQL 初体验,Blink的使用

Flink Table Api & SQL 初体验,Blink的使用

概述

  • Flink具有Table API和SQL-用于统一流和批处理。
  • Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接)的查询。
  • Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批处理输入(DataSet)还是流输入(DataStream),在两个接口中指定的查询都具有相同的语义并指定相同的结果。

Table API和SQL尚未完成所有功能,正在积极开发中,支持程度需查看官方文档

使用

多表连接案例

pom依赖

flink 版本为:1.9.3


            org.apache.flink
            flink-java
            ${flink.version}
            provided

            org.apache.flink
            flink-streaming-java_${scala.binary.version}
            ${flink.version}

            org.apache.flink
            flink-table-api-java-bridge_2.11
            ${flink.version}

            org.apache.flink
            flink-table-planner-blink_2.11
            ${flink.version}

            org.apache.flink
            flink-table-api-java
            ${flink.version}

模拟一个实时流

import lombok.Data;
@Data
public class Product {
    public Integer id;
    public String seasonType;
}

自定义Source

import common.Product;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Random;

public class ProductStremingSource implements SourceFunction {
    private boolean isRunning = true;

    @Override
    public void run(SourceContext ctx) throws Exception {
        while (isRunning){
            // 每一秒钟产生一条数据
            Product product = generateProduct();
            ctx.collect(product);
            Thread.sleep(1000);
        }
    }

    private Product generateProduct(){
        int i = new Random().nextInt(100);
        ArrayList list = new ArrayList();
        list.add("spring");
        list.add("summer");
        list.add("autumn");
        list.add("winter");
        Product product = new Product();
        product.setSeasonType(list.get(new Random().nextInt(4)));
        product.setId(i);
        return product;
    }
    @Override
    public void cancel() {

    }
}

主程序

public class TableStremingDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // 使用Blink
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        SingleOutputStreamOperator source = bsEnv.addSource(new MyStremingSource())
                .map(new MapFunction() {
                    @Override
                    public Item map(Item value) throws Exception {
                        return value;
                    }
                });
        // 分割流
        final OutputTag even = new OutputTag("even") {
        };
        final OutputTag old = new OutputTag("old") {
        };

        SingleOutputStreamOperator sideOutputData = source.process(new ProcessFunction() {
            @Override
            public void processElement(Item value, Context ctx, Collector out) throws Exception {
                if (value.getId() % 2 == 0) {
                    ctx.output(even,value);
                }else{
                    ctx.output(old,value);
                }
            }
        });

        DataStream evenStream = sideOutputData.getSideOutput(even);
        DataStream oldStream = sideOutputData.getSideOutput(old);
        // 注册两个 表 : evenTable,oddTable
        bsTableEnv.registerDataStream("evenTable",evenStream , "name,id");
        bsTableEnv.registerDataStream("oddTable", oldStream, "name,id");

        // 执行sql 输出Table
        Table queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");
        queryTable.printSchema();;
        // 获取流
        DataStream>> dataStream = bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint>(){}));
        dataStream.print();

        bsEnv.execute("demo");
    }
}

结果打印

Flink Table Api & SQL 初体验,Blink的使用
输出name相同的元素。

总结

简单的介绍了Flink Table Api & SQL和实现了两表连接的示例。

更多文章:www.ipooli.com

扫码关注公众号《ipoo》

Flink Table Api & SQL 初体验,Blink的使用

Original: https://www.cnblogs.com/ipoo/p/13168165.html
Author: ipoo
Title: Flink Table Api & SQL 初体验,Blink的使用

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/582206/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • Ubuntu常用命令

    Ubuntu(18.04)下更改用户名和主机名 更改主机名字: (1)修改hostname文件 这个文件中的内容是用来显示主机名的,修改这个文件后,立刻重启 (2)修改hosts文…

    Linux 2023年6月13日
    080
  • Canal实时解析mysql binlog数据实战

    一、说明 通过canal实时监听mysql binlog日志文件的变化,并将数据解析出来 二、环境准备 1、创建maven项目并修改pom.xml配置文件 com.alibaba….

    Linux 2023年6月13日
    070
  • 关于博客的解读

    写在前面 大家好,这里是满满! 最近也是想来写博客,我们学习过后的知识一旦过一段时间后,自然会有一些只是会遗忘,这时候大家都会去翻看以前的视频 ,笔记,遇到难一点的问题,可能就会去…

    Linux 2023年6月14日
    0107
  • 小记:音频格式转化ByPython(上)

    近日新买了个耳机,店家附送了一些周董的无损音乐资源,收到货后迫不及待的下载试听,才发现这些资源是wav格式的,导入播放器后歌名、作者、专辑等全是未知,当时想着是不是店家的资源有问题…

    Linux 2023年6月8日
    0103
  • OpenStack 发放云主机

    登陆网址 具体安装步骤欢迎参照我的博客:https://www.cnblogs.com/kongshuo/p/16618008.html用户名 admin 密码 redhat 创建…

    Linux 2023年6月8日
    0112
  • Linux at命令详解

    大家好,我是良许。 生活中,我们有太多场景需要使用到闹钟,比如早上 7 点起床,下午 4 点开会,晚上 8 点购物,等等。 在 Linux 系统里,我们同样也有类似的需求。比如我们…

    Linux 2023年6月14日
    067
  • Java 内功修炼 之 数据结构与算法(一)

    (1)双向链表通过上面单链表相关操作,可以知道 单链表的 查找方向唯一。而双向链表在 单链表的 基础上在 添加一个指针域(pre),这个指针域用来指向 当前节点的上一个节点,从而实…

    Linux 2023年6月11日
    0102
  • 基于CentOS系统安装OceanBase数据库

    一、OceanBase介绍 OceanBase是由蚂蚁集团完全自主研发的金融级分布式关系数据库,始创于2010年。OceanBase具有数据强一致、高可用、高性能、在线扩展、高度兼…

    Linux 2023年5月27日
    078
  • redisobject详解

    typedef struct redisObject { unsigned type:4; unsigned encoding:4; unsigned lru:REDIS_LRU_…

    Linux 2023年5月28日
    0100
  • Redis学习手册(Sorted-Sets数据类型)

    一、概述: Sorted-Sets和Sets类型极为相似,它们都是字符串的集合,都不允许重复的成员出现在一个Set中。它们之间的主要差别是Sorted-Sets中的每一个成员都会有…

    Linux 2023年5月28日
    076
  • CTF中的一些图形密码

    1.传统猪圈密码 猪圈密码又称为亦称朱高密码、共济会暗号、共济会密码或共济会员密码;是一种以特定符号来替换字母的加密方式在线解密网址:http://moersima.00cha.n…

    Linux 2023年6月6日
    0114
  • NRF24L01双向无线通信

    最近闲来无事,利用手头资源研究了一下基于nrf24L01的双向通信实验,整个系统如下图所示。原理:nrf24L01本身是一种单向通信的无线模块,但是,当nrf24L01工作在增强型…

    Linux 2023年6月14日
    085
  • 20191223 实验一 密码引擎

    任务一 OpenEuler系统安装 1.登录自己的华为云账号,参考附件图示,构建基于鲲鹏和OpenEuler的ECS。或者通过使用树莓派安装OpenEuler,或者自己通过虚拟机安…

    Linux 2023年6月8日
    095
  • 基于libevent的http服务器实现

    基于libevent的http服务器实现 //libevent的http服务器简单实现方式 #include #include #include #include //for st…

    Linux 2023年6月14日
    0103
  • 白话TCP/IP原理

    TCP/IP(Transmission-Control-Protocol/Internet-Protocol),中文译名为传输控制协议/因特网互联协议,又名网络通讯协议,是Inte…

    Linux 2023年6月14日
    070
  • Linux巡检脚本

    #!/bin/bash sys:centos6.x/7.x [ $(id -u) -ne 0 ] && echo "请&#x…

    Linux 2023年6月6日
    093
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球