1. TCP Source
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*
 * Requirement: use the netcat tool to keep sending data to port 9999,
 * and read that port's data with Spark Streaming.
 *
 * Start the server - send data
 *   nc -l -p 9999
 *
 * Connect to the server - consume data (for manual verification)
 *   nc <IP> 9999
 *
 * TODO Create a DStream by reading from a TCP socket
 * API:
 *   socketTextStream(hostname, port, storageLevel): ReceiverInputDStream[String]
 * Function:
 *   Creates a DStream from a TCP source; data on the socket is interpreted as
 *   UTF-8 encoded text with '\n' as the line delimiter.
 * Parameters:
 *   @param hostname     Hostname to connect to for receiving data
 *   @param port         Port to connect to for receiving data
 *   @param storageLevel Storage level of the received data
 * Return value:
 *   ReceiverInputDStream[String]
 */
object ReadTCP extends App {
  // 1. Initialize the Spark configuration
  private val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ReadTCP")
  // 2. Initialize the StreamingContext (and specify the batch interval)
  private val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(4))
  // 3. Create a DStream by listening on a socket port
  private val ds: ReceiverInputDStream[String] = ssc.socketTextStream("gaocun", 9999)
  // Print the elements of the DStream (limit the number of elements printed)
  ds.print(2)
  // Start the execution of the streams
  ssc.start()
  // Wait for the execution to stop (any exception thrown during execution terminates the program)
  ssc.awaitTermination()
}
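The ReceiverInputDStream returned by socketTextStream can be transformed like any other DStream before output. The sketch below is a minimal word-count variant of the example above; the host name gaocun, the port and the 4-second batch interval are simply carried over from it.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketWordCount extends App {
  val conf = new SparkConf().setMaster("local[2]").setAppName("SocketWordCount")
  val ssc = new StreamingContext(conf, Seconds(4))
  // Split each received line into words and count them within each batch
  val counts = ssc.socketTextStream("gaocun", 9999)
    .flatMap(_.split(" "))
    .map((_, 1))
    .reduceByKey(_ + _)
  counts.print()
  ssc.start()
  ssc.awaitTermination()
}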
2. HDFS Source
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*
 * Requirement:
 *   Monitor an HDFS directory for new files (created or updated) and read their contents.
 *
 * Related HDFS commands
 *   # Upload a file (fails if the file already exists)
 *   hadoop fs -put sanguo.txt /sparkstreaming/5.txt
 *   # Append to a file
 *   hadoop fs -appendToFile sanguo.txt /sparkstreaming/5.txt
 *   # Overwrite a file
 *   hadoop fs -copyFromLocal -f sanguo.txt /sparkstreaming/5.txt
 *
 * TODO Create a DStream by watching for file changes under an HDFS directory
 * API:
 *   textFileStream(directory: String): DStream[String]
 * Function:
 *   Monitors an HDFS directory for new files (created or updated) and reads them as text files.
 * Parameters:
 *   @param directory The HDFS directory to monitor
 * Notes:
 *   1. The monitored files must be text files.
 *   2. After an append, the entire file content is read again.
 */
object ReadHDFSFile extends App {
  val sparkConf = new SparkConf().setMaster("local[4]").setAppName("HdfsWordCount")
  private val ssc = new StreamingContext(sparkConf, Seconds(4))
  val inputPath = "hdfs://gaocun:8020/sparkstreaming"
  private val lines: DStream[String] = ssc.textFileStream(inputPath)
  lines.print(10)
  ssc.start()
  ssc.awaitTermination()
}
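textFileStream is a convenience wrapper around the more general fileStream, which lets you choose the Hadoop key, value and InputFormat types yourself. A minimal sketch, reusing the same HDFS path as above; for plain text files the key is the byte offset of the line and the value is the line itself.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReadHDFSFileGeneric extends App {
  val sparkConf = new SparkConf().setMaster("local[4]").setAppName("ReadHDFSFileGeneric")
  val ssc = new StreamingContext(sparkConf, Seconds(4))
  // fileStream yields (key, value) pairs; keep only the line content
  val lines = ssc
    .fileStream[LongWritable, Text, TextInputFormat]("hdfs://gaocun:8020/sparkstreaming")
    .map(_._2.toString)
  lines.print(10)
  ssc.start()
  ssc.awaitTermination()
}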
3. Queue Source
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import scala.collection.mutable
/*
 * Requirement:
 *   Monitor a queue of RDDs and pick up newly enqueued RDDs.
 *
 * TODO Create a DStream from a queue of RDDs
 * API:
 *   queueStream[T](queue: Queue[RDD[T]], oneAtATime: Boolean = true): InputDStream[T]
 * Function:
 *   Monitors the queue for newly added RDDs and consumes the RDDs enqueued during each batch interval.
 * Parameters:
 *   @param queue      The queue of RDDs to consume
 *   @param oneAtATime Whether to consume only one RDD per batch interval (true = yes, false = no)
 * Return value:
 *   InputDStream
 */
object CreateDSteamByQueue extends App {
  // Create the context
  val sparkConf = new SparkConf().setMaster("local[4]").setAppName("CreateDSteamByQueue")
  private val ssc = new StreamingContext(sparkConf, Seconds(4))
  // Create the RDD queue
  private val rddQueue = new mutable.Queue[RDD[String]]()
  // Create the QueueInputDStream
  private val inputStream: InputDStream[String] = ssc.queueStream(rddQueue, oneAtATime = false)
  // Print the result
  inputStream.print()
  // Start the execution of the streams.
  ssc.start()
  // Keep enqueueing data into the queue
  while (true) {
    val rdd1: RDD[String] = ssc.sparkContext.parallelize(List("赵", "钱"))
    val rdd2: RDD[String] = ssc.sparkContext.parallelize(List("赵1", "钱2"))
    rddQueue.enqueue(rdd1, rdd2)
    Thread.sleep(2000)
  }
  // Wait for the execution to stop
  ssc.awaitTermination()
}
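To see the effect of the oneAtATime flag, the sketch below enqueues several RDDs of integers and sums each batch: with oneAtATime = true exactly one queued RDD is consumed per batch interval, while the example above (oneAtATime = false) unions everything queued so far into a single batch. The numbers and sleep times are illustrative only.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

object QueueStreamSum extends App {
  val ssc = new StreamingContext(
    new SparkConf().setMaster("local[4]").setAppName("QueueStreamSum"), Seconds(4))
  val rddQueue = new mutable.Queue[RDD[Int]]()
  // One queued RDD is consumed per batch interval
  val sums = ssc.queueStream(rddQueue, oneAtATime = true)
    .map(n => ("sum", n))
    .reduceByKey(_ + _)
  sums.print()
  ssc.start()
  for (_ <- 1 to 5) {
    rddQueue.enqueue(ssc.sparkContext.parallelize(1 to 100))
    Thread.sleep(1000)
  }
  ssc.awaitTermination()
}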
4. Kafka Source
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategy, KafkaUtils, LocationStrategy}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
/*
 * Requirement:
 *   Monitor a given Kafka topic and create a DStream from it.
 *
 * TODO Create a DStream from the specified Kafka topic/partitions
 * API:
 *   createDirectStream[K, V](ssc: StreamingContext,
 *                            locationStrategy: LocationStrategy,
 *                            consumerStrategy: ConsumerStrategy[K, V]
 *                           ): InputDStream[ConsumerRecord[K, V]]
 * Parameters:
 *   @param ssc              StreamingContext
 *   @param locationStrategy Strategy for scheduling Kafka consumers relative to partitions
 *   @param consumerStrategy Kafka consumer strategy (topics, parameters, offsets)
 *   @tparam K type of Kafka message key
 *   @tparam V type of Kafka message value
 *
 * Return value:
 *   InputDStream[ConsumerRecord[K, V]]
 *
 * TODO About ConsumerRecord[K, V]
 * Function:
 *   Wraps a Kafka record and exposes, among others, the following fields:
 *     topic
 *     partition           => partition number
 *     offset              => offset of the record within its Kafka partition
 *     timestamp
 *     checksum            => checksum (CRC32) of the full record
 *     serializedKeySize   => length of the key
 *     serializedValueSize => length of the value
 *     key                 => the key (may be null)
 *     value               => the record payload
 *
 * TODO Send messages
 *   kafka-console-producer.sh \
 *     --bootstrap-server worker:9092 \
 *     --topic 20220707
 */
object ReadKafka extends App {
  val sparkConf = new SparkConf().setMaster("local[4]").setAppName("ReadKafka")
  // Create the context
  private val ssc = new StreamingContext(sparkConf, Milliseconds(2000))
  // Kafka parameters
  val kafkaParams: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "120.48.114.124:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "test-consumer-group",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )
  // Read from Kafka and create the DStream
  val topics = Array("20220707")
  val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
  // Print the fields of each ConsumerRecord
  private val dstream: DStream[String] = stream.map(
    (e: ConsumerRecord[String, String]) => {
      println("headers:" + e.headers())
      println("leaderEpoch:" + e.leaderEpoch())
      println("offset:" + e.offset())
      println("partition:" + e.partition())
      println("timestamp:" + e.timestamp())
      println("topic:" + e.topic())
      println("key:" + e.key())
      println("value:" + e.value())
      println("serializedKeySize:" + e.serializedKeySize())
      println("serializedValueSize:" + e.serializedValueSize())
      e.toString
    }
  )
  dstream.print()
  ssc.start()
  ssc.awaitTermination()
}
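Because enable.auto.commit is set to false above, offsets are not committed automatically; the spark-streaming-kafka-0-10 API lets you commit them back to Kafka yourself once a batch has been processed. The fragment below is a minimal sketch that extends the stream defined in the ReadKafka example above; the actual processing step is omitted.

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Continuing the ReadKafka example: commit offsets manually after processing each batch
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  // commitAsync must be called on the original InputDStream, on the driver
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}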
5. Custom Receiver
import com.dxm.sparkstreaming.utils.CommonUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
/*
 * Requirement: implement a custom receiver for a given data source.
 *
 * TODO Custom Receiver API
 * Function:
 *   Abstract class for a receiver (data collector) that runs on worker nodes to receive external data.
 * Methods to implement:
 *   // Start the thread(s) responsible for receiving data
 *   onStart() {
 *     // The receiving code must run on a new thread, because onStart() must be non-blocking.
 *     // Received data is pushed into Spark's memory by calling store(data).
 *   }
 *   // Stop the thread(s) responsible for receiving data
 *   onStop()
 * Implementation steps:
 *   1. Extend Receiver, specify the type parameter (the type of the received data) and pass
 *      the constructor argument (the storage level for the received data).
 *   2. Implement the onStart and onStop methods.
 *
 * Official docs: https://spark.apache.org/docs/latest/streaming-custom-receivers.html
 */
// Custom receiver
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  private var flag = true
  override def onStart(): Unit = {
    new Thread() {
      override def run(): Unit = {
        while (flag) {
          val message: String = s"collected data: ${System.currentTimeMillis()}"
          store(message)
          Thread.sleep(500)
        }
      }
    }.start()
  }
  override def onStop(): Unit = flag = false
}
// Use the custom receiver
object CreateDSteamByCustomerReceiver extends App {
  // Create the context
  private val ssc: StreamingContext = CommonUtils.getSparkSession("CreateDSteamByCustomerReceiver")
  private val inputstream: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver)
  inputstream.print()
  // Start the execution of the streams.
  ssc.start()
  // Wait for the execution to stop
  ssc.awaitTermination()
}
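CommonUtils.getSparkSession is the author's own helper and its source is not included in the post. A plausible reconstruction is sketched below; the master URL and the 4-second batch interval are assumptions, chosen to match the earlier examples.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical stand-in for the CommonUtils helper used above
object CommonUtils {
  def getSparkSession(appName: String): StreamingContext = {
    val conf = new SparkConf().setMaster("local[4]").setAppName(appName) // assumed master
    new StreamingContext(conf, Seconds(4))                               // assumed batch interval
  }
}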
6. Receiver Source Code
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import com.dxm.sparkstreaming.utils.CommonUtils
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
/*
 * TODO Read the socketTextStream source code to understand how Spark Streaming receivers work.
 * Requirement:
 *   Listen on a TCP port and receive its content.
 *
 * Official example:
 *   https://spark.apache.org/docs/latest/streaming-custom-receivers.html
 */
class CustomSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() {
        receive()
      }
    }.start()
  }

  def onStop() {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until the receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)
      // Keep reading until stopped or the connection is broken
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while (!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()
      // Restart in an attempt to connect again when the server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // Restart if we could not connect to the server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // Restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}
object UserCustomSocketReceiver extends App {
  // Create the context
  private val ssc: StreamingContext = CommonUtils.getSparkSession("CustomSocketReceiver")
  private val inputstream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomSocketReceiver("gaocun", 9999))
  inputstream.print()
  // Start the execution of the streams.
  ssc.start()
  // Wait for the execution to stop
  ssc.awaitTermination()
}
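A receiver's onStop() is invoked when the StreamingContext shuts down. If the job should terminate cleanly instead of blocking forever in awaitTermination(), the context can be stopped gracefully from another thread; the helper below is an illustrative sketch (the object name and delay are not from the original post).

import org.apache.spark.streaming.StreamingContext

object StopHelper {
  // Stop the context gracefully after the given delay, letting in-flight batches finish
  // and triggering the receiver's onStop()
  def stopAfter(ssc: StreamingContext, millis: Long): Unit = {
    new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(millis)
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }).start()
  }
}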
Original: https://www.cnblogs.com/bajiaotai/p/16563233.html
Author: 学而不思则罔!
Title: 2_Spark Streaming Data Receivers