转载

Flink 源码解析之从 Example 出发:读懂集群任务执行流程

Flink 源码解析之从 Example 出发:读懂集群任务执行流程

微信公众号: 深广大数据Club

关注可了解更多大数据相关的资讯。问题或建议,请公众号留言;

如果你觉得深广大数据Club对你有帮助,欢迎赞赏

本文主要讲述Apache Flink在集群模式下提交任务的执行流程源码分析。

本地模式任务提交源码解析可以参考上篇文章《Flink源码解析 | 从Example出发:读懂本地任务执行流程》进行了解。

Apache Flink集群模式任务提交执行流程入口与本地模式入口相同。

我们还是从SocketWindowWordCount入手.

任务代码入口

SocketWindowWordCount.run

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

getExecutionEnvironment()

判断获取对应的ExecutionEnvironment对象,这里获取的对象是StreamContextEnvironment。

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        if (env instanceof ContextEnvironment) {
            return new StreamContextEnvironment((ContextEnvironment) env);
        } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
            return new StreamPlanEnvironment(env);
        } else {
            return createLocalEnvironment();
        }

StreamContextEnvironment.execute()

@Override
    public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

        StreamGraph streamGraph = this.getStreamGraph();
        streamGraph.setJobName(jobName);

        transformations.clear();

        // execute the programs
        if (ctx instanceof DetachedEnvironment) {
            LOG.warn("Job was executed in detached mode, the results will be available on completion.");
            ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
            return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
        } else {
            return ctx
                .getClient()
                .run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
                .getJobExecutionResult();
        }
    }

若不指定detached模式,则执行else代码块中的代码

  • 获取ClusterClient对象,执行run方法

  • 设置jar,classpath,classloader,savePointRestore

  • 获取JobExecutionResult

ClusterClient.java

public JobSubmissionResult run(PackagedProgram prog, int parallelism)
            throws ProgramInvocationException, ProgramMissingJobException {
    ...
    if (prog.isUsingProgramEntryPoint()) {
        ...
    }else if (prog.isUsingInteractiveMode()) {
        ...
    }
}

任务运行提交的时候判断是使用交互模式还是使用程序入口点。

    public boolean isUsingInteractiveMode() {
        return this.program == null;
    }

    public boolean isUsingProgramEntryPoint() {
        return this.program != null;
    }

判断条件则是program是否为null。

从flink run入手

flink run脚本调用CliFrontend.java类中的run方法。

我们执行run方法启动程序,调用buildProgram()方法初给program赋值,之后传递给runProgram()继续往下调用。

final Options commandOptions = CliFrontendParser.getRunCommandOptions();

        final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);

        final CommandLine commandLine = CliFrontendParser.parse(commandLineOptions, args, true);

        final RunOptions runOptions = new RunOptions(commandLine);

        final PackagedProgram program;
        try {
            LOG.info("Building program from JAR file");
            program = buildProgram(runOptions);
        }
        ...
        runProgram(customCommandLine, commandLine, runOptions, program);
        ...

从以下代码可以看出,这里buildProgram其实就是我们提交jar包的方式。

PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
        String[] programArgs = options.getProgramArgs();
        String jarFilePath = options.getJarFilePath();
        List<URL> classpaths = options.getClasspaths();

        if (jarFilePath == null) {
            throw new IllegalArgumentException("The program JAR file was not specified.");
        }

        File jarFile = new File(jarFilePath);

        // Check if JAR file exists
        if (!jarFile.exists()) {
            throw new FileNotFoundException("JAR file does not exist: " + jarFile);
        }
        else if (!jarFile.isFile()) {
            throw new FileNotFoundException("JAR file is not a file: " + jarFile);
        }

        // Get assembler class
        String entryPointClass = options.getEntryPointClassName();

        PackagedProgram program = entryPointClass == null ?
                new PackagedProgram(jarFile, classpaths, programArgs) :
                new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);

        program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());

        return program;
    }

isUsingProgramEntryPoint

调用多层run方法后,最后调用StandaloneClusterClient的submit方法提交任务,返回JobSubmissionResult结果。

ClusterClient.java

public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
            throws CompilerException, ProgramInvocationException {
        ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
        if (classLoader == null) {
            throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
        }

        OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
        return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
    }
    ...
    public JobSubmissionResult run(FlinkPlan compiledPlan,
            List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
            throws ProgramInvocationException {
        JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);
        return submitJob(job, classLoader);
    }

以上流程调用涉及到的两个关键的run方法。

  • 第一个run()方法

    • 加载运行jar包中的主程序类

    • 获取优化计划OptimizedPlan

    • 提交到下一个run方法

  • 第二个run()方法

    • 创建JobGraph对象

    • 提交任务并返回JobSubmissionResult(ClusterClient的submitJob()方法是一个抽象方法,这里实际上是调用的StandaloneClusterClient的submitJob()方法)

StandaloneClusterClient.java

@Override
    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader)
            throws ProgramInvocationException {
        if (isDetached()) {
            return super.runDetached(jobGraph, classLoader);
        } else {
            return super.run(jobGraph, classLoader);
        }
    }

我们在执行flink run命令的时候,若命令行添加 -d 指定,则会走 runDetached() ;否则,走 run()

run()

public JobExecutionResult run(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {

        waitForClusterToBeReady();

        final ActorSystem actorSystem;

        try {
            actorSystem = actorSystemLoader.get();
        } catch (FlinkException fe) {
            throw new ProgramInvocationException("Could not start the ActorSystem needed to talk to the " +
                "JobManager.", jobGraph.getJobID(), fe);
        }

        try {
            logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
            this.lastJobExecutionResult = JobClient.submitJobAndWait(
                actorSystem,
                flinkConfig,
                highAvailabilityServices,
                jobGraph,
                timeout,
                printStatusDuringExecution,
                classLoader);

            return lastJobExecutionResult;
        } catch (JobExecutionException e) {
            throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), jobGraph.getJobID(), e);
        }
    }

到这里就会涉及到flink的ActorSystem,ActorSystem的分析后续文章再做讲述。

上述逻辑包含如下步骤:

  • 等待集群状态准备就绪

  • 获取ActorSystem实例

  • 传入actorSystem,flinkConfig以及jobGraph等参数,调用JobClient.submitJobAndWait()方法执行并等待任务返回结果JobExecutionResult

runDetached()

public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {

        waitForClusterToBeReady();

        final ActorGateway jobManagerGateway;
        try {
            jobManagerGateway = getJobManagerGateway();
        } catch (Exception e) {
            throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.",
                jobGraph.getJobID(), e);
        }

        try {
            logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
            JobClient.submitJobDetached(
                new AkkaJobManagerGateway(jobManagerGateway),
                flinkConfig,
                jobGraph,
                Time.milliseconds(timeout.toMillis()),
                classLoader);
            return new JobSubmissionResult(jobGraph.getJobID());
        } catch (JobExecutionException e) {
            throw new ProgramInvocationException("The program execution failed: " + e.getMessage(),
                jobGraph.getJobID(), e);
        }
    }

runDetached流程与run的流程类似。不同的是runDetached采用的是ActorGateway,而run采用的是ActorSystem.

isUsingInteractiveMode

创建ContextEnvironmentFactory工厂对象,并通过factory.getLastEnvCreated()获得DetachedEnvironment,并调用finalizeExecute方法。通过实例化的ClusterClient实例对象调用run方法,run方法再调用submit执行,返回JobSubmissionResult结果

这块在这里就不具体展开,有兴趣的伙伴可以自己看下源码。

关注公众号

Flink 源码解析之从 Example 出发:读懂集群任务执行流程

原文  https://mp.weixin.qq.com/s/dY6MAUFogNG7Mo_oaKpOnQ
正文到此结束
Loading...