转载

hadoop yarn : hadoop如何得到JobID———JobSubmitter的submitJobInternal分析

Job类提交作业时会调用JobSubmitter的submitJobInternal,下面看看它的源码:

JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException {

//validate the jobs output specs  checkSpecs(job);  Configuration conf = job.getConfiguration(); addMRFrameworkToDistributedCache(conf);  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); //*****myAdd LOG.info("---->jobStagingArea:  "+jobStagingArea); //configure the command line options correctly on the submitting dfs InetAddress ip = InetAddress.getLocalHost(); if (ip != null) {   submitHostAddress = ip.getHostAddress();   submitHostName = ip.getHostName();   conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);   conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress); } JobID jobId = submitClient.getNewJobID(); job.setJobID(jobId); Path submitJobDir = new Path(jobStagingArea, jobId.toString()); JobStatus status = null; try {   conf.set(MRJobConfig.USER_NAME,       UserGroupInformation.getCurrentUser().getShortUserName());   conf.set("hadoop.http.filter.initializers",        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");   conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());   LOG.debug("Configuring job " + jobId + " with " + submitJobDir        + " as the submit dir");   // get delegation token for the dir   TokenCache.obtainTokensForNamenodes(job.getCredentials(),       new Path[] { submitJobDir }, conf);    populateTokenCache(conf, job.getCredentials());    // generate a secret to authenticate shuffle transfers   if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {     KeyGenerator keyGen;     try {        int keyLen = CryptoUtils.isShuffleEncrypted(conf)            ? 
conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,                MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)           : SHUFFLE_KEY_LENGTH;       keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);       keyGen.init(keyLen);     } catch (NoSuchAlgorithmException e) {       throw new IOException("Error generating shuffle secret key", e);     }     SecretKey shuffleKey = keyGen.generateKey();     TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),         job.getCredentials());   }    copyAndConfigureFiles(job, submitJobDir);       Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);    // Create the splits for the job   LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));   int maps = writeSplits(job, submitJobDir);   conf.setInt(MRJobConfig.NUM_MAPS, maps);   LOG.info("number of splits:" + maps);    // write "queue admins of the queue to which job is being submitted"   // to job file.   String queue = conf.get(MRJobConfig.QUEUE_NAME,       JobConf.DEFAULT_QUEUE_NAME);   AccessControlList acl = submitClient.getQueueAdmins(queue);   conf.set(toFullPropertyName(queue,       QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());    // removing jobtoken referrals before copying the jobconf to HDFS   // as the tasks don't need this setting, actually they may break   // because of it if present as the referral will point to a   // different job.   TokenCache.cleanUpTokenReferral(conf);    if (conf.getBoolean(       MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,       MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {     // Add HDFS tracking ids     ArrayList<String> trackingIds = new ArrayList<String>();     for (Token<? 
extends TokenIdentifier> t :         job.getCredentials().getAllTokens()) {       trackingIds.add(t.decodeIdentifier().getTrackingId());     }     conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,         trackingIds.toArray(new String[trackingIds.size()]));   }    // Set reservation info if it exists   ReservationId reservationId = job.getReservationId();   if (reservationId != null) {     conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());   }    // Write job file to submit dir   writeConf(conf, submitJobFile);    //   // Now, actually submit the job (using the submit name)   //   printTokens(jobId, job.getCredentials());   status = submitClient.submitJob(       jobId, submitJobDir.toString(), job.getCredentials());   if (status != null) {     return status;   } else {     throw new IOException("Could not launch job");   } } finally {   if (status == null) {     LOG.info("Cleaning up the staging area " + submitJobDir);     if (jtFs != null && submitJobDir != null)       jtFs.delete(submitJobDir, true);    } } 

获取JobID的代码为 JobID jobId = submitClient.getNewJobID(); 这里的 submitClient 是 YARNRunner,调用它的 getNewJobID() 会调用它内部的 resMgrDelegate.getNewJobID()。在 ResourceMgrDelegate 类中,getNewJobID() 的源码如下:

public JobID getNewJobID() throws IOException, InterruptedException

try {   this.application = client.createApplication().getApplicationSubmissionContext();   this.applicationId = this.application.getApplicationId();   return TypeConverter.fromYarn(applicationId); } catch (YarnException e) {   throw new IOException(e); } 

ResourceMgrDelegate 中的 client 为 YarnClientImpl,YarnClientImpl 类中 createApplication() 的源码如下

/**
 * Creates a new application handle.
 *
 * <p>Builds an empty {@code ApplicationSubmissionContext}, asks for a new
 * application via {@link #getNewApplication()}, and copies the returned
 * application ID into the context.
 *
 * @return a {@code YarnClientApplication} wrapping the response and the
 *     populated submission context
 * @throws YarnException propagated from {@code getNewApplication()}
 * @throws IOException propagated from {@code getNewApplication()}
 */
@Override
public YarnClientApplication createApplication()
    throws YarnException, IOException {
  ApplicationSubmissionContext submissionContext =
      Records.newRecord(ApplicationSubmissionContext.class);
  GetNewApplicationResponse response = getNewApplication();
  // The ID in the response is the one the context must carry on submission.
  submissionContext.setApplicationId(response.getApplicationId());
  return new YarnClientApplication(response, submissionContext);
}

getNewApplication()源码如下

/**
 * Requests a new application from {@code rmClient}.
 *
 * @return the response returned by {@code rmClient.getNewApplication}
 * @throws YarnException propagated from {@code rmClient.getNewApplication}
 * @throws IOException propagated from {@code rmClient.getNewApplication}
 */
private GetNewApplicationResponse getNewApplication()
    throws YarnException, IOException {
  GetNewApplicationRequest request =
      Records.newRecord(GetNewApplicationRequest.class);
  // The interface used by clients to obtain a new ApplicationId for
  // submitting new applications.
  // The ResourceManager responds with a new, monotonically increasing,
  // ApplicationId which is used by the client to submit a new application.
  return rmClient.getNewApplication(request);
}

总结:

获取JobID的大致流程如下

1、提交作业的客户端 YARNRunner 调用 getNewJobID 方法,内部调用 ResourceMgrDelegate 的 getNewJobID

2、ResourceMgrDelegate 调用内部成员 client(实际上是 YarnClientImpl)的 createApplication 方法创建一个 YarnClientApplication

3、YarnClientApplication 创建流程为:①构造一个 ApplicationSubmissionContext context 对象;②构造一个 GetNewApplicationRequest request;③调用 rmClient.getNewApplication(request) 获得一个 GetNewApplicationResponse newApp 对象,newApp 中则包含了 ResourceManager 分配的 ApplicationId。

4、调用 context.setApplicationId 设置 ApplicationId,将 ResourceMgrDelegate 的内部成员 application 设置为 context

5、将 ResourceMgrDelegate 的内部成员 applicationId 设置为 context 的 ApplicationId

原文  http://www.ituring.com.cn/article/211546
正文到此结束
Loading...