用户编写的程序逻辑需要提交给Flink才能得到执行。本文来探讨一下客户程序如何提交给Flink。鉴于用户将自己利用Flink的API编写的逻辑打成相应的应用程序包(比如Jar)然后提交到一个目标Flink集群上去运行是比较主流的使用场景,因此我们的分析也基于这一场景进行。
Flink的API针对不同的执行环境有不同的 Environment
对象,这里我们主要基于常用的 RemoteStreamEnvironment
和 RemoteEnvironment
进行分析
在前面我们谈到了Flink中实现了“惰性求值”,只有当最终调用 execute
方法时,才会“真正”开始执行。因此, execute
方法是我们的切入点。
其源码位于 org.apache.flink.streaming.api.environment.RemoteStreamEnvironment
首先,我们来看一下其 execute
方法触发的关键方法调用链示意图:
根据上图的调用链,我们针对这些关键方法进行剖析,当然一些细节性的内容我们可能会暂时略过,这样可以保证主路径一直都很清晰。
getStreamGraph
方法用于获得一个 StreamGraph
的实例,该实例表示流的完整的拓扑结构并且包含了生成 JobGraph
所必要的相关信息(包含了 source
、 sink
的集合以及这些在图中的“节点”抽象化的表示、一些虚拟的映射关系、执行和检查点的配置等)。
获得 StreamGraph
之后,通过调用 executeRemotely
方法进行远程执行。该方法首先根据获取到的用户程序包的路径以及类路径创建加载用户代码的类加载器:
ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(
jarFiles, globalClasspaths, getClass().getClassLoader());
紧接着根据配置构建Client对象(Client对象是真正跟JobManager对接的内部代理):
Client client;
try {
client = new Client(configuration);
client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
}catch (Exception e) {
throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
}
后面的事情就此被 Client
接管:
try {
return client.runBlocking(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader);
}catch (ProgramInvocationException e) {
throw e;
}catch (Exception e) {
String term = e.getMessage() == null ? "." : (": " + e.getMessage());
throw new ProgramInvocationException("The program execution failed" + term, e);
}finally {
client.shutdown();
}
client
对象调用了 runBlocking
以阻塞式的行为“运行”用户程序并等待返回 JobExecutionResult
对象作为 Job
的执行结果。执行完成,最终在 finally
块中,调用 shutdown
方法关闭并释放资源。
runBlocking
被调用后,调用链跳转到Client类中。为了适配多种提交方式以及运行模式, runBlocking
方法有着非常多的重载。在当前的远程执行环境下, runBlocking
在多个重载方法之间跳转的过程中,会调用 getJobGraph
方法获得 JobGraph
的实例。 JobGraph
表示Flink dataflow
程序,它将会被 JobManager
所理解并接收。在某个 Job
被提交给 JobManager
之前,通过Flink提供的高层次的API都将会被转化为 JobGraph
表示。关于如何获得JobGraph的实现,我们后面会进行剖析。这里,让我们忽视这些细节,进入下一个关键方法。
runBlocking_1
其实是 runBlocking
方法的重载,这里加一个后缀标识,只是为了跟上面的 runBlocking
进行区别。runBlocking_1方法中,首先利用 LeaderRetrievalUtils
创建了 LeaderRetrievalService
这一服务对象:
LeaderRetrievalService leaderRetrievalService;
try {
leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(config);
} catch (Exception e) {
throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
}
顾名思义, LeaderRetrievalService
在Flink中提供查找主节点的服务。它会根据Flink的配置信息(主要是recovery.mode来判断基于哪种恢复机制来创建该服务。当前有两种模式:一种是 Standalone
的独立运行模式;另一种是基于 Zookeeper
的高可用模式)。Flink提供了一个称之为 LeaderRetrievalListener
的回调接口来获得主节点的信息。接下来,就是调用 JobClient
的 submitJobAndWait
方法将产生的 JobGraph
以及主节点查找的服务对象等相关信息提交给 JobManager
并等待返回结果:
try {
this.lastJobID = jobGraph.getJobID();
return JobClient.submitJobAndWait(actorSystem, leaderRetrievalService, jobGraph,
timeout, printStatusDuringExecution, classLoader);
} catch (JobExecutionException e) {
throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
}
上面的 submitJobAndWait
方法的第一个参数 actorSystem
是 ActorSystem
的实例。在构造 Client
对象时创建,在Job提交并获得返回结果后通过调用 Client
的 shutdown
方法关闭:
public void shutdown() {
if (!this.actorSystem.isTerminated()) {
this.actorSystem.shutdown();
this.actorSystem.awaitTermination();
}
}
该方法的调用见上面 executeRemotely
方法的代码段的finally语句块。
JobClient
的出现可能会让你产生疑惑——它跟 Client
是什么关系?作用是什么?下面这幅示意图可以用来解释这些疑问:
上面这幅图展示了 Client
对象与其他几个对象的关系。 JobClient
在其中起到了“桥接”作用,它在基于API的编程层面上桥接了同步的方法调用和异步的消息通信。更具体得说, JobClient
可以看做是一个“静态类”提供了一些静态方法,这里我们主要关注上面的 submitJobAndWait
方法,该方法内部封装了 Actor
之间的异步通信(具体的通信对象是 JobClientActor
,它负责跟 JobManager
的 ActorSystem
的 Actor
进行通信),并以阻塞的形式返回结果。而 Client
只需调用 JobClient
的这些方法,而无需关注其内部是如何实现的。
通过调用 JobClient
的静态方法 submitJobAndWait
,会触发基于 Akka
的 Actor
之间的消息通信来完成后续的提交JobGraph的动作。 JobClient
提交 Job
的基于消息交互的抽象示意图如下:
总体来说这里总共有两个 ActorSystem
,一个归属于 Client
,另一个归属于 JobManager
。在 submitJobAndWait
方法中,其首先会创建一个 JobClientActor
的 ActorRef
:
ActorRef jobClientActor = actorSystem.actorOf(jobClientActorProps);
然后向其发起一个 SubmitJobAndWait
消息,该消息将 JobGraph
的实例提交给 jobClientActor
。该消息的发起模式是 ask ,它表示需要一个应答消息。
JobClient向JobClientActor发送消息的代码段如下所示:
Future<Object> future = Patterns.ask(jobClientActor,
new JobClientMessages.SubmitJobAndWait(jobGraph),
new Timeout(AkkaUtils.INF_TIMEOUT()));
answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
JobClient
会阻塞等待该 future
返回结果。在得到返回结果answer之后,先进行解析判断它是 Job
被成功执行返回的结果还是失败返回的结果。
至此,Client提交Streaming Job的关键方法调用路径已梳理完成。这里为了突出主路线,同时避免被太多的实现细节干扰,我们暂时忽略了一些重要数据结构和关键概念的解读。不过,后续我们会对它们进行分析。
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)