An Analysis of the Spark Streaming Application Startup Process

This is the third installment of the Spark Streaming source-code analysis series; it focuses on the Spark Streaming startup process.
After StreamingContext.start is called, control enters JobScheduler.start, where the start methods of its sub-components are invoked in the following order:

private var eventLoop: EventLoop[JobSchedulerEvent] = null
val listenerBus = new StreamingListenerBus()
private val jobGenerator = new JobGenerator(this)

eventLoop.start()
listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
receiverTracker.start()
jobGenerator.start()

The sequence diagram is as follows:

[Figure: sequence diagram of the JobScheduler.start call sequence]

eventLoop, listenerBus, and jobGenerator each maintain an event queue, and events are taken from these queues and processed on separate threads. Generally, when running a producer-consumer system, the consumer is started first. So in the code above, the earlier an object is started, the less likely it is to be the entry point of Spark Streaming's startup events. Keeping this in mind helps with the analysis that follows; if it is not clear yet, read on and come back to it afterwards.
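The producer-consumer structure described here can be sketched with a minimal stand-in for Spark's EventLoop. The class name MiniEventLoop and all its members are illustrative, not Spark's API; this only shows the pattern of a queue drained by a daemon consumer thread that is started before producers post events:

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

// Minimal sketch of an EventLoop-style consumer: events are queued by
// producers and drained by a single daemon thread. The consumer is
// started first, before any producer posts events.
abstract class MiniEventLoop[E](name: String) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      while (!stopped.get) {
        val event = eventQueue.take() // blocks until an event arrives
        onReceive(event)
      }
    }
  }

  def start(): Unit = eventThread.start()
  def stop(): Unit = stopped.set(true)
  def post(event: E): Unit = eventQueue.addLast(event)

  protected def onReceive(event: E): Unit
}
```

Subclasses supply onReceive, exactly as JobScheduler and JobGenerator do with their processEvent methods.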

The following sections analyze the main objects in the figure above.

1. The JobGenerator class

The JobGenerator constructor is shown below; it takes the JobScheduler object mentioned earlier.

class JobGenerator(jobScheduler: JobScheduler) extends Logging

Moving into the JobGenerator class, its start method is structured much like JobScheduler's start method. It also holds an eventLoop object of type EventLoop, except that this one carries events of type JobGeneratorEvent.

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}
eventLoop.start()

1) How eventLoop processes events

A glance at JobGeneratorEvent shows that JobGenerator's eventLoop mainly handles events related to job generation, metadata, and checkpointing.

private[scheduler] sealed trait JobGeneratorEvent

private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent

private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent

private[scheduler] case class DoCheckpoint(
    time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent

private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent

When eventLoop is started, it spawns a thread that handles JobGenerator events by invoking JobGenerator.processEvent.
Let's look at how JobGenerator.generateJobs, called from JobGenerator.processEvent, handles the GenerateJobs event.

private def generateJobs(time: Time) {
  Try {
    // allocate received blocks to this batch
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    // generate jobs using the allocated blocks
    graph.generateJobs(time)
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

For the logic of ReceiverTracker.allocateBlocksToBatch, see the earlier section on ReceiverTracker.

2) How eventLoop receives events

The JobGenerator class holds a timer object of type RecurringTimer, which posts a GenerateJobs event into eventLoop once per configured batch duration; the processEvent method from the previous code snippet then handles these events.

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

In addition, JobGenerator.clearMetadata posts a checkpoint event, JobGenerator.onBatchCompletion posts a clear-metadata event, and JobGenerator.onCheckpointCompletion posts a clear-checkpoint-data event.

The flow of event generation and processing for the eventQueue inside the JobGenerator object is shown below:

[Figure: event generation and processing flow in JobGenerator's event queue]

2. The EventLoop[JobSchedulerEvent] class

1) The JobSchedulerEvent types

Let's see which event types JobSchedulerEvent covers. The code below shows that this eventLoop object mainly handles job-related events.

private[scheduler] sealed trait JobSchedulerEvent

private[scheduler] case class JobStarted(job: Job, startTime: Long) extends JobSchedulerEvent

private[scheduler] case class JobCompleted(job: Job, completedTime: Long) extends JobSchedulerEvent

private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends JobSchedulerEvent

2) The eventQueue event-queue object

(1) How eventQueue events are processed
Inside the EventLoop class there is an eventQueue of type LinkedBlockingDeque; every received event is stored in this queue.
When its start method runs, it launches a thread that executes EventLoop's run method. Its core logic is:

val event = eventQueue.take()
try {
  onReceive(event)
} catch {
  case NonFatal(e) => onError(e)
}

After an event is taken from eventQueue, the onReceive method that JobScheduler overrides on its EventLoop object is called, which ultimately lands in JobScheduler.processEvent. Different logic runs depending on the type of the dequeued event.

private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      // handle job-started events
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      // handle job-completed events
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      // handle error events
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}

Continue into JobScheduler.handleJobStart. Here we see that after EventLoop takes out an event, further processing happens by posting an event to the listenerBus object. For listenerBus, see the next section on the StreamingListenerBus class.

private def handleJobStart(job: Job, startTime: Long) {
  val jobSet = jobSets.get(job.time)
  val isFirstJobOfJobSet = !jobSet.hasStarted
  jobSet.handleJobStart(job)
  if (isFirstJobOfJobSet) {
    // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
    // correct "jobSet.processingStartTime".
    listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
  }
  job.setStartTime(startTime)
  listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
  logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
}

(2) How eventQueue events are generated
EventLoop takes events out of eventQueue, so who puts events in?
In the run method of JobHandler, a private class inside JobScheduler, we find _eventLoop.post(JobStarted(job, clock.getTimeMillis())) and _eventLoop.post(JobCompleted(job, clock.getTimeMillis())), which put JobStarted and JobCompleted events into the eventQueue.

The flow of event generation and processing for eventQueue is shown below.

[Figure: event generation and processing flow for the JobScheduler event queue]

So we only need to see how JobHandler.run gets executed. The call chain below shows the entry point for submitting a JobSchedulerEvent into the EventLoop. As seen earlier, JobGenerator.start is itself invoked from JobScheduler.start.

JobGenerator.start
----> JobGenerator.processEvent
--------> JobGenerator.generateJobs
------------> JobScheduler.submitJobSet
----------------> JobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

JobGenerator also maintains its own eventLoop object, except that it handles JobGeneratorEvent events; see the JobGenerator section above for its further analysis.

3. The StreamingListenerBus class

StreamingListenerBus is the object that actually dispatches and handles the events for these different scenarios. For background on ListenerBus, see the earlier article on JobProgressListener, the application-run information recorder in Spark 1.6.0.

1) How eventQueue events are processed

StreamingListenerBus extends AsynchronousListenerBus. Inside StreamingListenerBus there is an onPostEvent method that dispatches each received StreamingListenerEvent to the matching handler.

override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
  event match {
    case receiverStarted: StreamingListenerReceiverStarted =>
      listener.onReceiverStarted(receiverStarted)
    case receiverError: StreamingListenerReceiverError =>
      listener.onReceiverError(receiverError)
    case receiverStopped: StreamingListenerReceiverStopped =>
      listener.onReceiverStopped(receiverStopped)
    case batchSubmitted: StreamingListenerBatchSubmitted =>
      listener.onBatchSubmitted(batchSubmitted)
    case batchStarted: StreamingListenerBatchStarted =>
      listener.onBatchStarted(batchStarted)
    case batchCompleted: StreamingListenerBatchCompleted =>
      listener.onBatchCompleted(batchCompleted)
    case outputOperationStarted: StreamingListenerOutputOperationStarted =>
      listener.onOutputOperationStarted(outputOperationStarted)
    case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
      listener.onOutputOperationCompleted(outputOperationCompleted)
    case _ =>
  }
}

So how does StreamingListenerBus work? Look at its parent class AsynchronousListenerBus, which holds an eventQueue object:

private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)

The eventQueue object stores StreamingListenerEvent events, essentially the ones covered in the code above.

After JobScheduler calls StreamingListenerBus.start to launch the bus on its own thread, AsynchronousListenerBus.run takes events from eventQueue and eventually reaches the StreamingListenerBus.onPostEvent method shown above.
The call chain is:

AsynchronousListenerBus.run
----> ListenerBus.postToAll
--------> StreamingListenerBus.onPostEvent
------------> StreamingJobProgressListener.*

So far we have seen how events in StreamingListenerBus's eventQueue are processed; how are those events generated in the first place?

2) How eventQueue receives events

Processing arrives here after the JobScheduler.processEvent method covered earlier, where we already walked through JobScheduler.handleJobStart.
Depending on the type of JobSchedulerEvent received by eventLoop, different code handles different events. The code below handles JobStarted events.

val listenerBus = new StreamingListenerBus()

private def handleJobStart(job: Job, startTime: Long) {
  val jobSet = jobSets.get(job.time)
  val isFirstJobOfJobSet = !jobSet.hasStarted
  jobSet.handleJobStart(job)
  if (isFirstJobOfJobSet) {
    // post an event to the StreamingListenerBus object's eventQueue
    listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
  }
  job.setStartTime(startTime)
  // post an event to the StreamingListenerBus object's eventQueue
  listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
  logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
}

Calling listenerBus.post leads into AsynchronousListenerBus.post:

def post(event: E) {
  if (stopped.get) {
    logError(s"$name has already stopped! Dropping event $event")
    return
  }

  val eventAdded = eventQueue.offer(event)
  if (eventAdded) {
    eventLock.release()
  } else {
    onDropEvent(event)
  }
}
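The offer-or-drop behavior above comes from the bounded LinkedBlockingQueue: offer returns false instead of blocking when the queue is full. A small demonstration (postOrDrop is an illustrative helper, and the capacity is shrunk to 2 for the example):

```scala
import java.util.concurrent.LinkedBlockingQueue

// Demonstrates why AsynchronousListenerBus.post can drop events:
// offer() on a full bounded queue returns false rather than blocking.
def postOrDrop[E](queue: LinkedBlockingQueue[E], event: E): Boolean = {
  val added = queue.offer(event)
  if (!added) {
    // in Spark this branch is onDropEvent(event), which logs the drop
    println(s"Dropping event $event: queue is full")
  }
  added
}
```

This is a deliberate design choice: a slow listener must not block the threads that generate and schedule jobs, so under backpressure the bus sheds listener events instead.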

The StreamingListenerBus processing logic is illustrated below:

[Figure: StreamingListenerBus processing flow]

Finally, the combined event flow across JobGenerator, JobScheduler, and StreamingListenerBus looks like this:

[Figure: combined event flow across JobGenerator, JobScheduler, and StreamingListenerBus]

At this point, having analyzed the three types of objects above, we understand how a Spark Streaming application starts. Ordinary Spark applications generally begin from an RDD, go through a series of transformations and actions, and finally run via components such as DAGScheduler and TaskScheduler (see the earlier articles on the Spark Scheduler module: DAGScheduler, and TaskScheduler and SchedulerBackend).

For a Spark Streaming application, however, the data to process is not fixed before the application starts, and the startup analysis above has not touched on how the processed data gets in. So how does data enter a Streaming application? Read on to the ReceiverTracker class.

4. The ReceiverTracker class

The ReceiverTracker object is created in JobScheduler.start, and its start method is then called to enter ReceiverTracker's logic.

receiverTracker = new ReceiverTracker(ssc)
receiverTracker.start()

ReceiverTracker mainly handles the data-receiving logic of the receivers in all ReceiverInputDStreams.

1) Receiving input data

(1) The ReceiverTracker.start method
The main work of ReceiverTracker.start is to call ReceiverTracker.launchReceivers. This method prepares each receiver in receiverInputStreams, then distributes them to worker nodes to be started and run.

private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()  // each data source has its own concrete implementation
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  // In non-local mode, run a dummy job to make sure all slaves are up before
  // continuing, so the receivers are not all assigned to the same node.
  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  // endpoint is an RpcEndpointRef; through it, receivers are distributed to worker nodes
  endpoint.send(StartAllReceivers(receivers))
}

(2) The ReceiverTrackerEndpoint.receive method
Once endpoint.send is called, the message type routes it into ReceiverTrackerEndpoint.receive, which handles the event that starts all receivers.

override def receive: PartialFunction[Any, Unit] = {
  // handle the StartAllReceivers event
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
  case RestartReceiver(receiver) =>
  ...
}
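scheduleReceivers maps each receiver's streamId to candidate executors. The real ReceiverSchedulingPolicy also honors each receiver's preferredLocation and balances load across executors; the bare round-robin sketch below only illustrates the spreading idea, and all names in it are hypothetical:

```scala
// Bare round-robin sketch of receiver-to-executor scheduling: each streamId
// gets one executor, cycling through the available executors so receivers
// spread across nodes instead of piling onto one (the problem the dummy
// Spark job in launchReceivers guards against).
def scheduleRoundRobin(streamIds: Seq[Int],
                       executors: Seq[String]): Map[Int, Seq[String]] = {
  require(executors.nonEmpty, "no executors registered yet")
  streamIds.zipWithIndex.map { case (id, i) =>
    id -> Seq(executors(i % executors.size))
  }.toMap
}
```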

Finally we reach the ReceiverTracker.startReceiver method.

private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  ...
  // take out each Receiver object
  val receiver = iterator.next()
  assert(iterator.hasNext == false)
  val supervisor = new ReceiverSupervisorImpl(
    receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
  supervisor.start()
  supervisor.awaitTermination()
  ...
}

In ReceiverSupervisor.start, the receivers are actually started:

def start() {
  onStart()
  startReceiver()
}

def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started

      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

At receiver.onStart, Spark Streaming dispatches into the concrete implementation class for the specific input source.

This article uses SocketInputDStream, whose receiver is SocketReceiver; SocketReceiver extends the Receiver class directly.

(3) The SocketReceiver.onStart method

This method starts a thread that keeps executing the receive method to receive data.

def onStart() {
  // Start the thread that receives data over a connection
  new Thread("Socket Receiver") {
    setDaemon(true)
    override def run() { receive() }
  }.start()
}

def receive() {
  ...
  socket = new Socket(host, port)
  logInfo("Connected to " + host + ":" + port)
  val iterator = bytesToObjects(socket.getInputStream())
  // entered only while the connection exists and data is being received, as the figure below shows
  while (!isStopped && iterator.hasNext) {
    store(iterator.next)  // after receiving a chunk of data, store caches it in memory
  }
  ...
}
[Figure: the socket receive loop]
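bytesToObjects turns the socket's raw InputStream into an iterator of records; for the common text case, Spark's socketTextStream splits the stream on newlines. A comparable sketch over any InputStream (linesFrom is an illustrative helper, not Spark's implementation):

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

// Sketch of a bytesToObjects-style converter: wraps an InputStream and
// yields one String per line, ending when the stream is exhausted -- which
// is why the receive() loop above exits when the connection closes.
def linesFrom(in: InputStream): Iterator[String] = {
  val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null)
}
```

Because the iterator is lazy, each call to iterator.next in the receive loop blocks on the socket until another line arrives.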

(4) The ReceiverSupervisor follow-up flow
Continuing with the Receiver.store method from the previous step:


def store(dataItem: T) {
  supervisor.pushSingle(dataItem)
}

The received data then flows on through ReceiverSupervisorImpl.pushSingle.

Original: https://www.cnblogs.com/wuyida/p/6300209.html
Author: 吴一达
Title: An Analysis of the Spark Streaming Application Startup Process
