本文是由笔者所原创的 《Java Stream Pipeline 流水线系列》 文章之一,是继 《深入 Java Lambda 系列》 的后续系列文章;本文将会重点就 Java 流水线 Stream 的相关特性进行描述;
本文为作者原创作品,转载请注明出处;
本文将重点提炼 State of the Lambda - Libraries Edition: http://cr.openjdk.java.net/~briangoetz/lambda/sotc3.html 中的官网内容,概要其核心内容;笔者并不会逐字逐句的翻译,而是将自己所理解的内容进行整理和总结。
使用内部迭代的方式是 Stream 既流式计算的一个设计理念,通过内部迭代器的方式,将流式计算的大量复杂的流程进行了封装;与内部迭代器( internal iteration )相对应的自然就是外部迭代器( external iteration ),外部迭代器的优势自然是给了用户极大的创作空间,但是相应的弊端也非常的明显,那就是太过于随意;实际上,笔者认为,在流式计算兴起以前,外部的或者是内部的,其实都无所谓,因为没有那么大量的数据,所以怎么迭代都好;但是,在流式计算兴起以后,特别是像具有互联网这种特性的数据处理过程,往往都是源源不断,永无止境的数据流入并需要处理,当流式计算在互联网中显得格外重要的时候,必然需要对流的处理的方式进行更好的封装便于方法和逻辑的重用,而流的处理是建立在流的迭代方式上的,自然就需要对流的迭代方式进行封装,也就自然需要一个所谓的“内部迭代器”;只有这样,我们才能够对流的处理进行封装,使得其功能和逻辑得以重用;一个典型的例子便是 Parallel Stream,通过“内部迭代器”使用 Fork and Join 架构实现并行处理,如果仍然是使用“外部迭代器”的方式自然就没有办法对 Fork and Join 架构进行统一的封装并对相应方法进行重构了。
最后来看一个“外部迭代器”和“内部迭代器”的例子,
“外部迭代器”
for (Shape s : shapes) { if( s.getColor() == BLUE ) s.setColor(RED); } }
“内部迭代器”
shapes.stream() .filter(s -> s.getColor() == BLUE) .forEach(s -> { s.setColor(RED); });
可见,内部迭代器通过一些标准的接口比如 filter 和 forEach 将外部迭代器的外部循环逻辑给替代了,这样,一来便于用户只需要关注自己的核心部分代码实现,而来便于 Stream 在架构层面进行重构。
对于流式计算,可以采取两种方式来实现,一种是 Eagerly(殷切的),一种是 Lazniess(懒惰的);笔者使用这样一个例子来让大家迅速明白这两者这件的区别,假设我们有这样一个需求,有这样一组队列,
Arrays.asList("abc", "bdfafda", "ab", "abdace");
计算过程是,先过滤 filter ,筛选出首字母为 “a” 的字符串,然后统计各个字符串的长度,最后返回最大的长度值;下面来看看 Eagerly 和 Lazniess 设计实现上的区别,
Lazniess
当遇到 filter,统计字符长度等操作的时候,它都不会开始计算;只有当遇到 结束操作
,比如统计最大值 max 方法后,才会开始计算;所以,它的中间计算过程既 filter 和 mapToInt 都是懒惰的,而只有 结束操作
max 是 Eagerly(殷切的);比如,我们将上述的步骤用流表述为,
strings.stream() .filter(s -> s.startsWith("a")) .mapToInt(String::length) .max();
也就是说,只有当执行到 max() 方法以后,才会开始执行 filter()、mapToInt() 直到最后的结束操作 max();并且注意,整个执行过程中,并不是等 filter 执行完以后,才开始执行 mapToInt,等 mapToInt 执行完成以后,才开始执行 max,而是一个元素会执行完整个流程,比如说元素 “abc”,它会先执行 filter,满足需求,filter 直接将 “abc” 发送给 mapToInt 处理,mapToInt 计算得到长度为 3,然后将 3 直接发送给 max,等这一系列的动作完成以后,才会执行下一个元素 “bdfafda”。
所以这里可以总结得出,Java Stream 包含惰性方法和殷切的方法,只是要注意的是所有的 结束操作
都是都是殷切的。
上一小节中提到了触发整个 Stream 开始执行的相关操作,当中提到了 max 方法;该方法是一个常规的 结束方法
,该结束方法将会触发 Stream 开始执行,这个过程也就是所谓的 惰性
;不过有一些特殊的结束方法,在 Stream 的执行过程中,还没有处理完所有元素的情况下,便可以终止流式计算,这种特殊的结束方法被称作为“Short-Circuiting”既是 短路操作
;常用的 findFirst、anyMatch 等等,比如
Optional<Shape> firstBlue = shapes.stream() .filter(s -> s.getColor() == BLUE) .findFirst();
上面这个例子,只要找到第一个元素即可立即返回并直接终止当前的流计算。
这里谈谈 side-effect 既是副作用,在官网中很多地方出现了这个词,其实很多网络博文对此理解不准确,这也难怪,因为官网对此概念一直是模模糊糊的;其实对 side-effect 正确的理解是,允许出现副作用的地方,因此在官方文档中有这么一个概念 side-effect-producing operations 就是说可以产生副作用的操作集,典型的就是 forEach(action) 方法,在这个方法中,允许产生副作用,比如修改源数据中的某些元素,或者自定义将元素保存到用户自定的队列中;看看下面这个用例,
List<Shape> list = new ArrayList<Shape>(); Collections.addAll(list, new Shape("RED"), new Shape("BLUE"), new Shape("BLUE")); list.stream() .filter(s -> s.getColor().equals("BLUE")) .forEach( s -> { s.setColor("GRAY"); }); list.forEach( s -> { System.out.println(s.getColor()); } );
逻辑很简单,就是将 BLUE Shape 改为 GRAY ;输出结果,
RED GRAY GRAY
可见 forEach(action) 是允许发生副作用的地方;那么将这个例子稍微改动一下,我们便可以将结果保存到用户自定义的队列 storage 中;
List<Shape> list = new ArrayList<Shape>(); Collections.addAll(list, new Shape("RED"), new Shape("BLUE"), new Shape("BLUE")); List<Shape> storage = new ArrayList<Shape>(); list.stream() .filter(s -> s.getColor().equals("BLUE")) .forEach( s -> { s.setColor("GRAY"); storage.add(s); }); storage.forEach( s -> { System.out.println(s.getColor()); } );
输出结果,
GRAY GRAY
Collectors 故名思议,既是 收集器 ;
在前面的大量的用例中,我们已经看到通过使用方法 collect() 将流式处理的结果输出到一个 List 或者 Set 中;collect() 方法的参数是 Collector 接口对象,通过构造该接口对象解耦了 List、Set、Map 之间的异构性; Collectors 工具类为一些常用的 collectors 提供了构造工厂类,比如 Collectors.toList()、Collectors.toSet();前面的介绍中,我们已经看到了大量的有关 list 和 set 相关的用例,稍微复杂一点的是 toMap(),
toMap()
下面让我们来看一个 toMap 的例子,比如我们想要根据 catalog number 为所有的专辑创建一个索引,可以用如下的方式,
Map<Integer, Album> albumsByCatalogNumber = albums.stream() .collect(Collectors.toMap(a -> a.getCatalogNumber(), a -> a));
这样,将会返回一个以 catalog number 为 Key,album 为 Value 的 Map;
groupingBy()
与 toMap 相关的是 groupingBy;比如我们有这样一个需求,就是让专辑中的每一首评分高于 4 分的歌按照其作者(artist)进行归类,我们可以用如下的方式,
Map<Artist, List<Track>> favsByArtist = tracks.stream() .filter(t -> t.rating >= 4) .collect(Collectors.groupingBy(t -> t.artist));
这样我们得到了一个 Map,其中的 Track(歌曲)按照其作者进行了分类存储;可以看到,上述默认将 Track 构造为了 List,但是往往 Track 会有重复的情况出现,如果是这样,我们可以使用如下的方式过滤掉重复的 Tracks,
Map<Artist, Set<Track>> favsByArtist = tracks.stream() .filter(t -> t.rating >= 4) .collect(Collectors.groupingBy(t -> t.artist, Collectors.toSet()));
通过使用 toSet() 将结果保存到 Set 中,通过这样的方式将重复的 tracks 过滤掉;如果我们有这样一个更为复杂的需求,我们想要知道同一个 Artist 的所有 Tracks 同时对所有相同评分的 Tracks 进行分组,那么这个时候,我们可以通过多层的 groupingBy() 来构建一个双层的 Map 来实现,
Map<Artist, Map<Integer, List<Track>>> byArtistAndRating = tracks.stream() .collect(groupingBy(t -> t.artist, groupingBy(t -> t.rating)));
最后,来看一个更好玩的,统计歌曲名( Track Name )中所使用单词的频率,并用降序进行排列,
Pattern pattern = Pattern.compile("//s+"); Map<String, Integer> wordFreq = tracks.stream() .flatMap(t -> pattern.splitAsStream(t.name)) // Stream<String> .collect(groupingBy(s -> s.toUpperCase(), counting()));
要注意 flatMap 的用法,它接受一个返回结果为 Stream R 的 lambda 表达式作为参数,
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
并且将 R 作为流传递给 downstream ,参考下面的源码;
ReferencePipeline.java
@Override public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) { Objects.requireNonNull(mapper); // We can do better than this, by polling cancellationRequested when stream is infinite return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { try (Stream<? extends R> result = mapper.apply(u)) { // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it if (result != null) result.sequential().forEach(downstream); } } }; } }; }
注意上述代码第 20 行,逐个迭代 R Stream 中的元素并交给 downstream 既是下游 Sink 进行处理,由此可知, flatMap 的作用就是将输入 T 转换成另外一个 Stream R ,然后进行流式计算,回到当前的这个例子中便是,将 Track Name 转换成一个一个的 单词流 ,然后进行后续的处理,后续交给 groupBy() 进行处理,根据不同的单词进行分组并计算 count;
While the use of internal iteration makes it possible that operations be done in parallel, we do not wish to inflict any sort of “transparent parallelism” on the user. Instead, users should be able to select parallelism in an explicit but unobtrusive manner.
上述这段话描述了 Stream 并行处理的设计初衷,作者并不想让用户对并行处理是完全透明的,相反,用户可以手动的选择是否要使用并行处理;通过如下的方式,
int sum = shapes.parallel() .filter(s -> s.getColor() == BLUE) .map(s -> s.getWeight()) .sum();
这样,通过调用 Stream.parallel() 方法便会得到一个具有并发处理特性的 Stream 对象;上述代码中,shapes 表示的是一个 Stream 对象。
The steps involved in implementing parallel computations with Fork/Join are: dividing a problem into subproblems, solving the subproblems sequentially, and combining the results of subproblems. The Fork/Join machinery is designed to automate this process.
上面这段话描述了 Parllel Stream 的实现原理,将一个大的问题拆分成小的问题,按顺序的解决小的问题,然后依次将子任务的结果合并(备注,这正是CountedCompleter 所能够保证的);新的 Fork/Join 机制被设计用来自动的实现该流程。原理性的东西不再照本宣科,不去逐字逐句的翻译官网上的内容;总体而言,如果要将大问题才分成小的问题,就必须利用一个新的工具,而这个工具正是 Spliterator 接口,有关该接口的详细内容笔者将在后续的博文中单独进行介绍,
public interface Spliterator<T> { // Element access boolean tryAdvance(Consumer<? super T> action); void forEachRemaining(Consumer<? super T> action); // Decomposition Spliterator<T> trySplit(); // Optional metadata long estimateSize(); int characteristics(); Comparator<? super T> getComparator(); }
首先,笔者要问的是什么是 Encounted Order?先来看官网上的一段描述,
Many data sources, such as lists, arrays, and I/O channels, have a natural encounter order , which means the order in which the elements appear has significance. Others, such as HashSet, have no defined encounter order
上面的描述,说明了 Encounted Order 的一些性质,主要是说明 lists, arrays 和 I/O channels 都有自然的 Encounted Order,其它的,比如 HashSet 就没有 Encounted Order;但是,仅凭上面的这段官网描述,我们依然不知道 Encounter Order 到底是什么呢?不过可以大概的嗅到它的味道;笔者经过反复思考,慎重的给出了它的定义,如下,
Encoutned Order 就是一种所见即所得的顺序;什么意思呢?就是说,从队列中取出元素的顺序既是你存放该元素时候的顺序;比如,
collection.add("a"); collection.add("b"); collection.add("c");
如果使用 collection 迭代器依次迭代元素的话,输出的元素的顺序也就是 “a”, “b”, “c”;满足这种特性的迭代顺序,就称作 Encounted Order;
所以,这里要重点说明的是,Parallel Stream 同样满足这样的特性;比如,
List<String> names = people.parallelStream() .map(Person::getName) .collect(toList());
上面所输出的名字,即便是采用并发处理的方式也同样会严格按照 people 队列中 person 所出现的 Encounted Order 所进行排列;笔者将会在后续的 Parallel Stream 源码分析的博文中详细的介绍,其实这种特性是由继承自 CountedCompleter 的 java.util.stream.AbstractTask 中的并发执行逻辑所能保证的。
备注,官网的该章节的内容的介绍与 Java SE 8 SDK 的真实实现有差异,该差异笔者将会在本章节结束的部分进行描述,由此可见,该部分内容应该仅仅是对于 Primitive streams 实现之前的构思 ;
我们期望通过如下的样例代码块对原始类型( int )进行流式计算,
List<String> strings = ... int sumOfLengths = strings.stream() .map(String::length) .reduce(0, Integer::plus);
通过这样的方式,能够不必考虑原始类型与引用类型之间的差异(备注,通过将原始类型自动封装成为引用类型后,这样便可以统一的使用引用类型来进行处理);但是这样做,却会引来性能上的问题,那就是频繁的在原始类型和引用类型之间进行封装和解封装,而这个性能的损耗在对性能要求极为苛刻的流式计算中是不能被接受的;所以这个看似完美的解决方案,其实并不可取。
所以留给我们的只有如下的两种解决方式,
这里作者认为上述的方案 #2 并不成熟,甚至于不能作为一个完整的解决方案;下面是作者针对方案 #1 的一些解决办法的描述,
List<String> strings = ... int sumOfLengths = strings.stream() .map(String::length) .sum();
使用上述的方式就需要能够使得 map(Function<T,U>) 能够被 map(IntMapper<T>) 的方式所重载,然后提供一些原始类型所特有的 Stream 对象,诸如 IntStream、DoubleStream 等等;所以,Stream 的接口类似下面这样,
interface Stream<T> { ... <U> Stream<U> map(Mapper<T,U>); IntStream map(IntMapper<T>); LongStream map(LongMapper<T>); DoubleStream map(DoubleMapper<T>); }
所以,从官网的这段描述中可以看到,作者的初衷是想使用方案 #1 通过对 map 方法进行重载的方式更为动态的实现 Primitive Stream;但是正如笔者在该章节开始所描述的那样,JDK 8 并没有按照 #1 的方案实施,而是使用的方案 #2,既是使用 fused-operation approach;从 Stream 的接口最终实现就可以看出,
Stream<T>.java
<R> Stream<R> map(Function<? super T, ? extends R> mapper); IntStream mapToInt(ToIntFunction<? super T> mapper); LongStream mapToLong(ToLongFunction<? super T> mapper); DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);
可见分别提供了对各种原始数字类型的 fused-operations 来处理原始类型的流式计算;所以,上述样例代码最终的实现方式如下所述,
List<String> strings = ... int sumOfLengths = strings.stream() .mapToInt(String::length) .sum();
注入代码第 3 行,使用的是方法 mapToInt 而非重载过后的 map ;笔者猜想,之所以方案 #1 最终并没有落地,估计是方案 #1 中有一个比较难以实现的前提,那就是需要使得 map(Function<T,U>) 方法能够被 map(IntMapper<T>) 所重载,而这种方式是 Java 的重载默认不提供的,因为毕竟参数类型有极大的不同,如果要强制实现,那么这里必然针对这个特例对 JVM 底层源码进行修改,且难度可想而知。
有些 Collections 中的元素允许 null 值,但有些不允许;我们有三种不同的处理 nulls 方式供 Stream 来实现,
Ignore
Ignore nulls in streams.
这里直译为直接忽略掉;但是其实真正的含义是,跳过 nulls,不处理它。
Stream 的设计者采用第三种方案;交由用户自己去如何处理 Nulls;比如,如果不允许 Nulls 发生,可以使用如下代码,
filter(e -> { (if e == null) throw new NPE(); } )
如果,想直接 Ingore,使用
filter(e -> e != null)
Because the stream source might be a mutable collection, there is the possibility for interference if the source is modified while it is being traversed. The stream operations are intended to be used while the underlying source is held constant for the duration of the operation.
上面描述了因为 Collection 中的元素往往是引用对象,因此该元素是允许被修改的(即便是在 Collection 对象上加上 final 限制);而 Stream 的相关操作是建立在将 collection 中的元素视为 常量
的基础之上的,所以 Stream 操作的限制就是不允许对源数组中的数据进行修改;因此引用对象本身可以被修改的这个特性与此相悖。
笔者补充,其实这个限制并不新鲜,Java 的 Iteratble 接口早就有这种限制了,比如 AbstractList 中的属性 modCount 那样,记录了当前 list 中的修改次数,然后通过使用 变量记录迭代器中 list 的修改次数,如果在 list 的迭代过程中,发现这两个值不相等,既 modCount ≠ expectedModCount,那么将会抛出 ConcurrentModificationException 异常,表示在迭代过程中,源 list 中的数据被更改过了,这时强制不允许的。同样的,这个限制也适用于流式计算既 Stream,从后续的内容中可以知道,Stream 的迭代器是 Spliterator,也就是说该限制最终落地到的是 Spliterator 对象上的。
Unlike a Collection, there is nothing about a stream that requires it to be of finite size. While certain operations on an infinite stream (such as forEach) would never terminate under normal conditions, there are many operations that can deal perfectly well with infinite streams (e.g., limit will truncate a stream; findFirst or findAny will terminate as soon as they find a match, etc.) Similarly, an infinite stream can be turned into an Iterator and iterated directly.
上述描述中,最重要的一段话是最后一句,既是可以将 Infinite Stream 转换成 Iterator 来进行处理。后续中我们可以看到,在处理 I/O 输入流的过程中,正式通过 I/O 的输入流转换成 Iterator 的方式来实现这种无限流的处理的,具体用例可以参考 BufferedReader.lines() : Stream 方法。
这部分主要描述的是 lambda 相关的内容,
Comparator.comparing()
public static <T, U extends Comparable<? super U>> Comparator<T> comparing( Function<? super T, ? extends U> keyExtractor) { return (c1, c2) -> keyExtractor.apply(c1).compareTo(keyExtractor.apply(c2)); }
该静态方法接收一个 Function 对象既 keyExtractor,通过 keyExtractor 进行 T -> U 映射,也就是说,最终比较的是 $U_1$ 和 $U_2$;方法内部很有意思,通过 lambda 表达式返回的是一个 Comparator Functional Interface ,并接收 _c1_ 和 _c2_ 作为参数进行比较;来看看相关的用例,
List<Person> people = ... people.sort(comparing(p -> p.getLastName()));
这个例子中,people 充当的就是 _T_ 同时也充当的是参数 _c_,而 p.getLastName() 充当的就是 _U_;上面的例子默认是按照字母升序的方式进行排列的,那么如果我们想要按照降序的方式进行排列呢?
people.sort(comparing(p -> p.getLastName()).reversed());
如果我们想要实现 order by Last Name then First Name 呢?
Comparator<Person> c = Comparator.comparing(p -> p.getLastName()) .thenComparing(p -> p.getFirstName()); people.sort(c);
从中我们知道,Stream 的操作对源数据有严格的限制,既是将源当做 Constant,不允许修改;但有时候,我们期望借助 Stream 的特性对源数据进行集中的修改;为了满足这个特性,一些新的方法加入了 Collection 、 List 、 Map 和 Iterable 接口中来利用 lambda 和 Stream 的特性;
找到蓝色的元素,并将其修改为红色;
shapes.stream() .filter(s -> s.getColor() == BLUE) .forEach(s -> { s.setColor(RED); });
shapes 是一个 Stream 对象。
过滤出 BLUE 元素,并将这些元素保存到新的队列 ArrayList 中;
List<Shape> blue = shapes.stream() .filter(s -> s.getColor() == BLUE) .collect(new ArrayList<>());
如果每一个 Shape 都包含在一个 Box 中,那么我们想统计哪些 Box 包含了 BLUE 元素
Set<Box> hasBlueShape = shapes.stream() .filter(s -> s.getColor() == BLUE) .map(s -> s.getContainingBox()) .collect(new HashSet<>());
统计所有蓝色形状物体的重量
int sum = shapes.stream() .filter(s -> s.getColor() == BLUE) .map(s -> s.getWeight()) .sum();
假设我们有这样一个需求,找到某个专辑中至少有一首歌的评分为 4 颗星以上的专辑的名字,结果通过名称进行升序排列;
传统的实现方式
List<Album> favs = new ArrayList<>(); for (Album a : albums) { boolean hasFavorite = false; for (Track t : a.tracks) { if (t.rating >= 4) { hasFavorite = true; break; } } if (hasFavorite) favs.add(a); } Collections.sort(favs, new Comparator<Album>() { public int compare(Album a1, Album a2) { return a1.name.compareTo(a2.name); }});
使用 Java Stream
List<Album> sortedFavs = albums.stream() .filter(a -> a.tracks.anyMatch(t -> (t.rating >= 4))) .sorted(comparing(a -> a.name)) .collect(new ArrayList<>());
将一个输入的元素 T 转换成一个输入流 Stream R 并交由下游系统进行处理;比如我们要统计某个单词出现的频率,就可以将每一首歌曲的名字通过 flatMap 转换成 R 并交由下游系统进行处理;
Pattern pattern = Pattern.compile("//s+"); Map<String, Integer> wordFreq = tracks.stream() .flatMap(t -> pattern.splitAsStream(t.name)) // Stream<String> .collect(groupingBy(s -> s.toUpperCase(), counting()));
该例子的详情参考章节;