This article walks through the source code of Apache Flink's job submission and execution flow in cluster mode.
For the source-code analysis of local-mode job submission, see the previous article 《Flink源码解析 | 从Example出发:读懂本地任务执行流程》.
The entry point of Apache Flink's cluster-mode submission flow is the same as in local mode, so we again start from the SocketWindowWordCount example.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
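For context, here is a condensed sketch of the SocketWindowWordCount driver this line comes from, simplified from the bundled example; the hostname, port and window size are illustrative assumptions:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // In cluster mode this returns a StreamContextEnvironment wrapping a ContextEnvironment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read lines from a socket (host/port are illustrative)
        DataStream<String> text = env.socketTextStream("localhost", 9000, "\n");

        // Split lines into (word, 1) pairs and count them in 5-second windows
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                        for (String word : line.split("\\s")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        counts.print();

        // execute() is where the submission flow analyzed below begins
        env.execute("Socket Window WordCount");
    }
}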
StreamExecutionEnvironment.getExecutionEnvironment() checks the type of the underlying ExecutionEnvironment to decide which environment to return; when the job is submitted to a cluster the underlying environment is a ContextEnvironment, so the object obtained here is a StreamContextEnvironment:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

if (env instanceof ContextEnvironment) {
    return new StreamContextEnvironment((ContextEnvironment) env);
} else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
    return new StreamPlanEnvironment(env);
} else {
    return createLocalEnvironment();
}
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

    StreamGraph streamGraph = this.getStreamGraph();
    streamGraph.setJobName(jobName);

    transformations.clear();

    // execute the programs
    if (ctx instanceof DetachedEnvironment) {
        LOG.warn("Job was executed in detached mode, the results will be available on completion.");
        ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
        return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
    } else {
        return ctx
            .getClient()
            .run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
            .getJobExecutionResult();
    }
}
StreamContextEnvironment.execute() builds the StreamGraph and then submits it. If detached mode is not specified, the code in the else branch is executed:
Obtain the ClusterClient object and call its run() method
Pass in the jars, classpaths, user-code class loader and savepoint restore settings
Obtain the JobExecutionResult
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
        throws ProgramInvocationException, ProgramMissingJobException {
    ...
    if (prog.isUsingProgramEntryPoint()) {
        ...
    } else if (prog.isUsingInteractiveMode()) {
        ...
    }
}
When the job is submitted, the client checks whether the program uses a program entry point or interactive mode:
public boolean isUsingInteractiveMode() {
    return this.program == null;
}

public boolean isUsingProgramEntryPoint() {
    return this.program != null;
}
The distinguishing condition is simply whether the program field is null.
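To make the two branches concrete, here is a hedged sketch of the two kinds of entry points a job JAR can expose; the class names and the tiny pipelines are illustrative assumptions, not Flink code:

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

// 1) "Program entry point": the entry class implements org.apache.flink.api.common.Program,
//    so PackagedProgram can instantiate it, program != null, and isUsingProgramEntryPoint()
//    returns true.
class PlanStyleJob implements Program {
    @Override
    public Plan getPlan(String... args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
           .output(new DiscardingOutputFormat<String>()); // illustrative pipeline
        return env.createProgramPlan();
    }
}

// 2) "Interactive mode": the entry class only exposes a main() method, so program stays null
//    and isUsingInteractiveMode() returns true; the client has to run main() and capture the
//    plan through the ContextEnvironment it installs.
class MainStyleJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
           .output(new DiscardingOutputFormat<String>()); // illustrative pipeline
        env.execute("main-style job");
    }
}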
The flink run script invokes the run() method of the CliFrontend class.
run() starts the program by calling buildProgram() to construct the PackagedProgram and assign it to program, which is then handed to runProgram() to continue down the call chain:
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);
final RunOptions runOptions = new RunOptions(commandLine);

final PackagedProgram program;
try {
    LOG.info("Building program from JAR file");
    program = buildProgram(runOptions);
}
...
runProgram(customCommandLine, commandLine, runOptions, program);
...
As the following code shows, buildProgram() is where the JAR file we submit is turned into a PackagedProgram:
PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
    String[] programArgs = options.getProgramArgs();
    String jarFilePath = options.getJarFilePath();
    List<URL> classpaths = options.getClasspaths();

    if (jarFilePath == null) {
        throw new IllegalArgumentException("The program JAR file was not specified.");
    }

    File jarFile = new File(jarFilePath);

    // Check if JAR file exists
    if (!jarFile.exists()) {
        throw new FileNotFoundException("JAR file does not exist: " + jarFile);
    } else if (!jarFile.isFile()) {
        throw new FileNotFoundException("JAR file is not a file: " + jarFile);
    }

    // Get assembler class
    String entryPointClass = options.getEntryPointClassName();
    PackagedProgram program = entryPointClass == null
            ? new PackagedProgram(jarFile, classpaths, programArgs)
            : new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);

    program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());

    return program;
}
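As a hedged illustration of how a typical submission maps onto these options (the class name, JAR path and program arguments below are hypothetical):

./bin/flink run -c org.example.SocketWindowWordCount /path/to/my-job.jar --port 9000

Here -c supplies getEntryPointClassName(), the JAR path supplies getJarFilePath(), and everything after the JAR ends up in getProgramArgs().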
After several more layers of run() calls, StandaloneClusterClient's submitJob() method is finally invoked to submit the job and return a JobSubmissionResult:
public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
        throws CompilerException, ProgramInvocationException {
    ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
    if (classLoader == null) {
        throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
    }

    OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
    return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
}

...

public JobSubmissionResult run(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
        ClassLoader classLoader, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
    JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);
    return submitJob(job, classLoader);
}
The call chain above passes through two key run() methods.
The first run() method:
Obtains the user-code class loader of the job JAR
Compiles the program into an OptimizedPlan
Hands the plan over to the second run() method
The second run() method:
Creates the JobGraph object
Submits the job and returns a JobSubmissionResult (ClusterClient.submitJob() is an abstract method; what actually gets called here is StandaloneClusterClient's submitJob())
@Override
public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    if (isDetached()) {
        return super.runDetached(jobGraph, classLoader);
    } else {
        return super.run(jobGraph, classLoader);
    }
}
When executing the flink run command, adding the -d flag on the command line makes the client take the runDetached() path; otherwise it takes run().
public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    waitForClusterToBeReady();

    final ActorSystem actorSystem;

    try {
        actorSystem = actorSystemLoader.get();
    } catch (FlinkException fe) {
        throw new ProgramInvocationException("Could not start the ActorSystem needed to talk to the " +
            "JobManager.", jobGraph.getJobID(), fe);
    }

    try {
        logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
        this.lastJobExecutionResult = JobClient.submitJobAndWait(
            actorSystem,
            flinkConfig,
            highAvailabilityServices,
            jobGraph,
            timeout,
            printStatusDuringExecution,
            classLoader);

        return lastJobExecutionResult;
    } catch (JobExecutionException e) {
        throw new ProgramInvocationException("The program execution failed: " + e.getMessage(),
            jobGraph.getJobID(), e);
    }
}
At this point Flink's ActorSystem comes into play; the analysis of the ActorSystem will be covered in a later article.
The logic above consists of the following steps:
Wait for the cluster to be ready
Obtain the ActorSystem instance
Call JobClient.submitJobAndWait() with the actorSystem, flinkConfig, jobGraph and the other parameters, then wait for the job's JobExecutionResult
public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
    waitForClusterToBeReady();

    final ActorGateway jobManagerGateway;
    try {
        jobManagerGateway = getJobManagerGateway();
    } catch (Exception e) {
        throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", jobGraph.getJobID(), e);
    }

    try {
        logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
        JobClient.submitJobDetached(
            new AkkaJobManagerGateway(jobManagerGateway),
            flinkConfig,
            jobGraph,
            Time.milliseconds(timeout.toMillis()),
            classLoader);
        return new JobSubmissionResult(jobGraph.getJobID());
    } catch (JobExecutionException e) {
        throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), jobGraph.getJobID(), e);
    }
}
The runDetached flow is similar to the run flow; the difference is that runDetached talks to the JobManager through an ActorGateway, whereas run goes through the ActorSystem.
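From the caller's point of view the two paths also differ in what they hand back: run() returns a full JobExecutionResult, while runDetached() only wraps the JobID in a JobSubmissionResult. Below is a minimal hedged sketch of how calling code can tell the two apart; the SubmissionResultExample class and submitAndReport() method are purely illustrative:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class SubmissionResultExample {

    // client, jobGraph and classLoader are assumed to be provided by the caller
    static void submitAndReport(StandaloneClusterClient client, JobGraph jobGraph, ClassLoader classLoader)
            throws Exception {
        JobSubmissionResult result = client.submitJob(jobGraph, classLoader);
        System.out.println("Submitted job " + result.getJobID());

        // Only the attached path hands back a JobExecutionResult with runtime information
        if (result.isJobExecutionResult()) {
            JobExecutionResult executionResult = result.getJobExecutionResult();
            System.out.println("Job finished in " + executionResult.getNetRuntime() + " ms");
        }
    }
}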
For the detached case, a ContextEnvironmentFactory is created, the DetachedEnvironment is obtained via factory.getLastEnvCreated(), and its finalizeExecute() method is called. Through the instantiated ClusterClient the run() method is invoked, which in turn calls submitJob() and returns a JobSubmissionResult.
We will not go into the details of that part here; interested readers can explore the source code themselves.