Java 8 并行流(parallel stream)采用共享线程池,对性能造成了严重影响。底层使用通用的 fork/join 池来实现,该池是所有并行流共享的。默认情况,fork/join 池会为每个处理器分配一个线程。
public static void main(String[] args) throws Exception {
List<String> list = Arrays.asList("1", "2", "3", "4", "5");
// 线程数默认和CPU个数一致
list.parallelStream().forEach((number) -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(5000);
} catch (InterruptedException e) { }
});
list.parallelStream().forEach((number) -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(5000);
} catch (InterruptedException e) { }
});
打印结果:
main
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-11
ForkJoinPool.commonPool-worker-9
ForkJoinPool.commonPool-worker-4
缺点:
public static void main(String[] args) throws Exception {
List<String> list = Arrays.asList("1", "2", "3", "4", "5");
ForkJoinPool forkJoinPool = new ForkJoinPool(3);
forkJoinPool.submit(() -> {
list.parallelStream().forEach((number) -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(5000);
} catch (InterruptedException e) { }
});
});
ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);
ForkJoinTask<?> forkJoinTask = forkJoinPool2.submit(() -> {
list.parallelStream().forEach((number) -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(5000);
} catch (InterruptedException e) {
}
});
});
// 阻塞,等线程执行完毕
// forkJoinTask.get();
// 阻止主线程关闭
Thread.sleep(10000L);
}
打印结果:
ForkJoinPool-1-worker-1
ForkJoinPool-2-worker-1
ForkJoinPool-1-worker-2
ForkJoinPool-2-worker-2
ForkJoinPool-2-worker-3
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-3
ForkJoinPool-2-worker-2
ForkJoinPool-2-worker-3
ForkJoinPool-1-worker-2
该方式中,主进程(调用线程)不会等parallelStream流执行完毕。如需等待,使用future.get()方法阻塞;
我们创建自己的线程池,所以(1)可以避免共享线程池,(2)可以分配比处理机数量更多的线程。