Reposted

Java 8 parallelStream: controlling the number of threads

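Java 8 parallel streams run on a shared thread pool, which can seriously hurt performance. Under the hood they use the common fork/join pool (`ForkJoinPool.commonPool()`), and every parallel stream in the JVM shares it, so a stream doing slow or blocking work holds up all the others. By default the common pool's parallelism is the number of available processors minus one; because the calling thread also processes elements, this works out to roughly one thread per processor.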
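The JDK exposes these numbers directly, so they are easy to check on a given machine. Below is a minimal sketch (the class `CommonPoolInfo` is a hypothetical name). On a default JVM it should report a parallelism of `availableProcessors() - 1`, with a minimum of 1. The size can only be changed globally, through the system property `java.util.concurrent.ForkJoinPool.common.parallelism`, and it has to be set before the common pool is first used, typically with `-D` on the command line.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class: reports how large the shared common pool is on this machine.
class CommonPoolInfo {

    // Target number of worker threads in the common pool
    // (the thread that calls a parallel stream comes on top of these).
    static int commonParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        int cpus = Runtime.getRuntime().availableProcessors();
        System.out.println("availableProcessors    = " + cpus);
        System.out.println("commonPool parallelism = " + commonParallelism());
        // null unless the JVM was started with -Djava.util.concurrent.ForkJoinPool.common.parallelism=N
        System.out.println("common.parallelism     = "
                + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
    }
}
```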

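```java
import java.util.Arrays;
import java.util.List;

public class ParallelStreamDemo {

    public static void main(String[] args) throws Exception {
        List<String> list = Arrays.asList("1", "2", "3", "4", "5");

        // By default the work runs on the common pool plus the calling thread,
        // i.e. roughly as many threads as there are CPUs
        list.parallelStream().forEach((number) -> {
            try {
                System.out.println(Thread.currentThread().getName());
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        });

        // A second parallel stream shares the same common pool
        list.parallelStream().forEach((number) -> {
            try {
                System.out.println(Thread.currentThread().getName());
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }
}
```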
Output:

```
main
ForkJoinPool.commonPool-worker-2
ForkJoinPool.commonPool-worker-11
ForkJoinPool.commonPool-worker-9
ForkJoinPool.commonPool-worker-4
```
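Drawbacks:
  • The number of threads is fixed by the number of CPU cores.
  • The main (calling) thread cannot continue until every element has been processed.
  • The pool is shared, so different tasks cannot be given different pool sizes (although real-world code rarely needs this).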
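The second drawback can be confirmed with a small timing sketch: `forEach` on a parallel stream returns only after every element has been processed, so the caller is blocked for at least as long as the slowest element takes. The class `CallerWaits` and its method are hypothetical names, and the sleep merely stands in for blocking work.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: measures how long the caller is blocked by a parallel forEach.
class CallerWaits {

    // Each element sleeps for sleepMillis; returns the caller's wall-clock wait in ms.
    static long timeParallelForEach(List<String> items, long sleepMillis, AtomicInteger processed) {
        long start = System.nanoTime();
        items.parallelStream().forEach(item -> {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
            }
            processed.incrementAndGet();
        });
        // Reached only after all elements have been handled
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        AtomicInteger processed = new AtomicInteger();
        long ms = timeParallelForEach(Arrays.asList("1", "2", "3", "4", "5"), 200, processed);
        System.out.println("processed " + processed.get() + " elements, caller waited " + ms + " ms");
    }
}
```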
One workaround:
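```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class CustomPoolDemo {

    public static void main(String[] args) throws Exception {
        List<String> list = Arrays.asList("1", "2", "3", "4", "5");

        // Run the parallel stream inside a dedicated pool with 3 threads
        ForkJoinPool forkJoinPool = new ForkJoinPool(3);
        forkJoinPool.submit(() -> {
            list.parallelStream().forEach((number) -> {
                try {
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                }
            });
        });

        // A second, independent pool for another stream
        ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);
        ForkJoinTask<?> forkJoinTask = forkJoinPool2.submit(() -> {
            list.parallelStream().forEach((number) -> {
                try {
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        });

        // Block until the second stream has finished
        // forkJoinTask.get();

        // Keep main alive: pool workers are daemon threads, so the JVM
        // would otherwise exit before the tasks run
        Thread.sleep(10000L);
    }
}
```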
Output:

```
ForkJoinPool-1-worker-1
ForkJoinPool-2-worker-1
ForkJoinPool-1-worker-2
ForkJoinPool-2-worker-2
ForkJoinPool-2-worker-3
ForkJoinPool-1-worker-3
ForkJoinPool-1-worker-3
ForkJoinPool-2-worker-2
ForkJoinPool-2-worker-3
ForkJoinPool-1-worker-2
```
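With this approach the main thread (the caller) does not wait for the parallel streams to finish. To wait, block on the returned task with `forkJoinTask.get()`. Because we create our own pools, we can (1) avoid the shared common pool and (2) use more threads than there are processors.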
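Below is a sketch of the same workaround that waits for the result and releases the pool afterwards. It relies on a JDK implementation detail: a parallel stream started from inside a `ForkJoinPool` task forks its subtasks into that same pool. The Stream API does not document this, so the sketch also records on which pool each element actually ran. `CustomPoolParallelStream` and `runInOwnPool` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: run a parallel stream in a dedicated pool and wait for it.
class CustomPoolParallelStream {

    // Returns true if every element was processed by a worker of the dedicated pool.
    static boolean runInOwnPool(List<String> items, int parallelism, AtomicInteger processed)
            throws InterruptedException, ExecutionException {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        AtomicBoolean allInPool = new AtomicBoolean(true);
        try {
            pool.submit(() -> items.parallelStream().forEach(item -> {
                Thread t = Thread.currentThread();
                boolean ours = t instanceof ForkJoinWorkerThread
                        && ((ForkJoinWorkerThread) t).getPool() == pool;
                if (!ours) {
                    allInPool.set(false); // ran on the caller or on the common pool
                }
                processed.incrementAndGet();
            })).get(); // get() blocks the caller until the whole stream has finished
        } finally {
            pool.shutdown(); // release the pool's threads once the work is done
        }
        return allInPool.get();
    }

    public static void main(String[] args) throws Exception {
        List<String> list = Arrays.asList("1", "2", "3", "4", "5");
        AtomicInteger processed = new AtomicInteger();
        boolean ranInPool = runInOwnPool(list, 3, processed);
        System.out.println("processed=" + processed.get() + ", allInPool=" + ranInPool);
    }
}
```

Because `get()` waits explicitly, no `Thread.sleep` is needed to keep `main` alive, and calling `shutdown()` in `finally` keeps each short-lived pool from lingering.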

 