2_Spark Streaming 数据接收器

1. TCP Source

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
* 需求: 使用netcat工具向9999端口不断的发送数据
*         通过SparkStreaming读取端口数据
*
* 开启服务-发送数据
*     nc -l -p 9999
*
* 监听服务-消费数据
*     nc IP 9999
*
* TODO 通过读取 TCP 创建DStream
*    API:
*       socketTextStream(hostname,port,storageLevel):ReceiverInputDStream[String]
*    功能:
*       从 TCP 源中创建 DStream,TCP Socket中的数据将以(UTF8 和\n为分隔符)的形式被保存
*    参数:
*        @param hostname      Hostname to connect to for receiving data
*        @param port          Port to connect to for receiving data
*        @param storageLevel  存储级别
*    返回值:
*       ReceiverInputDStream[String]
*
* */

object ReadTCP extends App {
  //1.初始化 Spark 配置信息
  private val sparkConf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("ReadTCP")

  //2.初始化 StreamingContext(并指定采集周期)
  private val ssc: StreamingContext = new StreamingContext(sparkConf, Seconds(4))

  //3.通过监控socket端口创建DStream
  private var ds: ReceiverInputDStream[String] = ssc.socketTextStream("gaocun", 9999)

  //打印 DStream的元素(指定打印元素个数)
  ds.print(2)

  //Start the execution of the streams
  ssc.start()
  //等待执行结束(在执行过程中 任何的异常都会触发程序结束)
  ssc.awaitTermination()
}

2. HDFS Source

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/*
* 需求:
*    监控 HDFS目录下的新文件(新增、更新) 并获取文件内容
*
* 相关HDFS命令
*    #上传文件(文件存在 则报错)
*    hadoop fs -put sanguo.txt /sparkstreaming/5.txt
*    #追加文件
*    hadoop fs -appendToFile sanguo.txt /sparkstreaming/5.txt
*    #覆盖文件
*    hadoop fs -copyFromLocal -f sanguo.txt /sparkstreaming/5.txt
*
* TODO 通过读取 HDFS目录下文件变化 创建DStream
*    API:
*       textFileStream(directory: String): DStream[String]
*    功能:
*       监控 HDFS目录下的新文件(新增、更新),并读取文本文件
*    参数:
*        @param directory 指定 HDFS目录
*    注意:
*        1. 监控的文件格式 必须是 文本文件
*        2. 文件追加操作后,获取的是整个文件的内容
*
* */

object ReadHDFSFile extends App {
  val sparkConf = new SparkConf().setMaster("local[4]").setAppName("HdfsWordCount")

  private val ssc = new StreamingContext(sparkConf, Seconds(4))

  val inputPath = "hdfs://gaocun:8020//sparkstreaming"

  private val lines: DStream[String] = ssc.textFileStream(inputPath)

  lines.print(10)

  ssc.start()
  ssc.awaitTermination()
}

3. Queue Source

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream

import scala.collection.mutable

/*
* 需求:
*    监控 Queue中的RDD 并获取新增RDD
*
* TODO 通过读取 RDD的Queue 创建DStream
*    API:
*       queueStream(directory: String, oneAtATime: Boolean = true): InputDStream[T]
*    功能:
*       监控 Queue中新增的RDD ,获取采集周期内新增的RDD
*    参数:
*        @param queue 指定队列
*        @param oneAtATime 是否在每个采集周期内只消费一个RDD (True-是,False-否)
*    返回值:
*        InputDStream
* */

object CreateDSteamByQueue extends App {
  //创建 上下文对象
  val sparkConf = new SparkConf().setMaster("local[4]").setAppName("CreateDSteamByQueue")
  private val ssc = new StreamingContext(sparkConf, Seconds(4))

  //创建 RDD 队列
  private val rddQueue = new mutable.Queue[RDD[String]]()

  //创建 QueueInputDStream
  private val inputStream: InputDStream[String] = ssc.queueStream(rddQueue, oneAtATime = false)

  //打印结果
  inputStream.print()

  //Start the execution of the streams.

  ssc.start()

  //循环往 queue 中写入数据
  while (true) {
    val rdd1: RDD[String] = ssc.sparkContext.parallelize(List("赵", "钱"))
    val rdd2: RDD[String] = ssc.sparkContext.parallelize(List("赵1", "钱2"))
    rddQueue.enqueue(rdd1, rdd2)
    Thread.sleep(2000)
  }

  //Wait for the execution to stop
  ssc.awaitTermination()
}

4. Kafka Source

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategy, KafkaUtils, LocationStrategy}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/*
* 需求:
*    监控 Kafka指定Topic 并创建DStream
*
* TODO 通过 指定的Kafka Topic/partition 创建DStream
*    API:
*       createDirectStream[K, V](ssc: StreamingContext,
*                               locationStrategy: LocationStrategy,
*                               consumerStrategy: ConsumerStrategy[K, V]
*                               ): InputDStream[ConsumerRecord[K, V]]
*    参数:
*        @ssc StreamingContext
*        @locationStrategy : 指定kafka消息位置策略
*        @consumerStrategy : 指定kafka消费者策略
*        @tparam K type of Kafka message key
*        @tparam V type of Kafka message value
*
*    返回值:
*        InputDStream[ConsumerRecord[K, V]]
*
* TODO 关于 ConsumerRecord[K, V]
*    功能:
*       用来封装 Kafka的记录,包括下面几个属性
*           topic
*           partition => 分区编号
*           offset => 记录在对应Kafka分区中的偏移量
*           timestamp
*           checksum => 完整记录的校验和(CRC32)
*           serializedKeySize => key 的长度
*           serializedValueSize => value 的长度
*           key => key的内容(允许为空)
*           value => 记录的内容
*
* TODO 发送消息
*   kafka-console-producer.sh \
*   --bootstrap-server worker:9092 \
*   --topic 20220707
*
* */

object ReadKafka extends App {
  val sparkConf = new SparkConf().setMaster("local[4]").setAppName("HdfsWordCount")

  //Create the context
  private val ssc = new StreamingContext(sparkConf, Milliseconds(2000))

  //设置 Kafka参数
  val kafkaParams: Map[String, Object] = Map[String, Object](
    "bootstrap.servers" -> "120.48.114.124:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "test-consumer-group",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  //4.读取 Kafka 数据创建 DStream
  val topics = Array("20220707")
  val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

  //5.打印 ConsumerRecord 相关内容
  private val dstream: DStream[String] = stream.map(
    (e: ConsumerRecord[String, String]) => {
      println("headers:" + e.headers())
      println("leaderEpoch:" + e.leaderEpoch())
      println("offset:" + e.offset())
      println("partition:" + e.partition())
      println("timestamp:" + e.timestamp())
      println("topic:" + e.topic())
      println("key:" + e.key())
      println("value:" + e.value())
      println("serializedKeySize:" + e.serializedKeySize())
      println("serializedValueSize:" + e.serializedValueSize())
      e.toString
    }
  )
  dstream.print()

  ssc.start()
  ssc.awaitTermination()
}

5. 自定义 Receiver

import com.dxm.sparkstreaming.utils.CommonUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

/*
*  需求 : 根据数据源 自定义数据采集器
*
*  TODO 自定义 Receiver API
*     功能:
*         接收器(数据采集器) 的抽象类,可以运行在工作节点上接收外部数据
*     实现方法:
*         // 启动负责接收数据的线程
*         onStart() {
*             // 采集数据的代码,必须启动新的线程来执行,应为 onStart方法是非阻塞的
*             // 采集的数据 可以调用 store(data)方法 将采集的数据存储到Spark内存中
*
*         }
*         // 停止负责接收数据的线程
*         onStop()
*     实现步骤:
*         1. 继承Receiver,定义泛型(采集数据类型),传递参数(指定采集数据后的存储基本)
*         2. 实现 onStart、onStop 方法
*
* 官网链接: https://spark.apache.org/docs/latest/streaming-custom-receivers.html
*
*
*
* */

// 自定义采集器
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  private var flag = true

  override def onStart(): Unit = {
    new Thread() {
      override def run(): Unit = {
        while (flag) {
          var message: String = s"采集数据为:${System.currentTimeMillis()}"
          store(message)
          Thread.sleep(500)
        }
      }
    }.start()

  }

  override def onStop(): Unit = flag = false
}

// 使用自定义采集器
object CreateDSteamByCustomerReceiver extends App {
  //创建 上下文对象
  private val ssc: StreamingContext = CommonUtils.getSparkSession("CreateDSteamByCustomerReceiver")

  private val inputstream: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver)

  inputstream.print()

  //Start the execution of the streams.

  ssc.start()

  //Wait for the execution to stop
  ssc.awaitTermination()
}

6. Receiver SourceCode

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import com.dxm.sparkstreaming.utils.CommonUtils
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

/*
* TODO 通过阅读 socketTextStream 源码,熟悉 SparkStreaming 数据采集器的执行原理
* 需求:
*    监控 TCP协议下的某个端口号,并获取他的内容
*
* 官网案例:
*     https://spark.apache.org/docs/latest/streaming-custom-receivers.html
*
* */

class CustomSocketReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() {
        receive()
      }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while (!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}

object UserCustomSocketReceiver extends App {
  //创建 上下文对象
  private val ssc: StreamingContext = CommonUtils.getSparkSession("CustomSocketReceiver")

  private val inputstream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomSocketReceiver("gaocun", 9999))

  inputstream.print()

  //Start the execution of the streams.

  ssc.start()

  //Wait for the execution to stop
  ssc.awaitTermination()
}

Original: https://www.cnblogs.com/bajiaotai/p/16563233.html
Author: 学而不思则罔!
Title: 2_Spark Streaming 数据接收器

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/683546/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

  • cgroup-v1在android中的应用实现浅析

    404. 抱歉,您访问的资源不存在。 可能是网址有误,或者对应的内容被删除,或者处于私有状态。 代码改变世界,联系邮箱 contact@cnblogs.com 园子的商业化努力-困…

    技术杂谈 2023年7月10日
    055
  • 【证券从业】金融基础知识-第二章 中国金融体系与多层次资本市场02

    注1:后续学习并整理到第八章,全书完结后再合并成一个笔记进行源文件分享 注2:本章内容巨多,大约分为两篇文章记录消化 posted @2022-05-31 22:14 陈景中 阅读…

    技术杂谈 2023年7月10日
    0104
  • Windows的三种坐标系:屏幕坐标系,非客户区坐标系,客户区坐标系

    屏幕坐标系:以屏幕的左上角为原点,如图所示GetWindowRect() 函数获得的 RECT 就是以屏幕坐标系算的。 非客户区坐标系(窗口坐标系)包括标题栏的部分。GetWind…

    技术杂谈 2023年5月31日
    078
  • perf火炬图【转】

    转自:https://www.cnblogs.com/linhaostudy/p/16098864.html 正文回到顶部 1、perf命令简要介绍 1.1 perf采集数据 让我…

    技术杂谈 2023年5月31日
    096
  • 特殊数表

    来自 command_block,为了适应 cnblogs 做了一些改动 . 以下是原文: 包括负数的二项式系数 (杨辉三角) ( \def\tinyS #1#2{\tiny\be…

    技术杂谈 2023年7月23日
    067
  • 3-在Django中使用使用数据库

    数据库设置 在上一章节中学习了如何创建Django项目,在Django项目中创建web应用,以及如何在Django主程序的URL中引用web应用中的URL。下面来了解如何在Djan…

    技术杂谈 2023年7月10日
    060
  • 技术管理进阶——什么是管理者之体力、脑力、心力

    原创不易,求分享、求一键三连 最近有个粉丝问了一个很有意思的问题: 小钗,他们常说的心力、体力、脑力,到底是什么呢? 事实上,我也没有对三力进行过系统性的研究,但稍加思考,竟然有一…

    技术杂谈 2023年6月1日
    0104
  • SpringCloud 学习总结

    SpringCloud 学习总结 学习回顾 1.Maven依赖管理 ​ 在微服务项目中,我们一般是先创建一个父项目模块对整个项目的依赖进行版本限定和依赖控制,子模块继承父模块后,不…

    技术杂谈 2023年6月21日
    092
  • cron表达式详解

    Cron表达式是一个字符串,字符串以5或6个空格隔开,分为6或7个域,每一个域代表一个含义,Cron有如下两种语法格式: (1) Seconds Minutes Hours Day…

    技术杂谈 2023年5月30日
    097
  • JavaDoc

    ; ; JavaDoc javadoc命令是用来生成自己API文档的。 参数信息: * @author 作者名 @version 版本号 @sinse 指明需要最早使用的JDK版本…

    技术杂谈 2023年6月21日
    098
  • bootstrap响应式前端页面

    bootstrap响应式学习参考源码,代码主要是通过bootstrap实现了响应式布局,简单易懂。 html;gutter:true 一、项目目录</p> <pr…

    技术杂谈 2023年5月31日
    088
  • Scanner的进阶使用及如何查看idea中java的类库

    Scanner类进阶使用与idea如何查看类库 代码演示 这是演示一个数字的判定 import java.util.Scanner; public class Demo04 { p…

    技术杂谈 2023年6月21日
    0108
  • 用户身份标识与账号体系实践

    互联网的账号自带备忘机制; 一、业务背景 通常在系统研发的过程中,需要不断适配各种业务场景,扩展服务的领域和能力,一般会将构建的产品矩阵划分出多条业务线,以便更好的管理; 由于各个…

    技术杂谈 2023年7月23日
    062
  • Hadoop集群搭建的详细过程

    Hadoop集群搭建 一、准备 三台虚拟机:master01,node1,node2 时间同步 1.date&#x547D;&#x4EE4;&#x67E5;…

    技术杂谈 2023年7月11日
    090
  • VMware 虚拟机图文安装和配置 Ubuntu Server 22.04 LTS 教程

    前言:本文将以 Ubuntu Server 22.04 LTS 为例,说明在 VMware 虚拟机中的安装和配置 Linux 操作系统的步骤。 一、VMWare 安装配置 二、Ub…

    技术杂谈 2023年7月11日
    0109
  • hdu 1845

    一看题意就是二分匹配问题,建边是双向的,两个集合都是n个点 这题的图很特殊,每个点都要与三个点相连,在纸上画了六个点的图就感觉此图最大匹配肯定是六,除以2就是原图的匹配了,就感觉这…

    技术杂谈 2023年6月1日
    089
亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球