```java
// java.util.Collection
default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

default Stream<E> parallelStream() {
    return StreamSupport.stream(spliterator(), true);
}
```
```java
// @Data comes from Lombok; Maps and Lists are Guava helpers
@Data
class Student {
    private Integer height;
    private String sex;
}

Map<String, List<Student>> map = Maps.newHashMap();
List<Student> list = Lists.newArrayList();

// Traditional iteration
for (Student student : list) {
    if (student.getHeight() > 160) {
        String sex = student.getSex();
        if (!map.containsKey(sex)) {
            map.put(sex, Lists.newArrayList());
        }
        map.get(sex).add(student);
    }
}

// Stream API, sequential implementation
map = list.stream()
        .filter((Student s) -> s.getHeight() > 160)
        .collect(Collectors.groupingBy(Student::getSex));

// Stream API, parallel implementation
map = list.parallelStream()
        .filter((Student s) -> s.getHeight() > 160)
        .collect(Collectors.groupingBy(Student::getSex));
```
```java
List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");

// Length of the longest name that starts with "张" (4 here, from "张五六七")
int maxLenStartWithZ = names.stream()
        .filter(name -> name.startsWith("张"))
        .map(String::length)
        .max(Comparator.naturalOrder())
        .orElse(0);
```
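names is a List created by Arrays.asList (strictly speaking an instance of the private nested class Arrays.ArrayList, not java.util.ArrayList), so names.stream() calls the default stream method of Collection, the base interface of the collection classes: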
```java
default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}
```
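Collection.stream calls StreamSupport.stream, which creates a ReferencePipeline.Head object (Head is a nested class of ReferencePipeline that represents the source Stage of the pipeline):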
```java
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}
```
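StreamSupport.stream is public API, so this construction step can be reproduced by hand. The sketch below (StreamSupportDemo and its methods are illustrative names, not part of the JDK) wraps a list's Spliterator the same way Collection.stream() and parallelStream() do, and reads the resulting flag back with isParallel().

```java
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class StreamSupportDemo {
    // Builds a stream the way Collection.stream()/parallelStream() do:
    // take the collection's Spliterator and pass the parallel flag through.
    static Stream<String> fromSpliterator(List<String> source, boolean parallel) {
        Spliterator<String> spliterator = source.spliterator();
        return StreamSupport.stream(spliterator, parallel);
    }

    // Runs an ordinary filter on the hand-built sequential stream.
    static List<String> startingWith(List<String> source, String prefix) {
        return fromSpliterator(source, false)
                .filter(s -> s.startsWith(prefix))
                .collect(Collectors.toList());
    }
}
```

The hand-built stream behaves exactly like the one returned by names.stream(), because that method does nothing more than this call.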
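Next, filter and map are called. Both are stateless intermediate operations, so neither of them processes any data yet; each call simply creates a new Stage that represents that step of the user's pipeline.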
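The claim that intermediate operations do no work on their own can be checked with a counter inside the predicate. In this sketch (LazinessDemo is a made-up name), the counter is still zero after filter and map have been called, and only increases once the terminal collect drives the traversal.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazinessDemo {
    // Returns {predicate calls before the terminal op, calls after it, result size}.
    static int[] predicateCalls() {
        AtomicInteger calls = new AtomicInteger();
        List<String> names = Arrays.asList("Zhang San", "Li Si", "Zhang Si");

        // filter and map only create Stages; nothing is evaluated here.
        Stream<Integer> pipeline = names.stream()
                .filter(name -> {
                    calls.incrementAndGet();
                    return name.startsWith("Zhang");
                })
                .map(String::length);
        int before = calls.get();

        // The terminal operation pushes every element through the whole chain.
        List<Integer> lengths = pipeline.collect(Collectors.toList());
        int after = calls.get();

        return new int[] {before, after, lengths.size()};
    }
}
```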
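Most Stream operations take a callback function (usually a lambda), so a complete Stage can be described as a triple: data source, operation, and callback function.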
```java
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SIZED) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1); // the size is unknown after filtering
                }

                @Override
                public void accept(P_OUT u) {
                    if (predicate.test(u))
                        downstream.accept(u); // forward only matching elements
                }
            };
        }
    };
}
```
```java
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}
```
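new StatelessOp ends up calling the constructor of its superclass AbstractPipeline, which links the new Stage to the previous one. Repeating this for every operation produces a linked list of Stages, linked in both directions through previousStage and nextStage: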
```java
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
    if (previousStage.linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    previousStage.linkedOrConsumed = true;
    previousStage.nextStage = this;     // point the previous Stage's next pointer at the current Stage
    this.previousStage = previousStage; // point the current Stage's previousStage back at the previous Stage

    this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
    this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
    this.sourceStage = previousStage.sourceStage;
    if (opIsStateful())
        sourceStage.sourceAnyStateful = true;
    this.depth = previousStage.depth + 1;
}
```
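The linkedOrConsumed check in this constructor is visible from user code: linking a second operation to a Stage that already has a successor fails. A minimal sketch (ReuseDemo is a hypothetical name); the exception text comes from MSG_STREAM_LINKED, so the exact wording should be treated as illustrative rather than guaranteed across JDK versions.

```java
import java.util.Arrays;
import java.util.stream.Stream;

class ReuseDemo {
    // Links two operations to the same source Stage and reports what happens.
    static String linkTwice() {
        Stream<String> source = Arrays.asList("a", "b", "c").stream();
        source.filter(s -> !s.isEmpty()); // first link: marks the source Stage as linked
        try {
            source.map(String::length);   // second link on the same Stage
            return "no exception";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }
}
```

This is why a Stream cannot be reused: each Stage may have exactly one successor.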
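Each Stage also provides an opWrapSink method, which wraps the concrete implementation of its operation in a Sink. Sinks follow a process-then-forward pattern: a Sink handles an element and passes the result to its downstream Sink, and this is how operations are stacked on top of each other.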
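To make the process-then-forward pattern concrete, here is a much simplified model. MiniSink and MiniChainedSink are hypothetical types that only imitate the shape of Sink and Sink.ChainedReference; they are not the JDK classes, and MiniSinkDemo is likewise an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for java.util.stream.Sink (not the JDK type).
interface MiniSink<T> {
    default void begin(long size) {}
    void accept(T t);
    default void end() {}
}

// Imitates Sink.ChainedReference: keeps a downstream sink and forwards begin/end to it.
abstract class MiniChainedSink<T, R> implements MiniSink<T> {
    protected final MiniSink<? super R> downstream;

    MiniChainedSink(MiniSink<? super R> downstream) {
        this.downstream = downstream;
    }

    @Override public void begin(long size) { downstream.begin(size); }
    @Override public void end() { downstream.end(); }
}

class MiniSinkDemo {
    // Process: test the predicate. Forward: pass matching elements downstream.
    static <T> MiniSink<T> filterSink(Predicate<? super T> predicate, MiniSink<? super T> next) {
        return new MiniChainedSink<T, T>(next) {
            @Override public void begin(long size) { downstream.begin(-1); } // size unknown after filtering
            @Override public void accept(T t) {
                if (predicate.test(t)) downstream.accept(t);
            }
        };
    }

    // Process: apply the mapper. Forward: pass the mapped value downstream.
    static <T, R> MiniSink<T> mapSink(Function<? super T, ? extends R> mapper, MiniSink<? super R> next) {
        return new MiniChainedSink<T, R>(next) {
            @Override public void accept(T t) { downstream.accept(mapper.apply(t)); }
        };
    }

    static List<Integer> run(List<String> input) {
        List<Integer> out = new ArrayList<>();
        MiniSink<Integer> terminal = out::add; // innermost link: collects results
        MiniSink<String> mapStage = MiniSinkDemo.<String, Integer>mapSink(String::length, terminal);
        MiniSink<String> chain = MiniSinkDemo.<String>filterSink(s -> s.startsWith("Zhang"), mapStage);

        chain.begin(input.size());
        for (String s : input) chain.accept(s);
        chain.end();
        return out;
    }
}
```

Because each Sink only knows its downstream, the filter and map logic never needs to see the rest of the pipeline.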
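Calling max invokes ReferencePipeline.max.
Because max is a terminal operation, it delegates to reduce(BinaryOperator.maxBy(comparator)); reduce creates a TerminalOp (through ReduceOps.makeRef) together with a ReducingSink, so this operation, too, is wrapped in a Sink.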
```java
@Override
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
    return reduce(BinaryOperator.maxBy(comparator));
}
```
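Since max is defined as reduce(BinaryOperator.maxBy(comparator)), the two forms should agree, including on an empty stream, where both return an empty Optional. A quick check using only public Stream API (MaxViaReduce is an illustrative name):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

class MaxViaReduce {
    // The public max(Comparator) call.
    static Optional<Integer> maxDirect(List<Integer> xs) {
        return xs.stream().max(Comparator.naturalOrder());
    }

    // The same computation spelled out the way ReferencePipeline.max delegates it.
    static Optional<Integer> maxViaReduce(List<Integer> xs) {
        return xs.stream().reduce(BinaryOperator.<Integer>maxBy(Comparator.naturalOrder()));
    }
}
```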
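Finally, AbstractPipeline.wrapSink is called to build the Sink chain, in which each Sink wraps the implementation of one operation. It walks from the last Stage back to the head, wrapping the current sink at each step, so the terminal ReducingSink ends up innermost and the Sink of the first operation outermost.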
```java
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    // Start at the current (last) Stage and walk back until the head (depth 0)
    for (@SuppressWarnings("rawtypes") AbstractPipeline p = AbstractPipeline.this; p.depth > 0; p = p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink); // wrap the sink built so far
    }
    return (Sink<P_IN>) sink;
}
```
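The wrapping order is easy to get backwards, so here is a small model of the loop. MiniStage and WrapSinkDemo are hypothetical classes, and opWrapSink is modeled as a plain function over Consumer rather than the JDK's Sink; the sketch records which Stage wraps the sink first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

// Hypothetical, simplified Stage: the head has depth 0 and no operation.
class MiniStage {
    final MiniStage previousStage;
    final int depth;
    final UnaryOperator<Consumer<Object>> opWrapSink; // wraps a downstream sink

    MiniStage(MiniStage previousStage, UnaryOperator<Consumer<Object>> opWrapSink) {
        this.previousStage = previousStage;
        this.depth = previousStage == null ? 0 : previousStage.depth + 1;
        this.opWrapSink = opWrapSink;
    }

    // Same loop shape as AbstractPipeline.wrapSink: last Stage first, stop at the head.
    Consumer<Object> wrapSink(Consumer<Object> sink) {
        for (MiniStage p = this; p.depth > 0; p = p.previousStage) {
            sink = p.opWrapSink.apply(sink);
        }
        return sink;
    }
}

class WrapSinkDemo {
    // Builds head -> filter -> map, records the wrapping order, then pushes the input through.
    static List<Object> run(List<String> input, List<String> wrapOrder) {
        MiniStage head = new MiniStage(null, null);
        MiniStage filter = new MiniStage(head, down -> {
            wrapOrder.add("filter");
            return x -> { if (((String) x).startsWith("Zhang")) down.accept(x); };
        });
        MiniStage map = new MiniStage(filter, down -> {
            wrapOrder.add("map");
            return x -> down.accept(((String) x).length());
        });

        List<Object> out = new ArrayList<>();
        Consumer<Object> chain = map.wrapSink(out::add); // out::add plays the terminal sink
        input.forEach(chain);
        return out;
    }
}
```

The map Stage wraps first, yet elements still flow through filter before map, because the filter Sink is the outermost one.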
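Once the Sink chain is built, the Stream starts executing: copyInto calls begin on the outermost Sink, iterates over the collection with the Spliterator so that every element passes through the operations in the Sink chain, and finally calls end. If the pipeline contains a short-circuiting operation, copyIntoWithCancel is used instead so that traversal can stop early.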
```java
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}
```
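A sketch of the two branches of copyInto, written against the real java.util.Spliterator API but with a hypothetical SimpleSink interface standing in for Sink (CopyIntoSketch is also an illustrative name). The short-circuit branch follows the same idea as copyIntoWithCancel: check cancellationRequested() before pulling each element with tryAdvance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;

// Hypothetical stand-in for Sink: a Consumer plus begin/end/cancellationRequested.
interface SimpleSink<T> extends Consumer<T> {
    default void begin(long size) {}
    default void end() {}
    default boolean cancellationRequested() { return false; }
}

class CopyIntoSketch {
    static <T> void copyInto(SimpleSink<T> sink, Spliterator<T> spliterator, boolean shortCircuit) {
        sink.begin(spliterator.getExactSizeIfKnown()); // -1 when the size is unknown
        if (!shortCircuit) {
            // Push every remaining element through the sink.
            spliterator.forEachRemaining(sink);
        } else {
            // Pull one element at a time; stop as soon as the sink asks to cancel.
            while (!sink.cancellationRequested() && spliterator.tryAdvance(sink)) {
                // tryAdvance has already delivered the element to sink.accept
            }
        }
        sink.end();
    }

    // Returns the elements the sink received; beginSize[0] receives the size passed to begin.
    static List<String> run(List<String> input, boolean shortCircuit, long[] beginSize) {
        List<String> seen = new ArrayList<>();
        SimpleSink<String> sink = new SimpleSink<String>() {
            @Override public void begin(long size) { beginSize[0] = size; }
            @Override public void accept(String s) { seen.add(s); }
            // When short-circuiting, request cancellation after the first element (like findFirst).
            @Override public boolean cancellationRequested() { return shortCircuit && !seen.isEmpty(); }
        };
        copyInto(sink, input.spliterator(), shortCircuit);
        return seen;
    }
}
```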
```java
List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");

// Same query as before, but on a parallel stream
int maxLenStartWithZ = names.stream()
        .parallel()
        .filter(name -> name.startsWith("张"))
        .map(String::length)
        .max(Comparator.naturalOrder())
        .orElse(0);
```
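Up to the terminal operation, parallel processing is implemented exactly like sequential processing. When the terminal method is called, AbstractPipeline.evaluate checks isParallel() and invokes TerminalOp.evaluateParallel instead of TerminalOp.evaluateSequential: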
```java
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
```
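Which branch evaluate takes depends on isParallel(), which is also public on Stream. The sketch below (ParallelDispatchDemo is an illustrative name) shows the flag on both kinds of stream and checks that the longest-name query returns the same answer either way. That holds here because maxBy is associative, which reduce requires so that evaluateParallel can combine partial results from different splits.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

class ParallelDispatchDemo {
    // Reports isParallel() for a sequential and a parallel stream over the same list.
    static boolean[] parallelFlags(List<String> names) {
        Stream<String> sequential = names.stream();
        Stream<String> parallel = names.stream().parallel();
        return new boolean[] {sequential.isParallel(), parallel.isParallel()};
    }

    // The longest-name query; only the source stream differs between the two modes.
    static int maxLenStartingWith(List<String> names, String prefix, boolean parallel) {
        Stream<String> s = parallel ? names.parallelStream() : names.stream();
        return s.filter(n -> n.startsWith(prefix))
                .map(String::length)
                .max(Comparator.naturalOrder())
                .orElse(0);
    }
}
```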