Overview: Apache Beam WordCount in practice, with a source-code walkthrough, debugging and running the WordCount program both from IntelliJ IDEA and from the terminal. Apache Beam provides a single, advanced, unified programming model for both batch and stream processing of big data, and its pipelines can run on a variety of big-data execution engines. Complete project source on GitHub.
I am responsible for my company's big-data processing architecture. The diversity of engines involved greatly increases development cost, so a unified programming model is badly needed. Apache Beam promises "write once, run anywhere", so I am sharing the results of my experiments here.
1. Apache Beam in practice – Preface: Apache Beam's features and key concepts.
Apache Beam became a new Apache top-level project on January 10, 2017.
1.1. Apache Beam features:
- Unified: a single programming model for both batch and streaming use cases.
- Portable: pipelines execute on multiple runtime environments, including Apache Apex, Apache Flink, Apache Spark, and Google Cloud Dataflow.
- Extensible: write and share new SDKs, IO connectors, and transformation libraries.
Partly translated from the official site: Apache Beam website
1.2. Apache Beam key concepts:
1.2.1. Apache Beam SDKs
These are the development APIs, providing a unified programming model for batch and stream processing. As of this writing (2017) only Java is supported; the Python SDK is under active development.
1.2.2. The level of support each runner provides is detailed in the capability matrix on the official site.
2. Apache Beam in practice – source-code walkthrough
Based on Maven and IntelliJ IDEA; see pom.xml in the complete project source on GitHub. Import the project directly with IDEA's project-import feature, wait for Maven to download the dependencies, and then follow the walkthrough below to run it.
2.1. Source analysis – how Apache Beam processes a data stream:
Key steps:
- Create a Pipeline
- Apply transforms to the Pipeline
- Read the input file
- Apply a ParDo transform
- Apply SDK-provided transforms (for example: Count)
- Write the output
- Run the Pipeline
/**
 * MIT.
 * Author: wangxiaolei(王小雷).
 * Date:17-2-20.
 * Project:ApacheBeamWordCount.
 */
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class WordCount {

  /**
   * 1.a. Writing the Pipeline with a DoFn keeps the code concise.
   *   b. Splits each input line into words and emits them.
   */
  static class ExtractWordsFn extends DoFn<String, String> {
    private final Aggregator<Long, Long> emptyLines =
        createAggregator("emptyLines", Sum.ofLongs());

    @ProcessElement
    public void processElement(ProcessContext c) {
      if (c.element().trim().isEmpty()) {
        emptyLines.addValue(1L);
      }
      String[] words = c.element().split("[^a-zA-Z']+");
      for (String word : words) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }

  /**
   * 2. Formats each word/count pair as a printable string.
   */
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  /**
   * 3. Word counting: a PTransform (PCollection transform) that turns a
   * PCollection of text lines into formatted, countable words.
   */
  public static class CountWords extends PTransform<PCollection<String>,
      PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
      PCollection<String> words = lines.apply(
          ParDo.of(new ExtractWordsFn()));
      PCollection<KV<String, Long>> wordCounts =
          words.apply(Count.<String>perElement());
      return wordCounts;
    }
  }

  /**
   * 4. Custom options, such as the input and output file paths.
   */
  public interface WordCountOptions extends PipelineOptions {
    /**
     * Input file option; the path can be passed as a command-line argument and
     * defaults to gs://apache-beam-samples/shakespeare/kinglear.txt
     */
    @Description("Path of the file to read from")
    @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);

    /**
     * Output file path; set it in IntelliJ IDEA's run configuration or on the
     * command line, e.g. ./pom.xml
     */
    @Description("Path of the file to write to")
    @Required
    String getOutput();
    void setOutput(String value);
  }

  /**
   * 5. Run the pipeline.
   */
  public static void main(String[] args) {
    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
     .apply(new CountWords())
     .apply(MapElements.via(new FormatAsTextFn()))
     .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
    p.run().waitUntilFinish();
  }
}
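The tokenizing and counting work that ExtractWordsFn and Count.perElement() do inside the pipeline can be sanity-checked in isolation with plain Java and no Beam dependency. A minimal sketch (the class name and sample input are made up for illustration):

```java
import java.util.Map;
import java.util.TreeMap;

public class WordCountCore {
    /** Splits lines with the same regex ExtractWordsFn uses and tallies each word. */
    static Map<String, Long> countWords(String[] lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            // "[^a-zA-Z']+" splits on runs of anything that is not a letter or apostrophe.
            for (String word : line.split("[^a-zA-Z']+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Mirrors FormatAsTextFn's "word: count" output format.
        Map<String, Long> counts = countWords(new String[] {"to be, or not to be"});
        for (Map.Entry<String, Long> e : counts.entrySet()) {
            System.out.println(e.getKey() + ": " + e.getValue());
        }
    }
}
```

In the real pipeline the same per-element work is distributed across workers by the chosen runner; this stand-alone version is only for checking the regex and the expected counts.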
3.1. Running the Pipeline program on a big-data framework (Spark, etc.) from IntelliJ IDEA (Community Edition)
- Running on Spark
  - Set VM options:
    -DPspark-runner
  - Set Program arguments (same form as the terminal command in section 4.2):
    --runner=SparkRunner --inputFile=pom.xml --output=counts
- Running on Apex
  - Set VM options:
    -DPapex-runner
  - Set Program arguments:
    --runner=ApexRunner --inputFile=pom.xml --output=counts
- Running on Flink, and so on
  - Set VM options:
    -DPflink-runner
  - Set Program arguments:
    --runner=FlinkRunner --inputFile=pom.xml --output=counts
4. Running from the terminal (not recommended: the first download is very slow, and the developer experience is poor)
4.1. The command below downloads the official example source. The first run downloads slowly; if it fails, just retry a few times. (Recommended instead: clone the complete project source from GitHub and run it in IntelliJ IDEA following the walkthrough above.)
mvn archetype:generate -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots -DarchetypeGroupId=org.apache.beam -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples -DarchetypeVersion=LATEST -DgroupId=org.example -DartifactId=word-count-beam -Dversion="0.1" -Dpackage=org.apache.beam.examples -DinteractiveMode=false
4.2. Compile and run
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
4.3. Results of a successful run
4.3.1. The console shows the run succeeded
4.3.2. WordCount writes out the computed counts
Original: https://www.cnblogs.com/lanzhi/p/6467639.html
Author: 岚之山
Title: Apache Beam WordCount Programming in Practice and Source Code Walkthrough
Original articles are protected by copyright. Please credit the source when reprinting: https://www.johngo689.com/552393/
Reprinted articles remain under the original author's copyright. Please credit the original author when reprinting!