When the Job class submits a job, it calls submitJobInternal in JobSubmitter. Here is its source:
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //*****myAdd
  LOG.info("---->jobStagingArea: "+jobStagingArea);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        int keyLen = CryptoUtils.isShuffleEncrypted(conf)
            ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
                MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
            : SHUFFLE_KEY_LENGTH;
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(keyLen);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }

    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
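The shuffle-secret step in the listing can be tried in isolation with nothing but the JDK. The following is a minimal sketch, not Hadoop code: the class name ShuffleSecretDemo is hypothetical, and the values "HmacSHA1" and 64 bits are assumptions about what SHUFFLE_KEYGEN_ALGORITHM and SHUFFLE_KEY_LENGTH hold, since the listing only references those constants by name. The key length is passed as a parameter because, when shuffle encryption is enabled, the listing reads it from the configuration instead.

```java
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

class ShuffleSecretDemo {
    // Assumed values; the listing only refers to these constants by name.
    static final String ALGORITHM = "HmacSHA1";
    static final int KEY_LENGTH_BITS = 64;

    // Generates a random secret of the requested size, wrapping the
    // NoSuchAlgorithmException in an IOException as the listing does.
    static byte[] generateShuffleSecret(int keyLenBits) throws IOException {
        try {
            KeyGenerator keyGen = KeyGenerator.getInstance(ALGORITHM);
            keyGen.init(keyLenBits);
            SecretKey shuffleKey = keyGen.generateKey();
            return shuffleKey.getEncoded();
        } catch (NoSuchAlgorithmException e) {
            throw new IOException("Error generating shuffle secret key", e);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] secret = generateShuffleSecret(KEY_LENGTH_BITS);
        System.out.println("secret length in bytes: " + secret.length);
    }
}
```

In the real method the encoded key is stored in the job's credentials through TokenCache.setShuffleSecretKey rather than printed.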
The code that obtains the JobID is JobID jobId = submitClient.getNewJobID(); Here submitClient is a YARNRunner, and its getNewJobID() delegates to its internal resMgrDelegate.getNewJobID(). In the ResourceMgrDelegate class:
public JobID getNewJobID() throws IOException, InterruptedException {
  try {
    this.application = client.createApplication().getApplicationSubmissionContext();
    this.applicationId = this.application.getApplicationId();
    return TypeConverter.fromYarn(applicationId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
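TypeConverter.fromYarn is the call that turns the YARN ApplicationId into a MapReduce JobID. Printed as strings, the two IDs carry the same cluster timestamp and sequence number and differ only in their prefix. The sketch below imitates that mapping on plain strings so it runs without Hadoop on the classpath; it is not Hadoop's implementation, and the class name, method name, and sample ID are all hypothetical.

```java
class JobIdMappingDemo {
    private static final String APP_PREFIX = "application_";
    private static final String JOB_PREFIX = "job_";

    // Hypothetical helper: maps "application_<clusterTimestamp>_<seq>"
    // to "job_<clusterTimestamp>_<seq>".
    static String toJobIdString(String applicationId) {
        if (!applicationId.startsWith(APP_PREFIX)) {
            throw new IllegalArgumentException("Not an application id: " + applicationId);
        }
        return JOB_PREFIX + applicationId.substring(APP_PREFIX.length());
    }

    public static void main(String[] args) {
        // Made-up sample ID, used only for illustration.
        System.out.println(toJobIdString("application_1400000000000_0001"));
    }
}
```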
The client in ResourceMgrDelegate is a YarnClientImpl. The source of createApplication() in the YarnClientImpl class is as follows:
@Override
public YarnClientApplication createApplication()
    throws YarnException, IOException {
  ApplicationSubmissionContext context =
      Records.newRecord(ApplicationSubmissionContext.class);
  GetNewApplicationResponse newApp = getNewApplication();
  ApplicationId appId = newApp.getApplicationId();
  context.setApplicationId(appId);
  return new YarnClientApplication(newApp, context);
}
The source of getNewApplication() is as follows:
private GetNewApplicationResponse getNewApplication()
    throws YarnException, IOException {
  GetNewApplicationRequest request =
      Records.newRecord(GetNewApplicationRequest.class);
  //The interface used by clients to obtain a new ApplicationId for submitting new applications.
  //The ResourceManager responds with a new, monotonically increasing, ApplicationId
  //which is used by the client to submit a new application.
  return rmClient.getNewApplication(request);
}
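Summary:
The JobID is obtained roughly as follows:
1. The submitting client, YARNRunner, calls its getNewJobID method, which delegates to getNewJobID in ResourceMgrDelegate.
2. ResourceMgrDelegate calls createApplication on its internal member client (in practice a YarnClientImpl) to create a YarnClientApplication.
3. The YarnClientApplication is created as follows: ① construct an ApplicationSubmissionContext context object; ② construct a GetNewApplicationRequest request; ③ call rmClient.getNewApplication(request) to obtain a GetNewApplicationResponse newApp object, which contains the ApplicationId allocated by the ResourceManager.
4. createApplication calls setApplicationId on context to record the ApplicationId and returns a YarnClientApplication wrapping newApp and context; ResourceMgrDelegate then stores that context in its internal member application.
5. ResourceMgrDelegate sets its internal member applicationId to the ApplicationId held by context and returns it, converted to a JobID by TypeConverter.fromYarn.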
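The delegation chain summarized above can be sketched with plain Java stand-ins. Every class here (FakeResourceManager, FakeYarnClient, FakeDelegate, FakeRunner, SubmissionContext) is hypothetical and only mirrors the role of the corresponding Hadoop class. IDs are modeled as strings, the YarnClientApplication wrapper is omitted for brevity, and the counter-based ID allocation is an assumption based on the "monotonically increasing" comment in getNewApplication().

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

class JobIdFlowSketch {
    /** Stand-in for the ResourceManager: hands out increasing application IDs. */
    static class FakeResourceManager {
        private final long clusterTimestamp;
        private final AtomicInteger counter = new AtomicInteger(0);

        FakeResourceManager(long clusterTimestamp) {
            this.clusterTimestamp = clusterTimestamp;
        }

        String getNewApplication() {
            return String.format(Locale.ROOT, "application_%d_%04d",
                    clusterTimestamp, counter.incrementAndGet());
        }
    }

    /** Stand-in for ApplicationSubmissionContext. */
    static class SubmissionContext {
        String applicationId;
    }

    /** Stand-in for YarnClientImpl. */
    static class FakeYarnClient {
        private final FakeResourceManager rm;

        FakeYarnClient(FakeResourceManager rm) {
            this.rm = rm;
        }

        SubmissionContext createApplication() {
            SubmissionContext context = new SubmissionContext(); // step 3, part 1
            String appId = rm.getNewApplication();               // step 3, parts 2 and 3
            context.applicationId = appId;                       // step 4
            return context;
        }
    }

    /** Stand-in for ResourceMgrDelegate. */
    static class FakeDelegate {
        private final FakeYarnClient client;
        SubmissionContext application;
        String applicationId;

        FakeDelegate(FakeYarnClient client) {
            this.client = client;
        }

        String getNewJobID() {
            application = client.createApplication();    // steps 2 and 4
            applicationId = application.applicationId;   // step 5
            // Imitates TypeConverter.fromYarn on the string form of the ID.
            return "job_" + applicationId.substring("application_".length());
        }
    }

    /** Stand-in for YARNRunner. */
    static class FakeRunner {
        private final FakeDelegate delegate;

        FakeRunner(FakeDelegate delegate) {
            this.delegate = delegate;
        }

        String getNewJobID() {
            return delegate.getNewJobID();               // step 1
        }
    }

    public static void main(String[] args) {
        FakeResourceManager rm = new FakeResourceManager(1400000000000L);
        FakeRunner runner = new FakeRunner(new FakeDelegate(new FakeYarnClient(rm)));
        System.out.println(runner.getNewJobID());
        System.out.println(runner.getNewJobID());
    }
}
```

Each call to getNewJobID() makes one round trip to the stand-in ResourceManager, so successive submissions receive distinct, increasing IDs.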