断言 -> 输入T, 输出 boolean
消费一个输入 -> 输入T, 不输出(void)
生产一个输出 -> 不输入, 输出T
输入T, 输出R的函数
一元函数: 输入1个输出 1个: 类型都是T
二元函数: 输入2个输出1个: 类型都是T
输入两个输出一个: 输入 T, U 输出 R, 常用于 reduce/sort等操作,
返回还是流stream的操作, 就是中间操作, 例如 map操作
有前后顺序依赖关系的操作, 比如
distinct
(前后不能相同)
sorted
(前后顺序要求)
limit
/ skip
(截取前面的/跳过前面的)
无前后依赖关系的操作:
map
/
mapToInt
/
flatMap
/
flatMapToInt
filter
peek
返回一个结果的(求和/汇总)等, 例如 sum/max/min/avg等
findFirst
findAny
allMatch
anyMatch
noneMatch
forEach
forEachOrdered
并行流中保证顺序的
collect
toArray
reduce
输入: BinaryOperator<T>
min
max
count
如果没有终止操作, 中间的操作动作都不会实际执行, 这就是惰性求值;
collection.stream
collection.parallelStream
array.stream
Stream.of
IntStream.range
IntStream.rangeClosed
LongStream.range
LongStream.rangeClosed
Random.longs
Random.ints
Random.doubles
Stream.generate
Stream.iterate
ForkJoinPool.commonPool
如何自定义线程池, 并使用到流并行时?
collect(Collectors.toList())
IntSummaryStatistics ageSumStats = students.stream().collect(Collectors.summarizingInt(Student::getAge))
Map<Boolean, List<Student>> genders = students.stream().collect(Collectors.partitioningBy(s->s.getGender() == Gender.MALE))
Map<Grade, List<Student>> grades = students.stream().collect(Collectors.groupingBy(Student::getGrade))
Map<Grade, Long> gradeCount = students.stream().collect(Collectors.groupingBy(Student::getGrade, Collectors.counting()));
counting()可以替换为同类的max/min/avg等
所有操作链式调用, 1个元素只迭代一次;
每一个中间操作返回一个新的stream流, 流内有一个属性: sourceStage; 指向同一个地方: 即 链表的头 Head
stream接口的实现类
java.util.stream.AbstractPipeline
java.util.stream.ReferencePipeline
属性: sourceStage --> ReferencePipeline$Head
完全链式调用: s1.a操作完->s1.b
public void testStream() { Random r = new Random(); Stream.generate(() -> r.nextInt()).limit(500) .peek(s -> System.out.println("peek: " + s)) // 1 .filter(s -> { // 2 System.out.println("filter: " + s); return s > 1000000; }) .sorted((i1, i2) -> { // 3. System.out.println("排序:" + i1 + ", " + i2); return i1.compareTo(i2); }) .peek(s -> System.out.println("peek2:" + s)) //4. .count(); }
1,2 都是无状态操作; 3是有状态操作; 4是无状态操作
3的有状态操作会截断 1, 2的无状态操作;
本来是1, 2的链式调用: s1.a->s1.b->s1.c 现在会变成: s1.a() ->s2.a() -> s1.b() ->s2.b()
output: 无sorted: 可以看到 纯无状态操作是纯链式: peek->filter->peek2
peek: 2047843427 filter: 2047843427 peek2:2047843427 peek: -1210662664 filter: -1210662664 peek: 835825054 filter: 835825054 peek2:835825054 peek: 2068471207 filter: 2068471207 peek2:2068471207 peek: -1139851578 filter: -1139851578 peek: -885776051 filter: -885776051 peek: 481902862 filter: 481902862 peek2:481902862 peek: 684461691 filter: 684461691 peek2:684461691 peek: 1417449012 filter: 1417449012 peek2:1417449012 peek: -40633821 filter: -40633821
output: 有sorted: 可以看到后面的无状态操作peek2被有状态的sorted打断, 变成: peek->filter... peek->filter 排序all, peek2 all
peek: -607778068 filter: -607778068 peek: 50926402 filter: 50926402 peek: -774310924 filter: -774310924 peek: 342023904 filter: 342023904 peek: 26606322 filter: 26606322 peek: 693727663 filter: 693727663 peek: -334751306 filter: -334751306 peek: -960784614 filter: -960784614 peek: 522967780 filter: 522967780 peek: -2144851449 filter: -2144851449 排序:342023904, 50926402 排序:26606322, 342023904 排序:26606322, 342023904 排序:26606322, 50926402 排序:693727663, 50926402 排序:693727663, 342023904 排序:522967780, 342023904 排序:522967780, 693727663 peek2:26606322 peek2:50926402 peek2:342023904 peek2:522967780 peek2:693727663
1, 2还是链式调用; 中间的3执行完后, 隔断了4, 所以4不会和1,2承续链式调用, 而是单独执行;
在4后加5: parallel():
sorted()
有状态操作不会并行; 1,2,4都会使用 ForkJoinPool线程池并行执行:
@Test public void testStream() { Random r = new Random(); long count = Stream.generate(() -> r.nextInt()).limit(10) .peek(s -> System.out.println(Thread.currentThread().getName() + " peek: " + s)) // 1 .filter(s -> { // 2 System.out.println(Thread.currentThread().getName() + " filter: " + s); return s > 1000000; }) .sorted((i1, i2) -> { // 3. System.out.println(Thread.currentThread().getName() + " 排序:" + i1 + ", " + i2); return i1.compareTo(i2); }) .peek(s -> System.out.println(Thread.currentThread().getName() + " peek2:" + s)) //4. .parallel() .count(); System.out.println(count); }
ForkJoinPool.commonPool-worker-1 peek: -1644694686 ForkJoinPool.commonPool-worker-1 filter: -1644694686 ForkJoinPool.commonPool-worker-1 peek: 1524371421 ForkJoinPool.commonPool-worker-1 filter: 1524371421 ForkJoinPool.commonPool-worker-1 peek: -1937453784 ForkJoinPool.commonPool-worker-1 filter: -1937453784 ForkJoinPool.commonPool-worker-1 peek: 991114309 ForkJoinPool.commonPool-worker-1 filter: 991114309 ForkJoinPool.commonPool-worker-1 peek: -109655961 ForkJoinPool.commonPool-worker-1 filter: -109655961 ForkJoinPool.commonPool-worker-1 peek: 878490064 ForkJoinPool.commonPool-worker-1 filter: 878490064 ForkJoinPool.commonPool-worker-1 peek: 2031919515 ForkJoinPool.commonPool-worker-1 filter: 2031919515 ForkJoinPool.commonPool-worker-1 peek: -1855129379 ForkJoinPool.commonPool-worker-1 filter: -1855129379 ForkJoinPool.commonPool-worker-1 peek: 1897985020 ForkJoinPool.commonPool-worker-1 filter: 1897985020 ForkJoinPool.commonPool-worker-1 peek: 352116584 ForkJoinPool.commonPool-worker-1 filter: 352116584 main 排序:991114309, 1524371421 main 排序:878490064, 991114309 main 排序:2031919515, 878490064 main 排序:2031919515, 991114309 main 排序:2031919515, 1524371421 main 排序:1897985020, 1524371421 main 排序:1897985020, 2031919515 main 排序:352116584, 1524371421 main 排序:352116584, 991114309 main 排序:352116584, 878490064 ForkJoinPool.commonPool-worker-6 peek2:991114309 main peek2:1524371421 ForkJoinPool.commonPool-worker-3 peek2:352116584 ForkJoinPool.commonPool-worker-2 peek2:2031919515 ForkJoinPool.commonPool-worker-7 peek2:1897985020 ForkJoinPool.commonPool-worker-4 peek2:878490064 6
结论: 有状态的并行操作"不一定"并行
为啥是"不一定"?
因为官方并未明说, 但是观察下来, 是非并行的!