Java8中引入了函数式计算以及Lambda和Stream等特性,其中的流式计算引入了收集器、组合器等规约操作用到概念,非常值得我们好好学习。
本文以「规约」为线索,先从Stream的reduce方法说起,然后延展到collect方法,以及Collectors中的groupingBy等方法。
Stream
带有一个 reduce
方法,通过该方法我们可以实现 count
、 max
、 min
、 sum
等功能,非常强大。
规约操作可以将流的所有元素组合成一个结果,该方法有三种重载版本。
Optional
该方法无初始值,所以返回值使用 Optional
表示结果可能不存在。
使用 Java
代码来表述如下:
T result = null; if (a == null) return result; T identity = a[0]; result = identity; for (int i = 1; i < n; i++) { result = accumulator.apply(result, a[i]); } return result;
这里的 identity
其实就是第一个元素,整体计算次数为 n - 1
。计算的顺序为,a[0]与a[1]进行二合运算,结果与a[2]做二合运算,一直到最后与a[n-1]做二合运算。
T reduce(T identity, BinaryOperator
identity
:循环计算的初始值 accumulator
:计算的累加器,其方法签名为 apply(T t,U u)
,在该 reduce
方法中第一个参数 t
为上次该函数计算的返回值,第二个参数 u
为 Stream
中的元素,这个函数把这两个值计算 apply
,得到的和会被赋值给下次执行这个方法的第一个参数。
使用 Java
代码来表述如下:
T result = identity; for (int i = 0; i < n; i++) { result = accumulator.apply(result, a[i]); } return result;
注意区分与一个参数的 reduce
方法的不同:它多了一个初始化的值,因此计算的顺序是 identity
与a[0]进行二合运算,结果与a[1]再进行二合运算…,最终与a[n-1]进行二合运算,一共计算 n
次。
<U> U reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator combiner)
乍一看与两个参数的 reduce
方法几乎一致,但是 accumulator
的类型变成了 BiFunction
而不是 BinaryOperator
,并且还多了一个 combiner
参数,而它的类型是第二个方法里 accumulator
参数的类型—— BinaryOperator
。
其实 BinaryOperator
接口是实现了 BiFunction
接口的,定义如下:
public interface BinaryOperator
也就是说 BiFunction
的三个参数类型可以是一样的也可以完全不同,而 BinaryOperator
直接限定了三个参数类型必须相同。
OK,我们都知道 accumulator
是实现「累加」操作的,那么 combiner
的作用是什么呢?
如果你用 串行流
的方式来调用这个方法的话你会发现 combiner
并没有被调用,所有计算都在 accumulator
中执行,并返回结果。如下面的实例代码:
Integer ageSum = persons .stream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s/n", sum, p); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s/n", sum1, sum2); return sum1 + sum2; }); // accumulator: sum=0; person=Max // accumulator: sum=18; person=Peter // accumulator: sum=41; person=Pamela // accumulator: sum=64; person=David
其实这个组合器是在 并行流
的执行方式时被调用的,因为累加器会被并行调用,所以需要组合器用于计算各个子累加值的总和。
该方法是负责处理可变式规约—— Mutable Reduction
的, JavaDoc
中该定义解释如下:
A mutable reduction operation accumulates input elements into a mutable result container,
such as a Collection or StringBuilder, as it processes the elements in the stream.
If we wanted to take a stream of strings and concatenate them into a single long string, we
could achieve this with ordinary reduction:
String concatenated = strings.reduce("", String::concat)
We would get the desired result, and it would even work in parallel. However, we might not be
happy about the performance! Such an implementation would do a great deal of string copying, and
the run time would be O(n^2) in the number of characters. A more performant approach would be
to accumulate the results into a StringBuilder, which is a mutable container for accumulating
strings. We can use the same technique to parallelize mutable reduction as we do with ordinary reduction.
里面提到了一种实现拼接字符串的方法,它采用了每次创建一个新的字符串的方式(String::concat方法每次生成一个新的字符串),这样非常浪费内存空间。而 collect
方法本质上是变更了数据的状态,而不是对它进行了替换,这也就是为什么要求存储变量的容器是 a mutable result container
。
用伪代码标识上述规约过程如下,注意 result
是一个状态可变的对象,即 mutable
的:
R result = supplier.get(); for (T element : this stream) accumulator.accept(result, element); return result;
它的三参数方法声明如下:
<R> R collect(Supplier
所以对于字符串拼接来说,可以用下面的代码实现:
String concat = stringStream.collect(StringBuilder::new, StringBuilder::append, StringBuilder::append).toString();
再举一个例子:
Stream<Integer> stream = Stream.of(1, 2, 3, 4).filter(p -> p > 2); List<Integer> result = stream.collect(() -> new ArrayList<>(), (list, item) -> list.add(item), (one, two) -> one.addAll(two)); /* 或者使用方法引用 */ result = stream.collect(ArrayList::new, List::add, List::addAll);
这个例子即为过滤大于2的元素,将剩余结果收集到一个新的list中。
supplier accumulator combiner
该方法的参数较多,代码有点繁琐,其实 collect
还有另外一个重载方法:
<R, A> R collect(Collector<? super T, A, R> collector)
里面只有一个参数 Collector
,它是一个接口,看过它的定义就知道它暴露了几个方法,其中最重要的几个声明如下:
/** * A function that creates and returns a new mutable result container. * * @return a function which returns a new, mutable result container */ Supplier<A> supplier(); /** * A function that folds a value into a mutable result container. * * @return a function which folds a value into a mutable result container */ BiConsumer<A, T> accumulator(); /** * A function that accepts two partial results and merges them. The * combiner function may fold state from one argument into the other and * return that, or may return a new result container. * * @return a function which combines two partial results into a combined * result */ BinaryOperator<A> combiner();
可以看到其实它的内部三个方法跟 collect
三个参数版本的中的三个参数: supplier
、 accumulator
、 combiner
是一一对应的。
那么上面代码就可以变成下面这种写法:
List
该方法默认返回一个 Collector
,让我们来看一下 toList
方法是如何实现这个接口的:
public static <T> Collector<T, ?, List<T>> toList() { return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add, (left, right) -> { left.addAll(right); return left; }, CH_ID); }
supplier accumulator combiner
对具有相同特性的值进行分组是一个很常见的任务,Collectors提供了一个 groupingBy
方法,方法签名为:
<T,K,A,D> Collector<T,?,Map<K,D» groupingBy(Function<? super T,? extends K> classifier, Collector<? super T,A,D> downstream)
classifier
:分类器是一个 Function
,负责将数据按照某一规则进行分类并生成对应的 Key
downstream
:下行流收集器是一个 Collector
,对分类后的数据集进行收集 reduce
操作 假如要根据年龄来分组:
Map<Integer, List
假如我想要根据年龄分组,年龄对应的键值List存储的为 Person
的姓名:
Map<Integer, List
mapping
方法即为对各 组
进行 投影
操作,和 Stream
的 map
方法基本一致。
注意到方法的返回值其实也是一个 Collector
,也就是 groupingBy
的第二个参数一样,这说明我们可以使用多级的分组:
Map<String, Map<String, List<Person>>> peopleByStateAndCity = personStream.collect(groupingBy(Person::getState, groupingBy(Person::getCity)));
假如要根据姓名分组,获取每个姓名下人的年龄总和:
Map<String, Integer> sumAgeByName = people.stream().collect(groupingBy(p -> p.name, reducing(0, (Person p) -> p.age, Integer::sum))); /* 或者使用summingInt方法 */ sumAgeByName = people.stream().collect(groupingBy(p -> p.name, summingInt((Person p) -> p.age)));
这里的 reducing
方法是 Collectors
类的,和我们上面介绍的 Stream
的 reduce
方法在三个参数定义上有些不同:
Collector<T, ?, U> reducing(U identity, Function<? super T, ? extends U> mapper, BinaryOperator op) {
identity
:循环计算的初始值 mapper
:类型转换器,将参数T转换为U类型,当然这两个可以一样 op
:用于做 reduce
操作的 BinaryOperator
变量
这个方法在「多维度」的规约计算时很有用,比如 groupingBy
或 partitionBy
,在对一般的 Stream
进行规约时,直接用它自带的 reduce
方法即可,以下摘自 JavaDoc
:
The reducing() collectors are most useful when used in a multi-level reduction, downstream of groupingBy or partitioningBy.
To perform a simple reduction on a stream, use Stream.reduce(BinaryOperator) instead.
For example, given a stream of Person, to calculate tallest person in each city:
Comparator<Person> byHeight = Comparator.comparing(Person::getHeight); Map<City, Person> tallestByCity = people.stream().collect(groupingBy(Person::getCity, reducing(BinaryOperator.maxBy(byHeight))));
本文首次发布于ElseF’s Blog, 作者 @stuartlau , 转载请保留原文链接.
Previous
数学计算中的二分