转载

[原]Spark on Yarn: Cluster模式Scheduler实现

  • 背景
  • 主体逻辑
  • 具体实现
    • AM
    • YarnAllocator
    • Executor

背景

Spark on Yarn分yarn-cluster和yarn-client两种模式。本文通过Cluster模式的TaskScheduler实现入手,梳理一遍spark on yarn的大致实现逻辑。

前提我对两种模式以及yarn任务的整体运行逻辑不是很清楚。

主体逻辑

cluster模式中,使用的TaskScheduler是 YarnClusterScheduler

它继承了默认使用的TaskSchedulerImpl类,额外在postStartHook方法里,唤醒了 ApplicationMaster 类的设置sparkcontext的方法。

ApplicationMaster 相当于是spark在yarn上的AM,内部的 YarnRMClient 类,负责向RM注册和注销AM,以及拿到attemptId。注册AM之后,得到一个可以申请/释放资源的 YarnAllocationHandler 类,从而可以维护container与executor之间的关系。

下节具体介绍几个主要类的实现逻辑。

具体实现

AM

ApplicationMaster ,通过 YarnRMClient 来完成自己的注册和注销。

AM的启动方式

/**  * This object does not provide any special functionality. It exists so that it's easy to tell  * apart the client-mode AM from the cluster-mode AM when using tools such as ps or jps.  */ object ExecutorLauncher {    def main(args: Array[String]) = {     ApplicationMaster.main(args)   }  } 

main里面调用AM的run方法:

  def main(args: Array[String]) = {  SignalLogger.register(log)  val amArgs = new ApplicationMasterArguments(args)  SparkHadoopUtil.get.runAsSparkUser { () =>    master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))    System.exit(master.run())  }   }  

run方法里:

1. 如果不是Driver模式,执行 runExecutorLauncher 逻辑:

启动后,执行registerAM,里面new了YarnAllocator的实现,调用allocateResources, 申请并执行 container。同时,启动一个reporter线程,每隔一段时间调用YarnAllocator的allocateResources方法,或汇报有太多executor fail了。

2. 如果是Driver模式,执行 runDriver 逻辑:

也是执行registerAM,但是之前需要反射执行jar包里用户定义的driver类。

YarnAllocator

YarnAllocator 负责向yarn申请和释放containers,维护containe、executor相关关系,有一个线程池。申请到container之后,在container里执行ExecutorRunnable。需要子类实现的是申请和释放这两个方法:

protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse protected def releaseContainer(container: Container): Unit

YarnAllocationHandler 继承了YarnAllocator。

  1. allocateContainers方法:
    Yarn api里提供ResourceRequest这个类,里面包含了一个app向RM索要不同container的信息,包括机器名/机架名,cpu和mem资源数,container数,优先级,locality是否放松。然后组成AllocateRequest类,代表AM向RM从集群里获得resource。调用ApplicationMasterProtocal的allocate(AllocateRequest),由AM**向RM发起资源请求**。
  2. releaseContainer方法:
    每次把需要release的container记录下来。在每次allocateContainers调用的时候,
    会往AllocateRequest里addAllReleases(releasedContainerList),在请求资源的时候顺便把历史资源释放掉。

ExecutorRunnable与Yarn的关系:

1. 向ContainerManager建立连接,让cm来startContainer。

2. ContainerLaunchContext包含了yarn的NodeManager启动一个container需要的所有信息。ExecutorRunnable会构建这个container申请信息。

可以参考这段启动逻辑:

def startContainer = {  logInfo("Setting up ContainerLaunchContext")  val ctx = Records.newRecord(classOf[ContainerLaunchContext])    .asInstanceOf[ContainerLaunchContext]  ctx.setContainerId(container.getId())  ctx.setResource(container.getResource())  val localResources = prepareLocalResources  ctx.setLocalResources(localResources)  val env = prepareEnvironment  ctx.setEnvironment(env)  ctx.setUser(UserGroupInformation.getCurrentUser().getShortUserName())  val credentials = UserGroupInformation.getCurrentUser().getCredentials()  val dob = new DataOutputBuffer()  credentials.writeTokenStorageToStream(dob)  ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))  val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory, executorCores,    appAttemptId, localResources)  logInfo("Setting up executor with commands: " + commands)  ctx.setCommands(commands)  ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))  // If external shuffle service is enabled, register with the Yarn shuffle service already  // started on the NodeManager and, if authentication is enabled, provide it with our secret  // key for fetching shuffle files later  if (sparkConf.getBoolean("spark.shuffle.service.enabled", false)) {    val secretString = securityMgr.getSecretKey()    val secretBytes =   if (secretString != null) {     // This conversion must match how the YarnShuffleService decodes our secret     JavaUtils.stringToBytes(secretString)   } else {     // Authentication is not enabled, so just provide dummy metadata     ByteBuffer.allocate(0)   }    ctx.setServiceData(Map[String, ByteBuffer]("spark_shuffle" -> secretBytes))  }  // Send the start request to the ContainerManager  val startReq = Records.newRecord(classOf[StartContainerRequest])  .asInstanceOf[StartContainerRequest]  startReq.setContainerLaunchContext(ctx)  cm.startContainer(startReq)   }  

值得注意的是 setServiceData 方法,如果在node manager上启动了 external shuffle service 。Yarn的AuxiliaryService支持在NodeManager上启动辅助服务。spark有一个参数 spark.shuffle.service.enabled 来设置该服务是否被启用,我看的1.2.0版本里貌似没有服务的实现代码。

Executor

此外,从ExecutorRunnableUtil的prepareCommand方法可以得知,ExecutorRunnable通过命令行启动了 CoarseGrainedExecutorBackend 进程,与粗粒度的mesos模式和standalone模式一致,task最终落到CoarseGrainedExecutorBackend里面执行。

全文完:)

正文到此结束
Loading...