上一篇文章DAGScheduler源码浅析主要从提交Job的流程角度介绍了DAGScheduler源码中的重要函数和关键点,这篇DAGScheduler源码浅析2主要参考fxjwind的 Spark源码分析 – DAGScheduler 一文,介绍一下DAGScheduler文件中之前没有介绍的几个重要函数。
在Spark 1.0版本之前,在DAGScheduler类中加入eventQueue私有成员,设置eventLoop Thread循环读取事件进行处理。在Spark 1.0源码中,事件处理通过Actor的方式进行,涉及的DAGEventProcessActor类进行主要的事件处理工作。可能由于scala不再支持原生actor方式,而将akka actor作为官方标准的原因,在我查看Spark 1.4的源码中,DAGScheduler重新采用eventQueue的方式进行事件处理,为了代码逻辑更加清晰,耦合性更小,1.4的源码中编写了DAGSchedulerEventProcessLoop类进行事件处理。
private[scheduler]classDAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler) extendsEventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop")withLogging {
这里DAGSchedulerEventProcessLoop继承了EventLoop类,其中:
private[spark]abstractclassEventLoop[E](name: String)extendsLogging{ privatevaleventQueue: BlockingQueue[E] =newLinkedBlockingDeque[E]() privatevalstopped =newAtomicBoolean(false) privatevaleventThread =newThread(name) { setDaemon(true) overridedefrun(): Unit = { try{ while(!stopped.get) { valevent = eventQueue.take() try{ onReceive(event) } catch{ caseNonFatal(e) => { try{ onError(e) } catch{ caseNonFatal(e) => logError("Unexpected error in "+ name, e) } } } } } catch{ caseie: InterruptedException =>// exit even if eventQueue is not empty caseNonFatal(e) => logError("Unexpected error in "+ name, e) } } } ......
我们可以看到,DAGScheduler通过向DAGSchedulerEventProcessLoop对象投递event,即向eventQueue发送event,eventThread不断从eventQueue中获取event并调用onReceive函数进行处理。
overridedefonReceive(event: DAGSchedulerEvent): Unit = eventmatch{ caseJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) ......
JobWaiter首先实现JobListener的taskSucceeded和jobFailed函数,当DAGScheduler收到tasksuccess或fail的event就会调用相应的函数在tasksuccess会判断当所有task都success时,就表示jobFinished而awaitResult,就是一直等待jobFinished被置位。
可以看到在submitJob函数中创建了JobWaiter实例,作为参数传入的事件实例中,最终在调用handleJobSubmitted函数中,如果发生错误,就会调用JobWaiter的jobFailed函数。
下面是JobWaiter类的代码:
private[spark]classJobWaiter[T]( dagScheduler: DAGScheduler, val jobId: Int, totalTasks: Int, resultHandler: (Int, T) => Unit) extendsJobListener { privatevarfinishedTasks =0 // Is the job as a whole finished (succeeded or failed)? @volatile privatevar_jobFinished = totalTasks ==0 defjobFinished = _jobFinished // If the job is finished, this will be its result. In the case of 0 task jobs (e.g. zero // partition RDDs), we set the jobResult directly to JobSucceeded. privatevarjobResult: JobResult =if(jobFinished) JobSucceededelsenull /** * Sends a signal to the DAGScheduler to cancel the job. The cancellation itself is handled * asynchronously. After the low level scheduler cancels all the tasks belonging to this job, it * will fail this job with a SparkException. */ defcancel() { dagScheduler.cancelJob(jobId) } overridedeftaskSucceeded(index: Int, result: Any): Unit = synchronized { if(_jobFinished) { thrownewUnsupportedOperationException("taskSucceeded() called on a finished JobWaiter") } resultHandler(index, result.asInstanceOf[T]) finishedTasks += 1 if(finishedTasks == totalTasks) { _jobFinished = true jobResult = JobSucceeded this.notifyAll() } } overridedefjobFailed(exception: Exception): Unit = synchronized { _jobFinished = true jobResult = JobFailed(exception) this.notifyAll() } defawaitResult(): JobResult = synchronized { while(!_jobFinished) { this.wait() } returnjobResult } }
这一小节内容介绍了DAGScheduler.scala文件中的几个小细节,下一篇文章我会就DAGScheduler.scala文件中stage划分和依赖性进行分析介绍。
转载请注明作者Jason Ding及其出处
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
Github博客主页(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354进入我的博客主页