上一小节 《TaskScheduler源码与任务提交原理浅析2》 介绍了Driver侧将Stage进行划分,根据Executor闲置情况分发任务,最终通过DriverActor向executorActor发送任务消息。
我们要了解Executor的执行机制首先要了解Executor在Driver侧的注册过程,这篇文章先了解一下Application和Executor的注册过程。
Spark将由Executor执行的Task分为ShuffleMapTask和ResultTask两种,其源码存在scheduler package中。
Task是介于DAGScheduler和TaskScheduler中间的接口,在DAGScheduler,需要把DAG中的每个stage的每个partitions封装成task,最终把taskset提交给TaskScheduler。
/** * A unit of execution. We have two kinds of Task's in Spark: * - [[org.apache.spark.scheduler.ShuffleMapTask]] * - [[org.apache.spark.scheduler.ResultTask]] * * A Spark job consists of one or more stages. The very last stage in a job consists of multiple * ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task * and sends the task output back to the driver application. A ShuffleMapTask executes the task * and divides the task output to multiple buckets (based on the task's partitioner). * * @paramstageId id of the stage this task belongs to * @parampartitionId index of the number in the RDD */ private[spark]abstractclassTask[T](val stageId: Int, var partitionId: Int)extendsSerializable
Task对应一个stageId和partitionId。
提供runTask()接口、kill()接口等。
提供killed变量、TaskMetrics变量、TaskContext变量等。
除了上述基本接口和变量,Task的伴生对象提供了序列化和反序列化应用依赖的jar包的方法。原因是Task需要保证工作节点具备本次Task需要的其他依赖,注册到SparkContext下,所以提供了把依赖转成流写入写出的方法。
对应于ShuffleMap Stage, 产生的结果作为其他stage的输入。ShuffleMapTask复写了MapStatus向外读写的方法,因为向外读写的内容包括:stageId,rdd,dep,partitionId,epoch和split(某个partition)。对于其中的stageId,rdd,dep有统一的序列化和反序列化操作并会cache在内存里,再放到ObjectOutput里写出去。序列化操作使用的是Gzip,序列化信息会维护在serializedInfoCache = newHashMap[Int, Array[Byte]]。这部分需要序列化并保存的原因是:stageId,rdd,dep真正代表了本次Shuffle Task的信息,为了减轻master节点负担,把这部分序列化结果cache了起来。
/** * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner * specified in the ShuffleDependency). * * See [[org.apache.spark.scheduler.Task]] for more information. * * @paramstageId id of the stage this task belongs to * @paramtaskBinary broadcast version of of the RDD and the ShuffleDependency. Once deserialized, * the type should be (RDD[_], ShuffleDependency[_, _, _]). * @parampartition partition of the RDD this task is associated with * @paramlocs preferred task execution locations for locality scheduling */ private[spark]classShuffleMapTask( stageId: Int, taskBinary: Broadcast[Array[Byte]], partition: Partition, @transientprivate var locs: Seq[TaskLocation]) extendsTask[MapStatus](stageId, partition.index)withLogging {
对应于Result Stage直接产生结果。
/** * A task that sends back the output to the driver application. * * See [[Task]] for more information. * * @paramstageId id of the stage this task belongs to * @paramtaskBinary broadcasted version of the serialized RDD and the function to apply on each * partition of the given RDD. Once deserialized, the type should be * (RDD[T], (TaskContext, Iterator[T]) => U). * @parampartition partition of the RDD this task is associated with * @paramlocs preferred task execution locations for locality scheduling * @paramoutputId index of the task in this job (a job can launch tasks on only a subset of the * input RDD's partitions). */ private[spark]classResultTask[T,U]( stageId: Int, taskBinary: Broadcast[Array[Byte]], partition: Partition, @transientlocs: Seq[TaskLocation], val outputId: Int) extendsTask[U](stageId, partition.index)withSerializable {
TaskSet是一个数据结构,用于封装一个stage的所有的tasks, 以提交给TaskScheduler。TaskSet就是可以做pipeline的一组完全相同的task,每个task的处理逻辑完全相同,不同的是处理数据,每个task负责处理一个partition。pipeline,可以称为大数据处理的基石,只有数据进行pipeline处理,才能将其放到集群中去运行。对于一个task来说,它从数据源获得逻辑,然后按照拓扑顺序,顺序执行(实际上是调用rdd的compute)。
/** * A set of tasks submitted together to the low-level TaskScheduler, usually representing * missing partitions of a particular stage. */ private[spark]classTaskSet( val tasks: Array[Task[_]], val stageId: Int, val attempt: Int, val priority: Int, val properties: Properties) { valid: String = stageId +"."+ attempt overridedeftoString: String ="TaskSet "+ id }
Driver发送LaunchTask消息被Executor接收,Executor会使用launchTask对消息进行处理。不过在这个过程之前,我们要知道,如果Executor没有注册到Driver,即便接收到LaunchTask指令,也不会做任务处理。所以我们要先搞清楚,Executor是如何在Driver侧进行注册的。
Executor的注册是发生在Application的注册过程中的,我们以Standalone模式为例:
SparkContext创建schedulerBackend和taskScheduler,schedulerBackend作为TaskScheduler对象的一个成员存在 —> 在TaskScheduler对象调用start函数时,其实调用了backend.start()函数 —> backend.start()函数中启动了AppClient,AppClient的其中一个参数ApplicationDescription就是封装的运行 CoarseGrainedExecutorBackend的命令
—> AppClient内部启动了一个ClientActor,这个ClientActor启动之后,会尝试向Master发送一个指令 actor ! RegisterApplication(appDescription)
注册一个Application
下面是SparkDeploySchedulerBackend的start函数中的部分注册Application的代码:
// Start executors with a few necessary configs for registering with the scheduler valsparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf) valjavaOpts = sparkJavaOpts ++ extraJavaOpts valcommand = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) valappUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") valappDesc =newApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec) client = newAppClient(sc.env.actorSystem, masters, appDesc,this, conf) client.start()
AppClient向Master提交Application
AppClient是Application和Master交互的接口。它的包含一个类型为 org.apache.spark.deploy.client.AppClient.ClientActor
的成员变量actor。它负责了所有的与Master的交互。其中提交Application过程涉及的函数调用为:
ClientActor的 preStart()
—> 调用 registerWithMaster()
—> 调用 tryRegisterAllMasters
—> actor ! RegisterApplication(appDescription)
—> Master的receiveWithLogging函数处理RegisterApplication消息。
下面是RegisterApplication(appDescription)消息的相关处理代码(在Master.scala中的receiveWithLogging部分代码):
caseRegisterApplication(description) => { if(state == RecoveryState.STANDBY) { // ignore, don't send response } else{ logInfo("Registering app "+ description.name) valapp = createApplication(description, sender) registerApplication(app) logInfo("Registered app "+ description.name +" with ID "+ app.id) persistenceEngine.addApplication(app) sender ! RegisteredApplication(app.id, masterUrl) schedule()//为处于待分配资源的Application分配资源。在每次有新的Application加入或者新的资源加入时都会调用schedule进行调度 } }
这段代码做了以下几件事:
schedule()为处于待分配资源的Application分配资源。在每次有新的Application加入或者新的资源加入时都会调用schedule进行调度。为Application分配资源选择worker(executor),现在有两种策略:
对于同一个Application,它在一个worker上只能拥有一个executor;当然了,这个executor可能拥有多于1个core。对于策略1,任务的部署会慢于策略2,但是GC的时间会更快。
schedule函数的源码,解释在中文注释中:
/* * Schedule the currently available resources among waiting apps. This method will be called * every time a new app joins or resource availability changes. */ privatedefschedule() { if(state != RecoveryState.ALIVE) {return} // First schedule drivers, they take strict precedence over applications // Randomization helps balance drivers valshuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) valnumWorkersAlive = shuffledAliveWorkers.size varcurPos =0 for(driver <- waitingDrivers.toList) {// iterate over a copy of waitingDrivers // We assign workers to each waiting driver in a round-robin fashion. For each driver, we // start from the last worker that was assigned a driver, and continue onwards until we have // explored all alive workers. varlaunched =false varnumWorkersVisited =0 while(numWorkersVisited < numWorkersAlive && !launched) { valworker = shuffledAliveWorkers(curPos) numWorkersVisited += 1 if(worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { launchDriver(worker, driver) waitingDrivers -= driver launched = true } curPos = (curPos + 1) % numWorkersAlive } } // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app // in the queue, then the second app, etc. if(spreadOutApps) {//尽量的打散负载,如有可能,每个executor分配一个core // Try to spread out each app among all the nodes, until it has all its cores for(app <- waitingAppsifapp.coresLeft >0) { // 可用的worker的标准:State是Alive,其上并没有该Application的executor,可用内存满足要求。 // 在可用的worker中,优先选择可用core数多的。 valusableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) .filter(canUse(app, _)).sortBy(_.coresFree).reverse valnumUsable = usableWorkers.length valassigned =newArray[Int](numUsable)// Number of cores to give on each node vartoAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) varpos =0 while(toAssign >0) { if(usableWorkers(pos).coresFree - assigned(pos) >0) { toAssign -= 1 assigned(pos) += 1 } pos = (pos + 1) % numUsable } // Now that we've decided how many cores to give on each node, let's actually give them for(pos <-0until numUsable) { if(assigned(pos) >0) { valexec = app.addExecutor(usableWorkers(pos), assigned(pos)) launchExecutor(usableWorkers(pos), exec) app.state = ApplicationState.RUNNING } } } } else{//尽可能多的利用worker的core // Pack each app into as few nodes as possible until we've assigned all its cores for(worker <- workersifworker.coresFree >0&& worker.state == WorkerState.ALIVE) { for(app <- waitingAppsifapp.coresLeft >0) { if(canUse(app, worker)) { valcoresToUse = math.min(worker.coresFree, app.coresLeft) if(coresToUse >0) { valexec = app.addExecutor(worker, coresToUse) launchExecutor(worker, exec) app.state = ApplicationState.RUNNING } } } } } }
在选择了worker和确定了worker上得executor需要的CPU core数后,Master会调用 launchExecutor(worker: WorkerInfo, exec: ExecutorInfo)向Worker发送请求,向AppClient发送executor已经添加的消息。同时会更新master保存的worker的信息,包括增加executor,减少可用的CPU core数和memory数。Master不会等到真正在worker上成功启动executor后再更新worker的信息。如果worker启动executor失败,那么它会发送FAILED的消息给Master,Master收到该消息时再次更新worker的信息即可。
deflaunchExecutor(worker: WorkerInfo, exec: ExecutorDesc) { logInfo("Launching executor "+ exec.fullId +" on worker "+ worker.id) worker.addExecutor(exec) worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory) exec.application.driver ! ExecutorAdded( exec.id, worker.id, worker.hostPort, exec.cores, exec.memory) }
下面的调用关系链是Worker接收到来自Master的 LaunchExecutor消息
后的调用过程:
LaunchExecutor的消息处理中创建 ExecutorRunner
—> ExecutorRunner会将在 SparkDeploySchedulerBackend
中准备好的 ApplicationDescription
以进程的形式启动起来 —> 启动 ApplicationDescription
中携带的 CoarseGrainedExecutorBackend
—> CoarseGrainedExecutorBackend
启动后,会首先通过传入的driverUrl这个参数向在 CoarseGrainedSchedulerBackend::DriverActor
发送 RegisterExecutor消息
—> DriverActor会回复 RegisteredExecutor
—> CoarseGrainedExecutorBackend
会创建一个 Executor
—> Executor创建完毕。
CoarseGrainedExecutorBackend启动后,preStart函数执行的相关操作:
overridedefpreStart() { logInfo("Connecting to driver: "+ driverUrl) driver = context.actorSelection(driverUrl) driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) }
CoarseGrainedExecutorBackend接收RegisteredExecutor消息后,创建Executor的操作:
overridedefreceiveWithLogging = { caseRegisteredExecutor => logInfo("Successfully registered with driver") val(hostname, _) = Utils.parseHostPort(hostPort) executor = newExecutor(executorId, hostname, env, userClassPath, isLocal =false) ......
Spark Core源码分析: Spark任务模型
Spark技术内幕:Executor分配详解 —— 强烈推荐该博文,其中博主结合Spark源码对Executor的分配讲解的非常详细
转载请注明作者Jason Ding及其出处
GitCafe博客主页(http://jasonding1354.gitcafe.io/)
Github博客主页(http://jasonding1354.github.io/)
CSDN博客(http://blog.csdn.net/jasonding1354)
简书主页(http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
Google搜索jasonding1354进入我的博客主页