Flink在DataStream中也是通过一个特定的可迭代的流(IterativeStream)来构建相关的迭代处理逻辑,这一点跟DataSet提供的可迭代的数据集(IterativeDataSet)的是类似的。
IterativeStream继承自DataStream,因此DataStream支持的转换函数,在IterativeStream上同样可以调用。
IterativeStream的实例是通过DataStream的iterate方法创建的˙。iterate方法存在两个重载形式:一种是无参的,表示不限定最大等待时间;另一种提供一个长整型的maxWaitTimeMillis参数,允许用户指定等待反馈边的下一个输入元素的最大时间间隔。而迭代的关闭是通过调用IterativeStream的实例方法closeWith来实现的。
每一种数据流(DataStream)都会有与之对应的流转换(StreamTransformation)。IterativeStream对应的转换是FeedbackTransformation。
我们来看一下closeWith方法的实现:
publicDataStream<T>closeWith(DataStream<T> feedbackStream){ //基于需要反馈给迭代头的反馈流对象获取其所有前任的StreamTransformation对象,目的是为了下文的检查 Collection<StreamTransformation<?>> predecessors = feedbackStream.getTransformation().getTransitivePredecessors(); //在反馈流的所有前任StreamTransformation对象集合中查找当前可迭代的流(迭代头)对应的转换对象是否在其中, //如果不在,则抛出异常 if (!predecessors.contains(this.transformation)) { throw new UnsupportedOperationException( "Cannot close an iteration with a feedback DataStream " + " that does not originate from said iteration."); } //将反馈流对应的转换对象作为迭代头的反馈边 ((FeedbackTransformation) getTransformation()).addFeedbackEdge(feedbackStream.getTransformation()); return feedbackStream; }
解释一下上文代码中为什么要检查前任StreamTransformation对象的原因。我们结合上一篇的案例中的代码片段来看:
IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> iterativeStream = inputStream.map(new TupleTransformMapFunction()).iterate(5000); DataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> fibonacciStream = iterativeStream.map(new FibonacciCalcStepFunction()); SplitStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> branchedStream = fibonacciStream.split(new FibonacciOverflowSelector()); iterativeStream.closeWith(branchedStream.select(ITERATE_FLAG));
这里传递给closeWith的是branchedStream筛选出的数据流,而从branchedStream向上是能追溯到可迭代的流iterativeStream的,因此满足前任追溯到迭代头的条件。所以这里需要基于前任向上游追溯的原因是确保反馈流是的源头是来自迭代头(从而形成迭代这样一个闭环),而不是任意的某个流都可以作为反馈流。
另外,IterativeStream通过调用withFeedbackType方法还可以改变或者重新指定迭代反馈流的类型,从而形成一个跟最初的输入流组合而成的连接迭代流(ConnectedIterativeStreams),这一点也是批处理中的迭代所不具备的。示例代码如下:
DataStream<Integer> source = ...; ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(Integer.class);
ConnectedIterativeStreams是ConnectedStreams的特例,它表示将迭代最初的输入与反馈边的输入进行连接所形成的ConnectedStreams。ConnectedIterativeStreams的构造器会要求指定反馈流的数据类型信息(TypeInformation),你可以为其指定Flink所支持的任意类型。ConnectedIterativeStreams对应的转换是CoFeedbackTransformation,我们在下面会顺带介绍。
当IterativeStream转变为双流连接而成的ConnectedIterativeStreams,转换也从FeedbackTransformation转变为CoFeedbackTransformation,因此ConnectedIterativeStreams也提供了自己的closeWith方法来将CoFeedbackTransformation添加为自己的反馈边。在实现上和IterativeStream是类似的,不再赘述。
迭代流(IterativeStream)对应的转换是反馈转换(FeedbackTransformation),它表示拓扑中的一个反馈点(也即迭代头)。一个反馈点包含一个输入边以及若干个反馈边,且Flink要求每个反馈边的并行度必须跟输入边的并行度一致,这一点在往该转换中加入反馈边时会进行校验。
这里并行度一致的原因是Flink将采用一种CoLocationGroup来优化迭代任务的子任务执行。当一组作业顶点(JobVertex,一个任务在JobGraph中的表示)被包含在同一个CoLocationGroup中的时候,这些JobVertex在运行时所对应的任务的第i个子任务必须运行在同一个TaskManager的JVM实例中。那么一个分布式的迭代作业,其迭代部分是就是并行度个执行体在并行执行,而每个执行体中的子任务都在位于同一个TaskManager的实例中多线程的形式并发地执行,其中还涉及到并发环境下的数据交换,后续会进行分析。
当IterativeStream对象被构造时,FeedbackTransformation的实例会被创建并传递给DataStream的构造方法:
protectedIterativeStream(DataStream<T> dataStream,longmaxWaitTime){ super(dataStream.getExecutionEnvironment(), new FeedbackTransformation<>(dataStream.getTransformation(), maxWaitTime)); this.originalInput = dataStream; this.maxWaitTime = maxWaitTime; setBufferTimeout(dataStream.environment.getBufferTimeout()); }
每一个流转换对象都要实现获得其前任转换对象集合的getTransitivePredecessors方法,FeedbackTransformation对该方法的实现如下:
public Collection<StreamTransformation<?>> getTransitivePredecessors() { List<StreamTransformation<?>> result = Lists.newArrayList(); //先加入自身 result.add(this); //加入其输入端的所有前任转换对象 result.addAll(input.getTransitivePredecessors()); return result; }
在上面分析IterativeStream时,我们提过它可以转换为ConnectedIterativeStreams,ConnectedIterativeStreams对应的CoFeedbackTransformation这里我们也一并分析一下。CoFeedbackTransformation跟FeedbackTransformation一样都表示拓扑中的一个反馈点。对于CoFeedbackTransformation转换,它不要求反馈边元素的类型跟上游输入端元素的类型一致。因为上游流将会成为该转换的第一个输入,而反馈流将会成为该转换的第二个输入。因为两个流会在此连接,所以CoFeedbackTransformation后只允许跟TwoInputTransformations类型的转换。
CoFeedbackTransformation同样对输入端的并行度和反馈边的并行度有一定的限制,它也要求两者的并行度必须相等,但是它们的分区策略可以是不一致的。
Flink在创建ConnectedIterativeStreams流对象时,会用迭代流的初始输入来作为ConnectedIterativeStreams的第一个输入流,然后用CoFeedbackTransformation来构建参与连接的第二个流对象,这里可以指定跟迭代流类型不同的feedbackType作为第二个流的类型:
public ConnectedIterativeStreams(DataStream<I> input, TypeInformation<F> feedbackType, long waitTime) { super(input.getExecutionEnvironment(), input, new DataStream<>(input.getExecutionEnvironment(), new CoFeedbackTransformation<>(input.getParallelism(), feedbackType, waitTime))); this.coFeedbackTransformation = (CoFeedbackTransformation<F>) getSecondInput().getTransformation(); }
这一篇我们介绍了IterativeStream以及ConnectedIterativeStreams所对应的转换对象,下一篇我们分析StreamGraph的迭代相关的内容时,将会剖析Flink如何将FeedbackTransformation转换为算子。
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)