上一篇文章 《DAGScheduler源码浅析》 中,介绍了handleJobSubmitted函数,它作为生成finalStage的重要函数存在,这一篇文章中,我将就DAGScheduler生成Stage过程继续学习,同时介绍Stage的相关源码。
Stage的调度是由DAGScheduler完成的。由RDD的有向无环图DAG切分出了Stage的有向无环图DAG。Stage的DAG通过最后执行的Stage为根进行广度优先遍历,遍历到最开始执行的Stage执行,如果提交的Stage仍有未完成的父母Stage,则Stage需要等待其父Stage执行完才能执行。同时DAGScheduler中还维持了几个重要的Key-Value集合结构,用来记录Stage的状态,这样能够避免过早执行和重复提交Stage。waitingStages中记录仍有未执行的父母Stage,防止过早执行。runningStages中保存正在执行的Stage,防止重复执行。failedStages中保存执行失败的Stage,需要重新执行,这里的设计是出于容错的考虑。
// Stages we need to run whose parents aren't done private[scheduler]valwaitingStages =newHashSet[Stage] // Stages we are running right now private[scheduler]valrunningStages =newHashSet[Stage] // Stages that must be resubmitted due to fetch failures private[scheduler]valfailedStages =newHashSet[Stage]
RDD的窄依赖是指父RDD的所有输出都会被指定的子RDD消费,即输出路径是固定的;宽依赖是指父RDD的输出会由不同的子RDD消费,即输出路径不固定。
调度器会计算RDD之间的依赖关系,将拥有持续窄依赖的RDD归并到同一个Stage中,而宽依赖则作为划分不同Stage的判断标准。
导致窄依赖的Transformation操作:map、flatMap、filter、sample;导致宽依赖的Transformation操作:sortByKey、reduceByKey、groupByKey、cogroupByKey、join、cartensian。
Stage分为两种:
ShuffleMapStage, in which case its tasks’ results are input for another stage
其实就是,非最终stage, 后面还有其他的stage, 所以它的输出一定是需要shuffle并作为后续的输入。
这种Stage是以Shuffle为输出边界,其输入边界可以是从外部获取数据,也可以是另一个ShuffleMapStage的输出
其输出可以是另一个Stage的开始。
ShuffleMapStage的最后Task就是ShuffleMapTask。
在一个Job里可能有该类型的Stage,也可以能没有该类型Stage。
ResultStage, in which case its tasks directly compute the action that initiated a job (e.g. count(), save(), etc)
最终的stage, 没有输出, 而是直接产生结果或存储。
这种Stage是直接输出结果,其输入边界可以是从外部获取数据,也可以是另一个ShuffleMapStage的输出。
ResultStage的最后Task就是ResultTask,在一个Job里必定有该类型Stage。
一个Job含有一个或多个Stage,但至少含有一个ResultStage。
RDD转换本身存在ShuffleDependency,像ShuffleRDD、CoGroupdRDD、SubtractedRDD会返回ShuffleDependency。
如果RDD中存在ShuffleDependency,就会创建一个新的Stage。
Stage划分完毕就明确了以下内容:
确认Partition以决定需要产生多少不同的Task,ShuffleMap类型判断来决定生成的Task类型。Spark中有两种Task,分别是ShuffleMapTask和ResultTask。
stage的RDD参数只有一个RDD, final RDD, 而不是一系列的RDD。因为在一个stage中的所有RDD都是map, partition不会有任何改变, 只是在data依次执行不同的map function所以对于TaskScheduler而言, 一个RDD的状况就可以代表这个stage。
Stage参数说明:
val id: Int //Stage的序号数值越大,优先级越高
], //归属于本Stage的最后一个rdd
val numTasks: Int, //创建的Task数目,等于父RDD的输出Partition数目
, , ]], //是否存在SuffleDependency,宽依赖val parents: List[Stage], //父Stage列表
val jobId: Int //作业ID
private[spark]classStage( val id: Int, val rdd: RDD[_], val numTasks: Int, val shuffleDep: Option[ShuffleDependency[_, _, _]], // Output shuffle if stage is a map stage val parents: List[Stage], val jobId: Int, val callSite: CallSite) extendsLogging { valisShuffleMap = shuffleDep.isDefined valnumPartitions = rdd.partitions.size valoutputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil) varnumAvailableOutputs =0 /** Set of jobs that this stage belongs to. */ valjobIds =newHashSet[Int] /** For stages that are the final (consists of only ResultTasks), link to the ActiveJob. */ varresultOfJob: Option[ActiveJob] = None varpendingTasks =newHashSet[Task[_]] privatevarnextAttemptId =0 valname = callSite.shortForm valdetails = callSite.longForm /** Pointer to the latest [StageInfo] object, set by DAGScheduler. */ varlatestInfo: StageInfo = StageInfo.fromStage(this) defisAvailable: Boolean = { if(!isShuffleMap) { true } else{ numAvailableOutputs == numPartitions } } defaddOutputLoc(partition: Int, status: MapStatus) { valprevList = outputLocs(partition) outputLocs(partition) = status :: prevList if(prevList == Nil) { numAvailableOutputs += 1 } } defremoveOutputLoc(partition: Int, bmAddress: BlockManagerId) { valprevList = outputLocs(partition) valnewList = prevList.filterNot(_.location == bmAddress) outputLocs(partition) = newList if(prevList != Nil && newList == Nil) { numAvailableOutputs -= 1 } } /** * Removes all shuffle outputs associated with this executor. Note that this will also remove * outputs which are served by an external shuffle server (if one exists), as they are still * registered with this execId. */ defremoveOutputsOnExecutor(execId: String) { varbecameUnavailable =false for(partition <-0until numPartitions) { valprevList = outputLocs(partition) valnewList = prevList.filterNot(_.location.executorId == execId) outputLocs(partition) = newList if(prevList != Nil && newList == Nil) { becameUnavailable = true numAvailableOutputs -= 1 } } if(becameUnavailable) { logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format( this, execId, numAvailableOutputs, numPartitions, isAvailable)) } } /** Return a new attempt id, starting with 0. */ defnewAttemptId(): Int = { valid = nextAttemptId nextAttemptId += 1 id } defattemptId: Int = nextAttemptId overridedeftoString ="Stage "+ id overridedefhashCode(): Int = id overridedefequals(other: Any): Boolean = othermatch{ casestage: Stage => stage !=null&& stage.id == id case_ =>false } }
dagScheduler.handleJobSubmitted
—> dagScheduler.submitStage
—> dagScheduler.submitMissingTasks
—> taskScheduler.submitTasks
。
函数handleJobSubmitted和submitStage主要负责依赖性分析,对其处理逻辑做进一步的分析。handleJobSubmitted最主要的工作是生成Stage,并根据finalStage来产生ActiveJob。
private[scheduler]defhandleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], allowLocal: Boolean, callSite: CallSite, listener: JobListener, properties: Properties) { varfinalStage: Stage =null try{ // New stage creation may throw an exception if, for example, jobs are run on a // HadoopRDD whose underlying HDFS files have been deleted. finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite) } catch{ //错误处理,告诉监听器作业失败,返回.... casee: Exception => logWarning("Creating new stage failed due to exception - job: "+ jobId, e) listener.jobFailed(e) return } if(finalStage !=null) { valjob =newActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format( job.jobId, callSite.shortForm, partitions.length, allowLocal)) logInfo("Final stage: "+ finalStage +"("+ finalStage.name +")") logInfo("Parents of final stage: "+ finalStage.parents) logInfo("Missing parents: "+ getMissingParentStages(finalStage)) valshouldRunLocally = localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1 valjobSubmissionTime = clock.getTimeMillis() if(shouldRunLocally) { // 很短、没有父stage的本地操作,比如 first() or take() 的操作本地执行 // Compute very short actions like first() or take() with no parent stages locally. listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties)) runLocally(job) } else{ // collect等操作走的是这个过程,更新相关的关系映射,用监听器监听,然后提交作业 jobIdToActiveJob(jobId) = job activeJobs += job finalStage.resultOfJob = Some(job) valstageIds = jobIdToStageIds(jobId).toArray valstageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) // 提交stage submitStage(finalStage) } } // 提交stage submitWaitingStages() }
/** * Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation * of a shuffle map stage in newOrUsedStage. The stage will be associated with the provided * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage * directly. */ privatedefnewStage( rdd: RDD[_], numTasks: Int, shuffleDep: Option[ShuffleDependency[_, _, _]], jobId: Int, callSite: CallSite) : Stage = { valparentStages = getParentStages(rdd, jobId) valid = nextStageId.getAndIncrement() valstage =newStage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stage }
其中,Stage的初始化参数:在创建一个Stage之前,需要知道该Stage需要从多少个Partition读入数据,这个数值直接影响要创建多少个Task。也就是说,创建Stage时,已经清楚该Stage需要从多少不同的Partition读入数据,并写出到多少个不同的Partition中,输入和输出的个数均已明确。
getParentStages函数:
通过不停的遍历它之前的rdd,如果碰到有依赖是ShuffleDependency类型的,就通过getShuffleMapStage方法计算出来它的Stage来。
/** * Get or create the list of parent stages for a given RDD. The stages will be assigned the * provided jobId if they haven't already been created with a lower jobId. */ privatedefgetParentStages(rdd: RDD[_], jobId: Int): List[Stage] = { valparents =newHashSet[Stage] valvisited =newHashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting valwaitingForVisit =newStack[RDD[_]] defvisit(r: RDD[_]) { if(!visited(r)) { visited += r // Kind of ugly: need to register RDDs with the cache here since // we can't do it in its constructor because # of partitions is unknown for(dep <- r.dependencies) { dep match{ caseshufDep: ShuffleDependency[_, _, _] => parents += getShuffleMapStage(shufDep, jobId) case_ => waitingForVisit.push(dep.rdd) } } } } waitingForVisit.push(rdd) while(!waitingForVisit.isEmpty) { visit(waitingForVisit.pop()) } parents.toList }
用户所提交的job在得到DAGScheduler的调度后,会被包装成ActiveJob,同时会启动JobWaiter阻塞监听job的完成状况。
同时依据job中RDD的dependency和dependency属性(NarrowDependency,ShufflerDependecy),DAGScheduler会根据依赖关系的先后产生出不同的stage DAG(result stage, shuffle map stage)。
在每一个stage内部,根据stage产生出相应的task,包括ResultTask或是ShuffleMapTask,这些task会根据RDD中partition的数量和分布,产生出一组相应的task,并将其包装为TaskSet提交到TaskScheduler上去。
/** * Tracks information about an active job in the DAGScheduler. */ private[spark]classActiveJob( val jobId: Int, val finalStage: Stage, val func: (TaskContext, Iterator[_]) => _, valpartitions: Array[Int], valcallSite: CallSite, vallistener: JobListener, valproperties: Properties) { valnumPartitions = partitions.length valfinished = Array.fill[Boolean](numPartitions)(false) varnumFinished =0 }
submitStage函数中会根据依赖关系划分stage,通过递归调用从finalStage一直往前找它的父stage,直到stage没有父stage时就调用submitMissingTasks方法提交改stage。这样就完成了将job划分为一个或者多个stage。
submitStage处理流程:
/** Submits stage, but first recursively submits any missing parents. */ privatedefsubmitStage(stage: Stage) { valjobId = activeJobForStage(stage) if(jobId.isDefined) { logDebug("submitStage("+ stage +")") if(!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { valmissing = getMissingParentStages(stage).sortBy(_.id)// 根据final stage发现是否有parent stage logDebug("missing: "+ missing) if(missing == Nil) { logInfo("Submitting "+ stage +" ("+ stage.rdd +"), which has no missing parents") submitMissingTasks(stage, jobId.get) // 如果没有parent stage需要执行, 则直接submit当前stage的task } else{ for(parent <- missing) { submitStage(parent) // 提交父stage的task,这里是个递归,直到没有父stage才在上面的语句中提交task } waitingStages += stage // 暂时不能提交的stage,先添加到等待队列 } } } else{ abortStage(stage, "No active job for stage "+ stage.id) } }
这个提交stage的过程是一个递归的过程,它是先要把父stage先提交,然后把自己添加到等待队列中,直到没有父stage之后,就提交该stage中的任务。等待队列在最后的submitWaitingStages方法中提交。
getMissingParentStages通过图的遍历,来找出所依赖的所有父Stage。
privatedefgetMissingParentStages(stage: Stage): List[Stage] = { valmissing =newHashSet[Stage] valvisited =newHashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting valwaitingForVisit =newStack[RDD[_]] defvisit(rdd: RDD[_]) { if(!visited(rdd)) { visited += rdd if(getCacheLocs(rdd).contains(Nil)) { for(dep <- rdd.dependencies) { dep match{ caseshufDep: ShuffleDependency[_, _, _] =>// 如果发现ShuffleDependency, 说明遇到新的stage valmapStage = getShuffleMapStage(shufDep, stage.jobId) // check shuffleToMapStage, 如果该stage已经被创建则直接返回, 否则newStage if(!mapStage.isAvailable) { missing += mapStage } casenarrowDep: NarrowDependency[_] =>// 对于NarrowDependency, 说明仍然在这个stage中 waitingForVisit.push(narrowDep.rdd) } } } } } waitingForVisit.push(stage.rdd) while(!waitingForVisit.isEmpty) { visit(waitingForVisit.pop()) } missing.toList }
可见无论是哪种stage,都是对于每个stage中的每个partitions创建task,并最终封装成TaskSet,将该stage提交给taskscheduler。
/** Called when stage's parents are available and we can now do its task. */ privatedefsubmitMissingTasks(stage: Stage, jobId: Int) { logDebug("submitMissingTasks("+ stage +")") // Get our pending tasks and remember them in our pendingTasks entry stage.pendingTasks.clear() // First figure out the indexes of partition ids to compute. valpartitionsToCompute: Seq[Int] = { if(stage.isShuffleMap) { (0until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil) } else{ valjob = stage.resultOfJob.get (0until job.numPartitions).filter(id => !job.finished(id)) } } valproperties =if(jobIdToActiveJob.contains(jobId)) { jobIdToActiveJob(stage.jobId).properties } else{ // this stage will be assigned to "default" pool null } runningStages += stage // SparkListenerStageSubmitted should be posted before testing whether tasks are // serializable. If tasks are not serializable, a SparkListenerStageCompleted event // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size)) outputCommitCoordinator.stageStart(stage.id) listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties)) // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times. // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast // the serialized copy of the RDD and for each task we will deserialize it, which means each // task gets a different copy of the RDD. This provides stronger isolation between tasks that // might modify state of objects referenced in their closures. This is necessary in Hadoop // where the JobConf/Configuration object is not thread-safe. vartaskBinary: Broadcast[Array[Byte]] =null try{ // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep). // For ResultTask, serialize and broadcast (rdd, func). valtaskBinaryBytes: Array[Byte] = if(stage.isShuffleMap) { closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array() } else{ closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array() } taskBinary = sc.broadcast(taskBinaryBytes) } catch{ // In the case of a failure during serialization, abort the stage. casee: NotSerializableException => abortStage(stage, "Task not serializable: "+ e.toString) runningStages -= stage return caseNonFatal(e) => abortStage(stage, s"Task serialization failed: $e/n${e.getStackTraceString}") runningStages -= stage return } valtasks: Seq[Task[_]] =if(stage.isShuffleMap) { partitionsToCompute.map { id => vallocs = getPreferredLocs(stage.rdd, id) valpart = stage.rdd.partitions(id) newShuffleMapTask(stage.id, taskBinary, part, locs) } } else{ valjob = stage.resultOfJob.get partitionsToCompute.map { id => valp: Int = job.partitions(id) valpart = stage.rdd.partitions(p) vallocs = getPreferredLocs(stage.rdd, p) newResultTask(stage.id, taskBinary, part, locs, id) } } if(tasks.size >0) { logInfo("Submitting "+ tasks.size +" missing tasks from "+ stage +" ("+ stage.rdd +")") stage.pendingTasks ++= tasks logDebug("New pending tasks: "+ stage.pendingTasks) taskScheduler.submitTasks( newTaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } else{ // Because we posted SparkListenerStageSubmitted earlier, we should mark // the stage as completed here in case there are no tasks to run markStageAsFinished(stage, None) logDebug("Stage "+ stage +" is actually done; %b %d %d".format( stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions)) } }
fxjwind—Spark源码分析—Stage
Spark源码系列(三)作业运行过程
Spark技术内幕:Stage划分及提交源码分析转载请注明作者Jason Ding及其出处
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
Github博客主页(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354进入我的博客主页