每天5分钟Flink -WordCount及Flink SQL

hello,大家好,我是Johngo呀!

今天是 Flink 系列的第二篇。

目标:通过每天一小会儿,熟悉 Flink 的方方面面,为后面算法实现提供工具基础。

第二篇:《每天5分钟Flink -WordCount及Flink SQL》

JDK:1.8

Flink:1.13.2

Scala:2.11.12

创建Flink 工程网上已经很多说明方法了,这里先不赘述,以下全部的代码使用 IDEA 进行编码。

本文讲解的 WordCount 程序是大数据的入门程序。

WordCount 程序是在不同上下文环境下实现的,是一个入门版本,可以跟着一步一步实现起来。包括 Streaming 和 Batch 以及 SQL 的简单案例。

上述所有的 Flink 语义都会在后面分篇章详细赘述。

基础配置

首先pom.xml 中要配置的依赖是:

provided 选项在这表示此依赖只在代码编译的时候使用,运行和打包的时候不使用。

java依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  <!-- <scope>provided</scope> -->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  <!-- <scope>provided</scope>-->
</dependency>

scala依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  <!-- <scope>provided</scope> -->
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  <!-- <scope>provided</scope>-->
</dependency>

另外,pom文件中镜像文件建议配置阿里云的maven仓库,国内下载速度会快,如果找不到对应的镜像文件,需要切换到国外仓库。

<repositories>
    <repository>
        <id>central</id>
        <name>aliyun maven</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        <layout>default</layout>
        <!-- 是否开启发布版构件下载 -->
        <releases>
            <enabled>true</enabled>
        </releases>
        <!-- 是否开启快照版构件下载 -->
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
    </repository>
</repositories>

大致先看下我现在的项目结构以及今天要实现的几个类的位置.

每天5分钟Flink -WordCount及Flink SQL

语言界的 hello word,大数据界的 WordCount,都是一个入门Demo。

今天咱们也按照这个入门的 Demo,把 Flink 相关代码捋顺。

包括 Streaming、Batch 以及 Flink Sql 三方面分别来实现。

Streaming WordCount

先来分析一个 Streaming WordCount。

为了模仿流式计算,咱们在本地利用 netcat命令 nc -l {port}来进行模仿数据产出。

同时,咱们实现的功能是:每隔 1s 计算过去 2s 内产出数据各单词的个数,也就是实现每隔1s计算过去 2s 的 WordCount 程序。

将窗口内接收到的数据进行拆分致每一行,然后分别赋值为1,之后进行分组求和。

每天5分钟Flink -WordCount及Flink SQL

大致处理的流程如上所示,现在来一步一步实现这个案例。

先开始创建 Flink 的运行环境:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

然后指定了数据 Source 源,以及 Source 源的一些配置:

String hostname = "127.0.0.1";
int port = 8888;
String delimiter = "\n";
// 链接 socket 获取数据
DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

之后就进行了数据的平铺,分组,窗口计算等操作。

另外,程序中实现了一个内部类WordWithCount,用来表示单词的 key 和 count。

利用 keyBy()函数对 key进行分组。

window函数表示每一个滑动窗口,SlidingProcessingTimeWindows实现每隔 1s 对过去 2s 进行计数。

下面整体看下代码:

public class SocketWindowWordCountJava {
    public static void main(String[] args) throws Exception {

        // 获取 Flink 运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String hostname = "127.0.0.1";
        int port = 8888;
        String delimiter = "\n";
        // 链接 socket 获取数据
        DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);

        // Flink 数据处理
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {

            @Override
            public void flatMap(String s, Collector<WordWithCount> out) throws Exception {
                String[] splits = s.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        }).keyBy(value -> value.word)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(1)))
                .sum("count");
        windowCounts.print().setParallelism(1);
        env.execute("Socket Streaming wordCount");
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public  WordWithCount() {}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

现在把程序执行起来,先在本地起一个netcat程序,然后启动Flink程序:

$ nc -lk 8888
flink java
apple banana
aaa
ccc

之后,控制台进行了相应的打印:

每天5分钟Flink -WordCount及Flink SQL

从最后的字符串ccc来看,很显然第一次产出ccc的时候,进行了计算,count=1。在后 1s 的时候进行了前 2s 的计算。因此,依然打印了 ccc

用 java 实现完,接下来用 scala 也实现一下相同的逻辑,有兴趣的同学可作参考:

object SocketWindowWordCountScala {
  def main(args: Array[String]): Unit = {
    //获取运行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // 从socket获取数据
    val text = env.socketTextStream("127.0.0.1", 8888, delimiter = '\n')

    // 解析数据
    //注意:必须要添加这一行隐式转行,否则下面的flatmap方法执行会报错
    import org.apache.flink.api.scala._
    val windowCount = text.flatMap(line => line.split("\\s"))
      .map(w => WordWithCount(w, 1L))
      .keyBy(_.word)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(2), Time.seconds(1)))
      .sum("count")

    // 打印到控制台
    windowCount.print().setParallelism(1)

    env.execute("Scala Window Count")

  }
  case class WordWithCount(word: String, count: Long)
}

依然是启动 flink 程序和 nc:

nc -lk 8888
flink java
apple banana
aaa
ccc

再看控制台的打印结果,是和咱们想实现的一致:

每天5分钟Flink -WordCount及Flink SQL

注意:窗口的使用方式在新版本中有较大的区别,这个咱们在后面会详细把这部分进行讲解。

Batch WordCount

批处理程序,这里用一个文本来作为数据源。

将文本中的数据进行拆分致每一行,然后分别赋值为1,之后进行分组求和。

每天5分钟Flink -WordCount及Flink SQL

处理逻辑依然如图所示,然后下面咱们也创建一个文本如图里的内容(src/main/datas/dm.csv):

Java,Fink
Scala 
Streaming
Flink,Java 
Scala
Batch,Scala

首先创建 Flink 运行环境

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

之后进行读取文件

DataSource text = env.readTextFile(filePath);

然后通过实现 FlatMapFunction 接口进行数据的打平操作(上面类 Tokenizer 的实现)。

最后进行分组求和,Batch WordCount 全部完成!

下面看 Batch 整体代码:

public class WordCountJava {
    public static void main(String[] args) throws Exception {
        String filePath = "src/main/datas/dm.csv";
        String resultPath = "src/main/datas/wc_rst.csv";
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource text = env.readTextFile(filePath);
        DataSet<Tuple2<String, Integer>> result = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
        result.print();
        result.writeAsCsv(resultPath, "\n", ",");
        env.execute("batch Word Count");
    }

    private static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String text, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = text.toLowerCase().split(",|\\n");
            for (String token: tokens) {
                if (token.length()>0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}

程序中,通过读取src/main/datas/dm.csv中的数据,最后计算结果打印到控制台以及存储结果数据到src/main/datas/wc_rst.csv

执行起来,看打印结果:

每天5分钟Flink -WordCount及Flink SQL

求得给定文件的 WordCount 的结果。

下面用 Scala 实现一次:

object WordCountScala {
  def main(args: Array[String]): Unit = {
    val filePath = "src/main/datas/dm.csv"

    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.readTextFile(filePath)

    //引入隐式转换
    import org.apache.flink.api.scala._
    val result = text.flatMap(_.split(",|\\n"))
      .map((_, 1))
      .groupBy(0)
      .sum(1)
    result.print()
    env.execute("Word Count Scala")
  }
}

用 Scala 实现起来就很简单了。

Flink SQL WordCount

尤其是有过 MapReduce 和 Hive 经历的朋友,就可以和它们放在一起做比较,一个复杂,一个简单。

比如说下面的 SQL 语句,就一句就可以省去上面那么多的代码工作量。

SELECT word, COUNT(*) FROM table GROUP BY word;

下面利用 FlinkSQL 实现 WordCount 功能。

首先,pom 文件必须要添加的依赖:

<!-- use the Table API & SQL for defining pipelines.-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- run the Table API & SQL programs locally within your IDE,-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope>-->
</dependency>

<!-- SQL Client-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope>-->
</dependency>

先用 Java 来实现 FlinkSQL,将 “Alice Bob John Alice Bob Alice” 进行单词计数。

a. 首先创建 Flink 的运行环境以及 SQL api 环境:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

b. 创建一个字符串流,并且转为 (word, 1L) 的格式:

String names = "Alice Bob John Alice Bob Alice";
ArrayList<WordWithCount> name = new ArrayList<>();
String[] splits = names.split("\\s");
for(String split: splits) {
    name.add(new WordWithCount(split, 1L));
}

c. 注册成表,指定字段名,转为视图&查询

Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "cnt");
inputTable.printSchema();

// 注册表为视图 & 查询
tableEnv.createTemporaryView("InputTable", inputTable);
Table resultTable = tableEnv.sqlQuery("SELECT name, SUM(cnt) FROM InputTable GROUP BY name");

d. 转为 Stream 并且打印出来

DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
resultStream.print();

下面看整体代码:

public class WordCountWithSQLJava {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建一个数据集合
        String names = "Alice Bob John Alice Bob Alice";
        ArrayList<WordWithCount> name = new ArrayList<>();
        String[] splits = names.split("\\s");
        for(String split: splits) {
            name.add(new WordWithCount(split, 1L));
        }

        DataStreamSource<WordWithCount> dataStream = env.fromCollection(name);
        //DataSet 转sql, 指定字段名
        Table inputTable = tableEnv.fromDataStream(dataStream).as("name", "cnt");
        inputTable.printSchema();

        // 注册表为视图 & 查询
        tableEnv.createTemporaryView("InputTable", inputTable);
        Table resultTable = tableEnv.sqlQuery("SELECT name, SUM(cnt) FROM InputTable GROUP BY name");

        // 将结果数据转换为DataStream
        DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);

        resultStream.print();
        env.execute();
    }

    public static class WordWithCount {
        public String word;
        public long count;

        public  WordWithCount() {}
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

整体代码执行结果:

每天5分钟Flink -WordCount及Flink SQL

Java 实现后,下面再用 Scala 来实现一次,代码逻辑一致,可以参考:

object WordCountWithSQLScala {
  def main(args: Array[String]): Unit = {
    // 创建运行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // 创建一个数据集合
    val names = "Alice Bob John Alice Bob Alice"
    import org.apache.flink.api.scala._
    val wc_names = names
      .split("\\s")
      .map(w => WordWithCount(w, 1L))
    val dataStream = env.fromCollection(wc_names)

    // 转换为一个表(table)
    val inputTable = tableEnv.fromDataStream(dataStream).as("name", "cnt")
    inputTable.printSchema()

    // 将数据流转换为一个视图 & 查询
    tableEnv.createTemporaryView("InputTable", inputTable)
    val resultTable = tableEnv.sqlQuery("SELECT name, SUM(cnt) FROM InputTable GROUP BY name")

    // 将结果转为流打印
    val resultStream = tableEnv.toChangelogStream(resultTable)
    resultStream.print().setParallelism(1)

    val jobName = WordCountWithSQLScala.getClass.getSimpleName
    env.execute(jobName)

  }
  case class WordWithCount(word: String, count: Long)
}

代码执行的结果也一致:

每天5分钟Flink -WordCount及Flink SQL

总结

今天实现了大数据的经典案例 WordCount,然后在不同场景下的实现。包括 Streaming 和 Batch,以及 Flink SQL 的实现。

该篇文章还只是一个入门级的程序,后面将会各重要点进行详细阐述。

发布者:Johngo学长。文章已受到原创版权保护。
转载请注明出处:https://www.johngo689.com/2155/

(2)
上一篇 2021年9月17日 下午1:40
下一篇 2021年9月20日 下午1:15

相关推荐

发表评论

登录后才能评论

评论列表(1条)

免费咨询
免费咨询
扫码关注
扫码关注
联系站长

站长Johngo!

大数据和算法重度研究者!

持续产出大数据、算法、LeetCode干货,以及业界好资源!

2022012703491714

微信来撩,免费咨询:xiaozhu_tec

分享本页
返回顶部