Apache Beam WordCount编程实战及源码解读

概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可以运行大数据处理引擎上。完整项目Github源码

Apache Beam WordCount编程实战及源码解读

负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理,Apache Beam,一处编程,处处运行,故将折腾成果分享出来。

1.Apache Beam编程实战–前言,Apache Beam的特点与关键概念。

Apache Beam 于2017年1月10日成为Apache新的顶级项目。

1.1.Apache Beam 特点:

  • 统一:对于批处理和流媒体用例使用单个编程模型。
  • 方便:支持多个pipelines环境运行,包括:Apache Apex, Apache Flink, Apache Spark, 和 Google Cloud Dataflow。
  • 可扩展:编写和分享新的SDKs,IO连接器和transformation库
    部分翻译摘自官网:Apacher Beam 官网

1.2.Apache Beam关键概念:

1.2.1.Apache Beam SDKs

主要是开发API,为批处理和流处理提供统一的编程模型。目前(2017)支持JAVA语言,而Python正在紧张开发中。

1.2.3. 他们的对如下的支持情况详见 ;

Apache Beam WordCount编程实战及源码解读

2.Apache Beam编程实战–Apache Beam源码解读

基于maven,intellij IDEA,pom.xm查看 完整项目Github源码 。直接通过IDEA的项目导入功能即可导入完整项目,等待MAVEN下载依赖包,然后按照如下解读步骤即可顺利运行。

2.1.源码解析-Apache Beam 数据流处理原理解析:

关键步骤:

  • 创建Pipeline
  • 将转换应用于Pipeline
  • 读取输入文件
  • 应用ParDo转换
  • 应用SDK提供的转换(例如:Count)
  • 写出输出
  • 运行Pipeline

Apache Beam WordCount编程实战及源码解读
<span class="hljs-javadoc">/**
 * MIT.

 * Author: wangxiaolei(&#x738B;&#x5C0F;&#x96F7;).

 * Date:17-2-20.

 * Project:ApacheBeamWordCount.

 */</span>

<span class="hljs-keyword">import</span> org.apache.beam.sdk.Pipeline;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.io.TextIO;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.options.Default;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.options.Description;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.options.PipelineOptions;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.options.PipelineOptionsFactory;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.options.Validation.Required;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.transforms.Aggregator;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.transforms.Count;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.transforms.DoFn;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.transforms.MapElements;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.transforms.PTransform;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.transforms.ParDo;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.transforms.SimpleFunction;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.transforms.Sum;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.values.KV;
<span class="hljs-keyword">import</span> org.apache.beam.sdk.values.PCollection;

public <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">WordCount</span> {</span>

    <span class="hljs-javadoc">/**
     *1.a.&#x901A;&#x8FC7;Dofn&#x7F16;&#x7A0B;Pipeline&#x4F7F;&#x5F97;&#x4EE3;&#x7801;&#x5F88;&#x7B80;&#x6D01;&#x3002;b.&#x5BF9;&#x8F93;&#x5165;&#x7684;&#x6587;&#x672C;&#x505A;&#x5355;&#x8BCD;&#x5212;&#x5206;&#xFF0C;&#x8F93;&#x51FA;&#x3002;
     */</span>
    static <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">ExtractWordsFn</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">DoFn</span><<span class="hljs-title">String</span>, <span class="hljs-title">String</span>> {</span>
        <span class="hljs-keyword">private</span> <span class="hljs-keyword">final</span> Aggregator<long, long> emptyLines =
                createAggregator(<span class="hljs-string">"emptyLines"</span>, Sum.ofLongs());

        <span class="hljs-annotation">@ProcessElement</span>
        public void processElement(ProcessContext c) {
            <span class="hljs-keyword">if</span> (c.element().trim().isEmpty()) {
                emptyLines.addValue(<span class="hljs-number">1</span>L);
            }

            String[] words = c.element().split(<span class="hljs-string">"[^a-zA-Z']+"</span>);

            <span class="hljs-keyword">for</span> (String word : words) {
                <span class="hljs-keyword">if</span> (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }

    <span class="hljs-javadoc">/**
     *2.&#x683C;&#x5F0F;&#x5316;&#x8F93;&#x5165;&#x7684;&#x6587;&#x672C;&#x6570;&#x636E;&#xFF0C;&#x5C06;&#x8F6C;&#x6362;&#x5355;&#x8BCD;&#x4E3A;&#x5E76;&#x8BA1;&#x6570;&#x7684;&#x6253;&#x5370;&#x5B57;&#x7B26;&#x4E32;&#x3002;
     */</span>
    public static <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">FormatAsTextFn</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">SimpleFunction</span><<span class="hljs-title">KV</span><<span class="hljs-title">String</span>, <span class="hljs-title">Long</span>>, <span class="hljs-title">String</span>> {</span>
        <span class="hljs-annotation">@Override</span>
        public String apply(KV<string, long> input) {
            <span class="hljs-keyword">return</span> input.getKey() + <span class="hljs-string">": "</span> + input.getValue();
        }
    }
    <span class="hljs-javadoc">/**
     *3.&#x5355;&#x8BCD;&#x8BA1;&#x6570;&#xFF0C;PTransform(PCollection Transform)&#x5C06;PCollection&#x7684;&#x6587;&#x672C;&#x884C;&#x8F6C;&#x6362;&#x6210;&#x683C;&#x5F0F;&#x5316;&#x7684;&#x53EF;&#x8BA1;&#x6570;&#x5355;&#x8BCD;&#x3002;
     */</span>
    public static <span class="hljs-class"><span class="hljs-keyword">class</span> <span class="hljs-title">CountWords</span> <span class="hljs-keyword">extends</span> <span class="hljs-title">PTransform</span><<span class="hljs-title">PCollection</span><<span class="hljs-title">String</span>>,</span>
            PCollection<kv<string, long>>> {
        <span class="hljs-annotation">@Override</span>
        public PCollection<kv<string, long>> expand(PCollection<string> lines) {

            PCollection<string> words = lines.apply(
                    ParDo.of(<span class="hljs-keyword">new</span> ExtractWordsFn()));

            PCollection<kv<string, long>> wordCounts =
                    words.apply(Count.<string>perElement());

            <span class="hljs-keyword">return</span> wordCounts;
        }
    }

    <span class="hljs-javadoc">/**
     *4.&#x53EF;&#x4EE5;&#x81EA;&#x5B9A;&#x4E49;&#x4E00;&#x4E9B;&#x9009;&#x9879;&#xFF08;Options&#xFF09;&#xFF0C;&#x6BD4;&#x5982;&#x6587;&#x4EF6;&#x8F93;&#x5165;&#x8F93;&#x51FA;&#x8DEF;&#x5F84;
     */</span>
    public interface WordCountOptions <span class="hljs-keyword">extends</span> PipelineOptions {

        <span class="hljs-javadoc">/**
         * &#x6587;&#x4EF6;&#x8F93;&#x5165;&#x9009;&#x9879;&#xFF0C;&#x53EF;&#x4EE5;&#x901A;&#x8FC7;&#x547D;&#x4EE4;&#x884C;&#x4F20;&#x5165;&#x8DEF;&#x5F84;&#x53C2;&#x6570;&#xFF0C;&#x8DEF;&#x5F84;&#x9ED8;&#x8BA4;&#x4E3A;gs://apache-beam-samples/shakespeare/kinglear.txt
         */</span>
        <span class="hljs-annotation">@Description</span>(<span class="hljs-string">"Path of the file to read from"</span>)
        <span class="hljs-annotation">@Default</span>.String(<span class="hljs-string">"gs://apache-beam-samples/shakespeare/kinglear.txt"</span>)
        String getInputFile();
        void setInputFile(String value);

        <span class="hljs-javadoc">/**
         * &#x8BBE;&#x7F6E;&#x7ED3;&#x679C;&#x6587;&#x4EF6;&#x8F93;&#x51FA;&#x8DEF;&#x5F84;,&#x5728;intellij IDEA&#x7684;&#x8FD0;&#x884C;&#x8BBE;&#x7F6E;&#x9009;&#x9879;&#x4E2D;&#x6216;&#x8005;&#x5728;&#x547D;&#x4EE4;&#x884C;&#x4E2D;&#x6307;&#x5B9A;&#x8F93;&#x51FA;&#x6587;&#x4EF6;&#x8DEF;&#x5F84;&#xFF0C;&#x5982;./pom.xml
         */</span>
        <span class="hljs-annotation">@Description</span>(<span class="hljs-string">"Path of the file to write to"</span>)
        <span class="hljs-annotation">@Required</span>
        String getOutput();
        void setOutput(String value);
    }
    <span class="hljs-javadoc">/**
     * 5.&#x8FD0;&#x884C;&#x7A0B;&#x5E8F;
     */</span>
    public static void main(String[] args) {
        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(WordCountOptions.<span class="hljs-keyword">class</span>);
        Pipeline p = Pipeline.create(options);

        p.apply(<span class="hljs-string">"ReadLines"</span>, TextIO.Read.from(options.getInputFile()))
                .apply(<span class="hljs-keyword">new</span> CountWords())
                .apply(MapElements.via(<span class="hljs-keyword">new</span> FormatAsTextFn()))
                .apply(<span class="hljs-string">"WriteCounts"</span>, TextIO.Write.to(options.getOutput()));

        p.run().waitUntilFinish();
    }
}</string></kv<string,></string></string></kv<string,></kv<string,></string,></long,>

3.1.intellij IDEA(社区版)中Spark大数据框架运行Pipeline计算程序

  • Spark运行
  • 设置VM options
<span class="hljs-attribute">-DPapex</span><span class="hljs-attribute">-runner</span>
  • 设置Programe arguments
<span class="hljs-literal">-</span><span class="hljs-literal">-</span><span class="hljs-string">.</span> <span class="hljs-literal">-</span><span class="hljs-literal">-</span>

Apache Beam WordCount编程实战及源码解读
  • Apex运行
  • 设置VM options
<span class="hljs-attribute">-DPapex</span><span class="hljs-attribute">-runner</span>
  • 设置Programe arguments
<span class="hljs-literal">-</span><span class="hljs-literal">-</span><span class="hljs-string">.</span> <span class="hljs-literal">-</span><span class="hljs-literal">-</span>
  • Flink运行等等
  • 设置VM options
<span class="hljs-attribute">-DPflink</span><span class="hljs-attribute">-runner</span>
  • 设置Programe arguments
<span class="hljs-literal">-</span><span class="hljs-literal">-</span><span class="hljs-string">.</span> <span class="hljs-literal">-</span><span class="hljs-literal">-</span>

4.终端运行(Terminal)(不推荐,第一次下载过程很慢,开发体验较差)

4.1.以下命令是下载 官方 示例源码,第一次运行下载较慢,如果失败了就多运行几次,( 推荐下载,完整项目Github源码 )直接用上述解读在intellij IDEA中运行。

mvn archetype:generate       -DarchetypeRepository=https://repository<span class="hljs-preprocessor">.apache</span><span class="hljs-preprocessor">.org</span>/content/groups/snapshots       -DarchetypeGroupId=org<span class="hljs-preprocessor">.apache</span><span class="hljs-preprocessor">.beam</span>       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples       -DarchetypeVersion=LATEST       -DgroupId=org<span class="hljs-preprocessor">.example</span>       -DartifactId=word-count-beam       -Dversion=<span class="hljs-string">"0.1"</span>       -Dpackage=org<span class="hljs-preprocessor">.apache</span><span class="hljs-preprocessor">.beam</span><span class="hljs-preprocessor">.examples</span>       -DinteractiveMode=false

Apache Beam WordCount编程实战及源码解读

4.2.打包并运行

mvn compile exec:java -Dexec<span class="hljs-preprocessor">.mainClass</span>=org<span class="hljs-preprocessor">.apache</span><span class="hljs-preprocessor">.beam</span><span class="hljs-preprocessor">.examples</span><span class="hljs-preprocessor">.WordCount</span>      -Dexec<span class="hljs-preprocessor">.args</span>=<span class="hljs-string">"--runner=SparkRunner --inputFile=pom.xml --output=counts"</span> -Pspark-runner

Apache Beam WordCount编程实战及源码解读

4.3.成功运行结果

4.3.1.显示运行成功

Apache Beam WordCount编程实战及源码解读

4.3.2.WordCount输出计算结果

Apache Beam WordCount编程实战及源码解读

Original: https://www.cnblogs.com/lanzhi/p/6467639.html
Author: 岚之山
Title: Apache Beam WordCount编程实战及源码解读

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/552393/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • solr查询score机制

    首先,solr使用的是默认的评分机制,要搞明白lucene默认评分机制,需要首先了解一下lucene的查询对象。 1、termquery 2、boolean query 3、ran…

    技术杂谈 2023年5月31日
    091
  • Spring Boot下的一种导入Excel文件的代码框架

    1、前言 ​ Spring Boot下如果只是导入一个简单的Excel文件,是容易的。网上类似的文章不少,有的针对具体的实体类,代码可重用性不高;有的利用反射机制或自定义注解,开发…

    技术杂谈 2023年6月21日
    098
  • 浅谈kali : arpspoof工具原理

    介绍 arpspoof是一个通过ARP协议伪造数据包实现中间人攻击的kali工具。 中间人攻击虽然古老,但仍处于受到黑客攻击的危险中,可能会严重导致危害服务器和用户。仍然有很多变种…

    技术杂谈 2023年7月25日
    082
  • java.sql.SQLSyntaxErrorException: Unknown database报错

    一般是jdbc链接的数据库名字写错了,检查数据库的名字。 Original: https://www.cnblogs.com/333czh/p/15622694.htmlAutho…

    技术杂谈 2023年6月21日
    073
  • 数据库架构演变概要

    一. 背景 为了适应业务增长,数据库数据量快速增长,性能日趋下降,稳定性不佳的实际情况,急需架构逐步演变适应未来的业务发展。 二. 现状 【稳定性】数据库为单点,没有高可用和稳定性…

    技术杂谈 2023年7月23日
    064
  • 【证券从业】金融基础知识-第四章 股票03

    注1:后续学习并整理到第八章,全书完结后再合并成一个笔记进行源文件分享 注2:本章内容巨多,大约分为三篇文章记录消化 posted @2022-06-08 01:28 陈景中 阅读…

    技术杂谈 2023年7月10日
    083
  • Windows内核安全与驱动开发

    编辑推荐 本书适合计算机安全软件从业人员、计算机相关专业院校学生以及有一定C语言和操作系统基础知识的编程爱好者阅读。 内容简单介绍 本书的前身是《天书夜读——从汇编语言到Windo…

    技术杂谈 2023年5月31日
    084
  • VMware 虚拟机图文安装和配置 Ubuntu Server 22.04 LTS 教程

    前言:本文将以 Ubuntu Server 22.04 LTS 为例,说明在 VMware 虚拟机中的安装和配置 Linux 操作系统的步骤。 一、VMWare 安装配置 二、Ub…

    技术杂谈 2023年7月11日
    0112
  • 神秘的backlog参数与TCP连接队列

    原创:打码日记(微信公众号ID:codelogs),欢迎分享,转载请保留出处。 简介 这要从一次压测项目说起,那是我们公司的系统与另几家同行公司的系统做性能比拼,性能数据会直接影响…

    技术杂谈 2023年7月25日
    082
  • Base64编码出现换行符

    Base64是一种字符串编码格式,Base64采用A-Z a-z 0-9 “+” “/”这一共64个字符来编码原始字符(还有垫字符&…

    技术杂谈 2023年5月31日
    079
  • nmap端口状态解析

    nmap端口状态解析 状态 说明 open 应用程序在该端口接收 TCP 连接或者 UDP 报文 closed 关闭的端口对于nmap也是可访问的, 它接收nmap探测报文并作出响…

    技术杂谈 2023年5月31日
    0104
  • Linux 添加大于2TB磁盘扩容逻辑卷

    一、查看新添加磁盘以及分区情况 # lsblk 二、使用parted进行分区 # parted /dev/sdb // /dev/sdb 为上面查看到的新添加未分区的磁盘 (par…

    技术杂谈 2023年7月24日
    058
  • ==和equals的区别是什么?

    ==:基本类型比较的是值的大小,引用类型比较的是内存地址,是不是同一个对象,equals:默认比较同一个对象的内容 == 和 equals 的区别是什么? == : 它的作用是判断…

    技术杂谈 2023年7月24日
    077
  • 什么是Servlet?

    Servlet 是sun 公司提供的一门用于开发动态web 资源的技术,是Java 语言中编写Web 服务器扩展功能的重要技术,同时它也是JSP 技术的底层运行基础。 Servle…

    技术杂谈 2023年5月31日
    086
  • 关于IDEA中Tomcat中文乱码的解决方案

    进入Tomcat/config文件夹下,打开编辑logging.properties 然后查看该文件内是否存在 java.util.logging.ConsoleHandler.e…

    技术杂谈 2023年7月23日
    078
  • 最新一线大厂Redis使用21条军规及详细解读

    说明:个人原创,本人在一线互联网大厂维护着几千套集群,关于redis使用的一些坑进行了经验总结,希望能给大家带来一些帮助 适用场景:并发量大、访问量大的业务 规范:介绍军规内容 解…

    技术杂谈 2023年7月25日
    0206
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球