先看使用方式
List<Integer> list = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 8, 8, 9, 9, 9, 1, 2, 3, 2, 3, 4); String[] strings = Flow.of(list) .filter(x -> x % 2 == 0) .map(x -> x + "s") .limit(30) .skip(1) .distinct() .sorted(Comparator.reverseOrder()) .toArray(String[]::new); ///[8s, 6s, 4s, 2s] System.out.println(Arrays.toString(strings));
new class | java8 class | mean |
---|---|---|
Flow |
Stream |
流 |
Visitor |
Spliterator |
迭代器 多了个获取数量的方法 |
Stage |
Sink |
表示一次操作 分为 begin/accept/end 三步 |
Flow
Flow.of(Collection<E>)
Flow.of(Visitor<E>)
Iterator
的子接口)创建流 Flow.of(E...)
Flow.of(E)
Flow.empty()
Flow.iterate(E seed, UnaryOperator<E> f)
Flow.generate(Supplier s)
无状态操作
有状态操作
终止操作
AbstractFlow<S, T> implements Flow<T>
是实现类。 每个无状态操作或有状态操作都不实际执行那个操作, 而是记录当前要执行的动作,然后返回一个新的流, 在终止操作调用时,再一次性遍历数据源,依次执行所有操作。
实现过程
先了解下 Java8 的 ReferencePipeline
, Sink
, java.util.stream.AbstractPipeline.wrapAndCopyInto
尝试写出 filter
, foreach
的方法体
public Flow<T> filter(Predicate<? super T> predicate) { Objects.requireNonNull(predicate); return new AbstractFlow<T, T>(this) { @Override Stage<T> wrapDownstream(Stage<T> nextStage) { //? } }; }
wrapDownstream
对应 java.util.stream.AbstractPipeline.opWrapSink
.
Stage
对应 Sink
表示一个要执行的动作。包含三个方法
begin(long size) accept(T) end() canFinish()
如何将每个动作串起来:通过 downstream 这个表示下游操作的变量 —— 每个 Stage 将自己的动作执行完后,调用下游动作的相应方法。
终止方法调用时,才会开始遍历,看下终止方法:
@Override public void forEach(Consumer<? super T> action) { Objects.requireNonNull(action); terminal(new AbstractTerminal<T, Void>(this) {...}); } /** * 终止方法 * * @param <R> 返回类型 * @param terminalStage 终止操作 */ private <R> R terminal(Stage.TerminalStage<T, R> terminalStage) { return terminalStage.startAndGet(source); } /** * 终止操作的基类 * * @param <T> 流中元素类型 * @param <R> 该终止操作的返回类型 通过 {@link Supplier#get()} 返回结果 * @see Supplier#get() */ static abstract class AbstractTerminal<T, R> implements Stage.TerminalStage<T, R>, Supplier<R> { private AbstractFlow<?, T> lastFlow; //... @Override public R startAndGet(Visitor<?> in) { lastFlow.start(in, this); return get(); } } /** * 开始执行整串流各阶段的操作 * * @param <S_IN> 源迭代器的元素类型 */ private <S_IN> void start(Visitor<S_IN> in, Stage.TerminalStage<T, ?> terminalStage) { Stage<S_IN> stage = wrapStage(terminalStage); stage.begin(in.getSizeIfKnown()); while (in.hasNext() && !stage.canFinish()) { stage.accept(in.next()); } stage.end(); } /** * 将每个阶段的操作串起来 真正开始处理流时调用 * * @param stage 最后一个操作 * @param <S_IN> 源迭代器的元素类型 * @return 将整个串的每个操作串起来作为一个操作 */ @SuppressWarnings("unchecked") private <S_IN> Stage<S_IN> wrapStage(Stage<T> stage) { for (AbstractFlow flow = this; flow.prev != null; flow = flow.prev) { /* flow.prev != null 即头节点不参与*/ stage = flow.wrapDownstream(stage); } return (Stage<S_IN>) stage; }
解释
在终止方法调用时,也有一个 TerminalStage
表示终止操作, TerminalStage 比 Stage 多了个 startAndGet
方法,该方法表示开始遍历数据源,并返回最终结果。 分为两步,将整个动作串一次执行( start()
),取结果( get()
)。
start 中首先将每个阶段的动作都包裹起来, 从终止操作的 TerminalStage 开始, 作为前一个阶段的下游依次往前包裹每个 Stage 即 wrapStage
和 每次需要重写的 wrapDownstream
方法。 然后调用包裹了所有操作的 stage 的 begin/accept/end 方法,执行整串动作。
遍历结束后,返回终止操作的结果 get()
即完成。
看下具体 filter 的实现:
public Flow<T> filter(Predicate<? super T> predicate) { Objects.requireNonNull(predicate); return new AbstractFlow<T, T>(this) { @Override Stage<T> wrapDownstream(Stage<T> nextStage) { return new Stage.AbstractChainedStage<T, T>(nextStage) { @Override public void begin(long size) { /*因为过滤后数量可能改变 所以是不确定个数*/ downstream.begin(Visitor.UNKNOWN_SIZE); } @Override public void accept(T element) { if (predicate.test(element)) { downstream.accept(element); } } }; } }; }
直接返回一个新的流,包裹下游的方法实现为, begin 方法通知下游元素个数不确定, accept 方法先过一遍 foreach 传入的测试条件,符合条件才往下游发送。 end 方法没有重写,由于该操作不是短路操作 canFinish 也没有重写。
终止操作,看一个 foreach:
@Override public void forEach(Consumer<? super T> action) { Objects.requireNonNull(action); // 遍历不需要返回内容所以是 Void terminal(new AbstractTerminal<T, Void>(this) { @Override public Void get() { return null; } @Override public void accept(T element) { action.accept(element); } }); }
foreach 处于操作串的最下游, accept 方法直接调用传入的 Consumer 对每个元素进行消费 (正是 foreach 的语义)。 该操作不需要返回内容,所以 get() 放回 null,
Flow.of(list) .filter(x -> x % 2 == 0) .foreach(System.out::pringln);
你也可以在 IDEA 中从 terminal
方法开始打断点, 代码中每个返回的抽象类都重写了 toString 方法, 有助于 debug 区分各个类。
代码地址: