今天我们来解读一下Flink stream里的 source 模块。它是整个stream的入口,也是我们了解其流处理体系的入口。
SourceFunction是所有stream source的根接口。
它继承自一个标记接口(空接口) Function
。
SourceFunction
定义了两个接口方法:
正常情况下,一个 SourceFunction
实现这两个接口方法就可以了。其实这两个接口方法也固化了一种实现 模板 。
比如,实现一个XXXSourceFunction,那么大致的模板是这样的:
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<T> ctx) throws Exception {
while (isRunning && otherCondition == true) {
ctx.collect(getElement());
}
}
@Override
public void cancel() {
isRunning = false;
}
Flink将Source的运行机制跟其如何emit元素进行了分离。具体如何emit元素,取决于另外一个独立的接口 SourceContext
。 SourceFunction
以内部接口的方式定义了该上下文接口对象,将具体的实现抛给具体的sourceFunction。该接口中定义了emit元素的接口方法:
System#currentTimeMillis()
),这种由当前source自动追加的时间戳,在Flink里称之为 Ingress Time
(即摄入时间)。 Event Time
(即用户自行设置的事件时间)。 Watermark
。 这里有几个时间概念可参考我之前的文章: http://vinoyang.com/2016/05/02/flink-concepts/#时间
Watermark:Flink用 Watermark
来对上面的 Event Time
类型的事件进行窗口处理。所谓的 Watermark
是一个时间基准。WaterMark包含一个时间戳,Flink使用 WaterMark
标记所有小于该时间戳的消息都已流入,Flink的数据源在确认所有小于某个时间戳的消息都已输出到Flink流处理系统后,会生成一个包含该时间戳的 WaterMark
,插入到消息流中输出到Flink流处理系统中,Flink操作符按照时间窗口缓存所有流入的消息,当operator处理到WaterMark时,它对所有小于该WaterMark时间戳的时间窗口数据进行处理并发送到下一个operator节点,然后也将WaterMark发送到下一个operator节点。
source相关的完整类图如下:
一个抽象类,继承自 AbstractRichFunction
。为实现一个 Rich SourceFunction提供基础能力(其实所谓的Rich,主要是提供某种范式或者模板帮助你完成一部分基础实现)。该类的子类有两个,不过他们仍然是抽象类,只是在此基础上提供了更具体的实现:
MessageAcknowledgingSourceBase
的基础上针对ID应答机制进行了更为细分的处理,支持两种ID应答模型: session id
和 unique message id
。 该接口只是个标记接口,用于标识继承该接口的Source都是并行执行的。其直接实现类是 RichParallelSourceFunction
,它是一个抽象类并继承自 AbstractRichFunction
(从名称可以看出,它应该兼具 rich
和 parallel
两个特性,这里的 rich 体现在它定义了 open
和 close
这两个方法)。
继承 RichParallelSourceFunction
的那些SourceFunction意味着它们都是并行执行的并且可能有一些资源需要open/close,Flink提供了这么几个实现:
InputFormat
作为数据源记录的生产器(它可以接收一个file path来基于文件生产记录),根据给定的 TypeInformation
来产生序列化器,再结合内部创建的 splitIterator
实现了一个基于文件的sourceFunction。 DeserializationSchema
,该属性是一个协议接口,用于定义如何将二进制数据反序列化为Java/Scala对象。 start
和 end
作为一个发射序列的区间,然后根据一定的算法算得需要发射的时间间隔,并保证区间内的元素送达具有 exactly once
的强一致性,具体的计算方式需要结合当前task的subtask的数量以及当前subtask在集合中的索引计算得出。 SplittableIterator
(它是一个全局的iterator)结合当前task运行时subtask的数量,以及该subtask在所有subtask中的序号计算出分区(partition)从而产生一个细分的 Iterator
。通过 Iterator
迭代来发射元素。 该Source是以监控给定 path
位置的文件为手段,根据给定的 interval
作为时间间隔,emit的内容依赖监控文件的变。Flink为这种形式的Source提供了三种watchtype :
public enum WatchType {
ONLY_NEW_FILES, //仅关注新文件产生
REPROCESS_WITH_APPENDED, //当有文件产生变更,该文件的所有内容都需要被重新处理
PROCESS_ONLY_APPENDED //当有文件产生变更,只有变更的内容需要被处理
}
该类型的Source始终发射的是一个三元组(Tuple3),它包含三个元素:
watchtype的不同主要影响发射元素的内容。当WatchType的类型为 ONLY_NEW_FILES
或 REPROCESS_WITH_APPENDED
类型时, offset
会被设置为0, fileSize
被设置为-1。而WatchType类型为 PROCESS_ONLY_APPENDED
,则三个值都为其对应的真实值。
根据给定的 hostname
和 port
,以socket的方式进行通信并获取数据,以 delimiter
参数给定的字符作为终止标识符。
该Source接收一个迭代器,然后在发射循环体中,依次迭代发射数据。
该Source接收一个元素迭代器(一组元素的集合),以Flink的类型序列化机制将其序列化为二进制数据,然后在发射元素的循环体中,进行反序列化为初始类型,再发射数据。
这里先序列化为二进制,再从二进制反序列化为最初的对象类型。不是特别容易理解,乍一看多此一举,让人匪夷所思。其实,这么做是有原因的,是因为Flink的序列化机制是其自定义的,并且跟其自主管理内存紧密联系在一起(想了解其自主内存管理的可参看我之前的系列文章)。而自主内存管理又涉及到二进制数据的存储。 FromElementsFunction
支持从某个 check point
部分恢复,所以必须先还原其原先的存储位置(通过序列化),然后跳过不需要emit的元素,然后再发射需要发射的元素(将这些元素反序列化)。
Flink自身提供了一些针对第三方主流开源系统的连接器支持,它们有:
这些连接器有些可以同时作为 source
和 sink
。因为我们今天的主题是source,所以我们先来看看以上这些被支持的连接器它们的source都是继承自刚刚我们谈到的哪些接口或者类。
这篇文章我们主要谈及了Flink的stream source相关的设计、实现。当然这个主题还没有完全谈完,还会有后续篇幅继续解读。
微信扫码关注公众号:Apache_Flink