上一篇文章我们探讨了基于定时任务的周期性检查点触发机制以及基于Akka的 actor
模型的消息驱动协同机制。这篇文章我们将探讨Zookeeper在Flink的 Fault Tolerance
中所起到的作用。
其实,Flink引入Zookeeper的目的主要是让 JobManager
实现高可用(leader选举)。
因为Zookeeper在Flink里存在多种应用场景,本篇我们还是将重心放在 Fault Tolerance
上,即讲解Zookeeper在检查点的恢复机制上发挥的作用。
如果用一幅图表示快照机制(检查点)大致的流程可见下图:
跟本文相关的主要有4,5,6三步
因为跟本文切实相关,所以先介绍一下 JobManager
的 RecoveryMode
(恢复模式)。 RecoveryMode
作为一个枚举类型,它有两个枚举值:
STANDALONE
表示不对 JobManager
的失败进行恢复。而 ZOOKEEPER
表示 JobManager
将基于Zookeeper实现HA(高可用)。
在前面的文章中已经提及过Flink里的检查点分为两种: PendingCheckpoint
(正在处理的检查点)和 CompletedCheckpoint
(完成了的检查点)。
PendingCheckpoint
表示一个检查点已经被创建,但还没有得到所有该应答的 task
的应答。一旦所有的 task
都给予应答,那么它将会被转化为一个 CompletedCheckpoint
。 PendingCheckpoint
通过 toCompletedCheckpoint
实例方法来将其转化为已完成了的检查点。其核心实现如下:
if (notYetAcknowledgedTasks.isEmpty()) {
CompletedCheckpoint completed = new CompletedCheckpoint(jobId, checkpointId,checkpointTimestamp, System.currentTimeMillis(), new ArrayList<StateForTask>(collectedStates));
dispose(null, false);
return completed;
}
它会检查还没有ack该检查点的 task
集合,如果集合为空(即所有 task
都已应答),则基于当前实例的属性构建一个 CompletedCheckpoint
的实例,并最终返回新创建的实例。但在返回之前,调用了 dispose
进行资源释放。
这个 dispose
方法是一个私有方法,其内部实现依赖于 releaseState
这个flag,上面的 dispose
调用将其置为 false
,意为不释放 task
状态:
if (releaseState) {
for (StateForTask state : collectedStates) {
state.discard(userClassLoader);
}
}
但最终, collectedStates
这个集合总是会被清空:
collectedStates.clear();
notYetAcknowledgedTasks.clear();
toCompletedCheckpoint
方法为什么不释放 task
的状态呢,因为它的语义只是提供 转化 操作,其实 collectedStates
这个集合已经在构造 CompletedCheckpoint
时被深拷贝给 CompletedCheckpoint
的实例了。而这些 task
的状态其最终的释放,将会由 CompletedCheckpoint
的 discard
方法完成。
PendingCheckpoint
的公共的 discard
方法的实现就会直接释放收集的状态集合:
public void discard(ClassLoader userClassLoader) {
dispose(userClassLoader, true);
}
公共的 discard
方法常用于检查点 超时回收 以及当最新的检查点已经完成时,距离当前时间更久的未完成的检查点的 自动失效 。
CompletedCheckpoint
表示一个已经成功完成了得检查点,当一个检查点在得到所有要求的 task
的应答之后被认为是一个 已完成 的检查点。
根据 JobManager
的恢复模式,Flink提供了两种已完成的检查点的存储机制的实现:
他们都实现了接口 CompletedCheckpointStore
,这个接口提供了思个值得关注的方法:
CompletedCheckpoint
的实例 针对 RecoveryMode
为 STANDALONE
提供了 StandaloneCompletedCheckpointStore
。它提供了一个基于JVM堆内存的 ArrayDeque
来存放检查点。
而针对 RecoveryMode
为 ZOOKEEPER
提供的 ZooKeeperCompletedCheckpointStore
要复杂得多。这也是我们关注的重点。它的实现依赖于两个存储机制:
在Zookeeper中的分布式存储:
private final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper;
本地JVM内存中的存储:
private final ArrayDeque<Tuple2<StateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
我们先来看恢复方法 recover
,恢复的过程首先是从Zookeeper获取所有的检查点,这里为了规避 并发修改 带来的失败,采用了 循环重试 的机制:
while (true) {
try {
initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
break;
}
catch (ConcurrentModificationException e) {
LOG.warn("Concurrent modification while reading from ZooKeeper. Retrying.");
}
}
在恢复时,将从Zookeeper中读取最新的检查点,如果检查点超过一个,仅仅最新的那个检查点有效,旧的都会被丢弃。如果存在着网络分区,多个 JobManager
的实例并发对相同的程序实行检查点,那么选择任意一个验证通过的已完成的检查点都是没有问题的。
if (numberOfInitialCheckpoints > 0) {
// Take the last one. This is the latest checkpoints, because path names are strictly
// increasing (checkpoint ID).
Tuple2<StateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
.get(numberOfInitialCheckpoints - 1);
CompletedCheckpoint latestCheckpoint = latest.f0.getState(userClassLoader);
checkpointStateHandles.add(latest);
LOG.info("Initialized with {}. Removing all older checkpoints.", latestCheckpoint);
for (int i = 0; i < numberOfInitialCheckpoints - 1; i++) {
try {
removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
}
catch (Exception e) {
LOG.error("Failed to discard checkpoint", e);
}
}
}
而 discardAllCheckpoints
方法会做四件事:
每个检查点都有各自的编号,为 Long
类型。根据 JobManager
的恢复模式分别提供了两种计数器:
计数器在这里被认为是一种服务,它具备 start
和 stop
方法
StandaloneCheckpointIDCounter
只是简单得对 AtomicLong
进行了包装,因为在这种模式下, JobManager
几乎是不可恢复的,所以这么做就足够了。
ZooKeeperCheckpointIDCounter
是基于Zookeeper实现的一种分布式原子累加器。具体的做法是每一个计数器,在Zookeeper上新建一个 ZNode
,形如:
/flink/checkpoint-counter/<job-id> 1 [persistent]
....
/flink/checkpoint-counter/<job-id> N [persistent]
在Zookeeper中的检查点编号被要求是升序的,这可以使得我们在 JobManager
失效的情况下,可以拥有一个共享的跨 JobManager
实例的计数器。
值得一提的是,这里使用的Zookeeper的客户端是 CuratorFramework
,同时还利用了它附带的 SharedCount
这一 recipes
来作为分布式共享的计数器。
而在累加接口方法 getAndIncrement
的实现上,使用了 循环尝试 的机制:
public long getAndIncrement() throws Exception {
while (true) {
ConnectionState connState = connStateListener.getLastState();
if (connState != null) {
throw new IllegalStateException("Connection state: " + connState);
}
VersionedValue<Integer> current = sharedCount.getVersionedValue();
Integer newCount = current.getValue() + 1;
if (sharedCount.trySetCount(current, newCount)) {
return current.getValue();
}
}
}
另外从 stop
方法的实现来看,如果一个计数器停止,则会再Zookeeper中删除其对应的 ZNode
。
所谓的检查点恢复服务,其实就是聚合了上面的 已完成的检查点存储 以及 检查点编号计数器 这两个功能。因为Flink提供了 STANDALONE
以及 ZOOKEEPER
这两个恢复模式,所以这里存在一个基于不同模式创建服务的工厂接口 CheckpointRecoveryFactory
。并针对这两种恢复模式分别提供了两个工厂: StandaloneCheckpointRecoveryFactory
以及 ZooKeeperCheckpointRecoveryFactory
。
具体的功能聚合体现在这两个方法上:
/**
* Creates a {@link CompletedCheckpointStore} instance for a job.
*
* @param jobId Job ID to recover checkpoints for
* @param userClassLoader User code class loader of the job
* @return {@link CompletedCheckpointStore} instance for the job
*/
CompletedCheckpointStore createCompletedCheckpoints(JobID jobId, ClassLoader userClassLoader)
throws Exception;
/**
* Creates a {@link CheckpointIDCounter} instance for a job.
*
* @param jobId Job ID to recover checkpoints for
* @return {@link CheckpointIDCounter} instance for the job
*/
CheckpointIDCounter createCheckpointIDCounter(JobID jobId) throws Exception;
两个工厂的具体实现并没有什么特别的地方。检查点恢复服务会被 JobManager
使用到。
本篇文章我们主要分析了,Zookeeper在Flink的 Fault Tolerance
机制中发挥的作用。但因为Zookeeper在Flink中得主要用途是实现 JobManager
的高可用,所以里面的部分内容多少还是跟这一主题有所联系。
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)