大数据学习笔记——————-(18)

第18章 KAFKA与SPARK集成

18.1Kafka与spark集成

Kafka是Spark流式传输潜在消息和集成的平台。Kafka充当实时数据流的中心枢纽,并使用Spark Streaming中的复杂算法进行处理。一旦数据被处理,Spark Streaming可以将结果发布到另一个Kafka topic或存储在HDFS,数据库或控制面板中。

大数据学习笔记-------------------(18)

18.2 SparkConfAPI

SparkConf表示Spark应用程序的配置。用于将各种Spark参数设置为键值对。SparkConf类有如下方法:

Ø set(string key,stringvalue)———-设置配置变量

Ø remove(string key)——————–从配置中移除key

Ø setAppName(string name)———设置应用程序名称

Ø get(string key)————————–获取key

18.3StreamingContext API

Spark功能的主要入口点。SparkContext表示到Spark集群的连接,可用于在集群上创建RDD,累加器和广播变量。签名的定义如下所示:

public StreamingContext(String master, String appName, 
Duration batchDuration,
String sparkHome,
scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)

master:连接打集群上的URL

appName:作业名称,并显示在集群WebUI上

batchDuration:定期将流数据分进批次

public StreamingContext(SparkConf conf, Duration batchDuration)

conf:Spark 参数

batchDuration:定期将流数据分进批次

18.4 KafkaUtilsAPI

KafkaUtils API用于将Kafka集群连接到Spark流。API具有如下定义方法 createStream签名:

public static ReceiverInputDStream> createStream(                                              StreamingContext ssc,                                               String zkQuorum,                                               String groupId,                                               scala.collection.immutable.Map topics,                                               StorageLevel storageLevel)

ssc:StreamingContext 对象

zkQuorum:zookeeper法定人数

groupId:消费者group id

topics:返回用于消费topics 的map

storageLever:Storage level用于存储接收的对象

KafkaUtilsAPI有另一个方法createDirectStream,用于创建一个输入流,直接从Kafka Brokers拉取消息,而不使用任何接收器。这个流可以保证来自一次转换中Kafka的每个消息。

示例应用程序。 要编译应用程序,请下载并安装”sbt”,scala构建工具(类似于maven)。主要应用程序代码如下所示:

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount ")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}

18.5 建立脚本

spark-kafka集成取决于spark,spark流和spark Kafka集成jar。 创建一个新文件”build.sbt”并指定应用程序详细信息及其依赖关系。 “sbt”将在编译和打包应用程序时下载必要的jar:

name := "Spark Kafka Project"version := "1.0"scalaVersion := "2.10.5"libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

18.6 编译/打包

运行以下命令以编译和打包应用程序的jar文件。将jar文件提交到spark控制台以运行应用程序: sbt package

18.7 提交到Spark

启动kafka 生产者CLI,创建一个新的topic叫”myfirst-topic”提供一些例子信息如下:

Anotherspark test message

在spark 控制台运行如下命令提交应用程序:

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark-kafka-project_2.10-1.0.jar localhost:2181

应用程序输出:

spark console messages ..(Test,1)(spark,1)(another,1)(message,1)spark console message ..

Original: https://blog.51cto.com/u_15685799/5385797
Author: WEL测试
Title: 大数据学习笔记——————-(18)

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/517354/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球