java8的并行Stream对开发者非常友好,非常容易实现并行计算。并行Stream底层使用ForkJoinTask实现Stream的并行处理,充分利用cpu的多核能力,Stream的API将底层复杂实现完全屏蔽了,开发者仅需调用一个方法即可实现并行计算,就是这么简单。
开启并行Stream只需在调用终止操作符之前调用 parallel()
方法即可开启并行计算。
@Test public void parallelStream(){ IntStream.range(0,100) .parallel() .forEach(e-> System.out.println(Thread.currentThread()+" "+e)); }
执行上述代码,可以看出代码将会交由ForkJoinPool中的线程执行。
这里使用分别使用并行Stream和穿行Stream计算0到100000000000之间的和
@Test public void testSum(){ //并行计算 long time= System.currentTimeMillis(); long sum1 = LongStream.rangeClosed(1,100000000000l).parallel().sum(); System.out.println(System.currentTimeMillis()-time); //串行计算 time = System.currentTimeMillis(); long sum2 = LongStream.rangeClosed(1,100000000000l).sum(); System.out.println(System.currentTimeMillis()-time); System.out.println("sum1 = "+sum1+" sum2 = "+sum2); Assert.assertTrue(sum1==sum2); }
执行结果如图,我的cpu是6核的,串行计算的时间差不多是并行计算的5倍;并行计算下充分利用了cpu的多核性能,如果数据量更大,两者之间的差距更大,并行Stream的优势更加明显。
sequential()
和 parallel()
默认情况下创建的stream都是串行的,parallel将串行流转成并行流,两者是互斥冲突的,sequential将并行流转成串行流,但并不意味者二者不可同时出现。
isParallel()
可以检测stream是否为并行stream。
@Test public void isParallel(){ IntStream stream = IntStream.range(0,100); stream.parallel(); Assert.assertTrue(stream.isParallel()); }
如代码所示,执行‘串行->并行->串行->并行’的转换,最终还是并行流。
@Test public void spspIsP(){ IntStream stream = IntStream.range(0,100); stream.parallel().map(e->e<<1).sequential().parallel(); Assert.assertTrue(stream.isParallel()); }
如代码所示,执行‘串行->并行->串行’的转换,最终还是串行流。
@Test public void spsIsS(){ IntStream stream = IntStream.range(0,100); stream.parallel().map(e->e<<1).sequential(); Assert.assertFalse(stream.isParallel()); }
上例子说明,流的串行和并行取决于最后调用的那个方法(sequential() or parallel())。
并行stream底层依赖于ForkJoinPool.commonPool线程池,这是一个jvm进程全局共享的线程,当这个线程池中执行了耗时操作,后面的任务将会堆积,造成性能问题;默认情况下这个线程池的大小为逻辑核数-1,当然你也可以通过jvm参数‘java.util.concurrent.ForkJoinPool.common.parallelism’来修改线程的池大小,这里并不建议使用这种方式,最好的方式是使用自定义的线程池。
如代码所示,创建4线程的ForkJoinPool线程池。
@Test public void CustomPool(){ ForkJoinPool forkJoinPool = new ForkJoinPool(4); forkJoinPool.submit(()->{ IntStream.range(0,100).parallel().forEach(e-> System.out.println(Thread.currentThread()+" "+e)); }).join(); }
代码执行结果如图,并行的stream在自定义的线程池中执行。
那为什么forkJoinPool.submit()就可以实现自定义线程池呢?查看源码得知,ForkJoinWorkerThread内部会持有ForkJoinPool的引用,在代码执行时,最终会调用doInvoke方法,通过Thread.currentThread可以知道是ForkJoinWorkerThread类型的线程,进而获取到ForkJoinPool,最终利用这个pool来执行计算。
/** * Implementation for invoke, quietlyInvoke. * * @return status upon completion */ private int doInvoke() { int s; Thread t; ForkJoinWorkerThread wt; return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool. awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone(); }
并行Stream给开发者带来了极大的便利,一行代码即可开启多线程并行计算,excellent!但是并行计算也要在一定条件下才能使用,比如任务之间不能有依赖、不能存在竞态条件等。