Flink Table Api & SQL 初体验,Blink的使用

Flink Table Api & SQL 初体验,Blink的使用

概述

  • Flink具有Table API和SQL-用于统一流和批处理。
  • Table API是用于Scala和Java的语言集成查询API,它允许以非常直观的方式组合来自关系运算符(例如选择,过滤和联接)的查询。
  • Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批处理输入(DataSet)还是流输入(DataStream),在两个接口中指定的查询都具有相同的语义并指定相同的结果。

Table API和SQL尚未完成所有功能,正在积极开发中,支持程度需查看官方文档

使用

多表连接案例

pom依赖

flink 版本为:1.9.3


            org.apache.flink
            flink-java
            ${flink.version}
            provided

            org.apache.flink
            flink-streaming-java_${scala.binary.version}
            ${flink.version}

            org.apache.flink
            flink-table-api-java-bridge_2.11
            ${flink.version}

            org.apache.flink
            flink-table-planner-blink_2.11
            ${flink.version}

            org.apache.flink
            flink-table-api-java
            ${flink.version}

模拟一个实时流

import lombok.Data;
@Data
public class Product {
    public Integer id;
    public String seasonType;
}

自定义Source

import common.Product;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.ArrayList;
import java.util.Random;

public class ProductStremingSource implements SourceFunction {
    private boolean isRunning = true;

    @Override
    public void run(SourceContext ctx) throws Exception {
        while (isRunning){
            // 每一秒钟产生一条数据
            Product product = generateProduct();
            ctx.collect(product);
            Thread.sleep(1000);
        }
    }

    private Product generateProduct(){
        int i = new Random().nextInt(100);
        ArrayList list = new ArrayList();
        list.add("spring");
        list.add("summer");
        list.add("autumn");
        list.add("winter");
        Product product = new Product();
        product.setSeasonType(list.get(new Random().nextInt(4)));
        product.setId(i);
        return product;
    }
    @Override
    public void cancel() {

    }
}

主程序

public class TableStremingDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // 使用Blink
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

        SingleOutputStreamOperator source = bsEnv.addSource(new MyStremingSource())
                .map(new MapFunction() {
                    @Override
                    public Item map(Item value) throws Exception {
                        return value;
                    }
                });
        // 分割流
        final OutputTag even = new OutputTag("even") {
        };
        final OutputTag old = new OutputTag("old") {
        };

        SingleOutputStreamOperator sideOutputData = source.process(new ProcessFunction() {
            @Override
            public void processElement(Item value, Context ctx, Collector out) throws Exception {
                if (value.getId() % 2 == 0) {
                    ctx.output(even,value);
                }else{
                    ctx.output(old,value);
                }
            }
        });

        DataStream evenStream = sideOutputData.getSideOutput(even);
        DataStream oldStream = sideOutputData.getSideOutput(old);
        // 注册两个 表 : evenTable,oddTable
        bsTableEnv.registerDataStream("evenTable",evenStream , "name,id");
        bsTableEnv.registerDataStream("oddTable", oldStream, "name,id");

        // 执行sql 输出Table
        Table queryTable = bsTableEnv.sqlQuery("select a.id,a.name,b.id,b.name from evenTable as a join oddTable as b on a.name = b.name");
        queryTable.printSchema();;
        // 获取流
        DataStream>> dataStream = bsTableEnv.toRetractStream(queryTable, TypeInformation.of(new TypeHint>(){}));
        dataStream.print();

        bsEnv.execute("demo");
    }
}

结果打印

Flink Table Api & SQL 初体验,Blink的使用
输出name相同的元素。

总结

简单的介绍了Flink Table Api & SQL和实现了两表连接的示例。

更多文章:www.ipooli.com

扫码关注公众号《ipoo》

Flink Table Api & SQL 初体验,Blink的使用

Original: https://www.cnblogs.com/ipoo/p/13168165.html
Author: ipoo
Title: Flink Table Api & SQL 初体验,Blink的使用

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/582206/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • 字符编码笔记:ASCII,Unicode 和 UTF-8

    转载自:阮一峰 链接:http://www.ruanyifeng.com/blog/2007/10/ascii_unicode_and_utf-8.html 今天中午,我突然想搞清…

    Linux 2023年6月7日
    095
  • phpcms如何在前台文章列表显示来源

    phpcms的文章来源分为两种,一种是在后台来源中添加完成的,这种”来源”的相关数据存放于数据库的copyfrom表中,通过id和news_data表相关联…

    Linux 2023年6月13日
    088
  • WPF 解决 ObservableCollection 提示 Cannot change ObservableCollection during a CollectionChanged event 异常

    本文告诉大家在使用 ObservableCollection 时,抛出 InvalidOperationException 异常,提示 Cannot change Observab…

    Linux 2023年6月6日
    087
  • 6.20(HTML和CSS–>练习案例)

    HTML脑图:how2j找的阶段性练习,话说VScode编辑器确实比DW好用,简洁免费(不是打广告哈哈) #0 <head> <meta charset=&quo…

    Linux 2023年6月7日
    0118
  • VSCode实现GDB图形界面远程调试

    如何利用VSCode实现GDB图形界面远程调试 前言 在习惯了集成开发环境的图形界面调试时,首次使用GDB远程调试必定很不习惯,下面讲述如何利用VSCode实现GDB图形界面远程调…

    Linux 2023年6月7日
    0119
  • RAID磁盘阵列

    RAID磁盘阵列 *本章重点:了解各RAID级别的原理优缺点及常用级别实现,企业中厂商大多提供了硬RAID方案。 1、什么是RAID? “RAID”一词是由…

    Linux 2023年6月7日
    095
  • Linux动静分离与Rewrite

    一、动静分离 1.1 单台机器动静分离 1、创建NFS挂载点(NFS服务端) mkdir /static vim /etc/exports /static 172.16.1.0/2…

    Linux 2023年5月27日
    0109
  • mysql二进制安装脚本部署

    mysql二进制安装脚本部署 mysql二进制安装脚本部署 单实例 使用函数的单实例 使用函数的单实例或者多实例 单实例 [root@localhost ~]# mkdir mys…

    Linux 2023年6月6日
    0120
  • redis 使用lua 生成流水号

    在实际的业务场景中,我们会用到流水号。之前的流水号做法是,使用redis的全局锁。然后对数据库进行更新,数据库更新 这个也会有一些问题,比如对于同一个流水号,多个线程去更新,由于事…

    Linux 2023年5月28日
    097
  • linux定时删除N天前的旧文件

    语句写法: find 对应目录 -mtime +天数 -name “文件名” -exec rm -rf {} \; 例1:find /usr/local/b…

    Linux 2023年6月13日
    0108
  • 标签泄露 Label leaking

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    Linux 2023年6月7日
    096
  • django.template.exceptions.TemplateDoesNotExist: django_filters/rest_framework/form.html

    django.template.exceptions.TemplateDoesNotExist: django_filters/rest_framework/form.htmlER…

    Linux 2023年6月14日
    0157
  • QT编程中的char*,wchar_t*与QString之间的转换

    //QString to wchar_t:const wchar_t * encodedName = reinterpret_cast Original: https://www….

    Linux 2023年6月14日
    089
  • 设计模式——面向对象设计原则

    面向对象设计原则 都是为了高内聚低耦合原则。编程时基本都要遵守 分类原则:一种人只干一种事。 举例:(比较简单就不代码了) 人可以干的事情有很多:敲代码、唱歌、跳舞、打篮球&#82…

    Linux 2023年6月7日
    0161
  • 2022年5月16号开始整理habse

    关于本次整理的hbase内容是基于原理的学习的笔记 Original: https://www.cnblogs.com/yxb123/p/16277454.htmlAuthor: …

    Linux 2023年6月7日
    0122
  • 线程

    一、线程概念的引入 进程 之前我们已经了解了操作系统中进程的概念,程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区…

    Linux 2023年6月14日
    097
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球