并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。
Java 8 中将并行进行了优化,我们可以很容易的对数据进行并行操作。Stream API可以声明性地通过 parallel()与sequential()在并行流与顺序流之间进行切换。
Fork/Join 框架,采用 “工作窃取”模式(work-stealing): 当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线 程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。
相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的 处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因 无法继续运行,那么该线程会处于等待状态.而在fork/join框架实现中,如果 某个子问题由于等待另外一个子问题的完成而无法继续运行.那么处理该子 问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程的等待时间,提高了性能.
Fork/Join计算数值累加
/** * @author black猫 * @date 2019-11-26 * fork join 把大的任务拆分成小任务,再合并,利用线程,充分利用 多核cpu */ public class ForkJoinCalculate extends RecursiveTask<Long> { /*** * 开始值 */ private long start; /*** * 结束值 */ private long end; /*** * 临界值 大于这个值 就要拆分成多个小任务 */ private static final long THRESHOLD = 100000; /*** * 构造函数 * @param start 起始值 * @param end 结束值 */ public ForkJoinCalculate(long start, long end) { this.start = start; this.end = end; } /*** * main函数 测试 * @param args */ public static void main(String[] args) { // 0-1亿的累加 ForkJoinCalculate forkJoinCalculate = new ForkJoinCalculate(0, 1000000000); // fork join线程池 ForkJoinPool pool = new ForkJoinPool(); Long sum = pool.invoke(forkJoinCalculate); // 输出结果 System.out.println(sum); } /** * 计算数值的累加和 * 返回完成后的结果 * * @return the result of the computation */ @Override protected Long compute() { long length = end - start; if (length < THRESHOLD) { long sum = 0; for (long i = start; i <= end; i++) { sum += i; } return sum; } else { // 中间数 long middle = (start + end) / 2; // 左边 ForkJoinCalculate left = new ForkJoinCalculate(start, middle); // 子任务 left.fork(); // 右边 ForkJoinCalculate right = new ForkJoinCalculate(middle + 1, end); // 子任务 right.fork(); // 结果合起来 return left.join() + right.join(); } } }
对比for循环/ForkJoin框架/Stream顺序流和并行流,计算数值累加和时间
/** * @author black猫 * @date 2019-11-26 */ public class Stream04Test { long maxNum = 100000000000L; /*** * 传统的 for循环:计算 maxNum的累加和 */ @Test void test01() { long startTime = System.currentTimeMillis(); long sum = 0L; for (int i = 0; i < maxNum; i++) { sum += i; } System.out.println(sum); long endTime = System.currentTimeMillis(); System.out.println("time:" + (endTime - startTime) + "ms"); } /*** * fork join 框架:计算 maxNum的累加和 */ @Test void test02() { long startTime = System.currentTimeMillis(); //fork join 池 ForkJoinPool pool = new ForkJoinPool(); ForkJoinCalculate forkJoinCalculate = new ForkJoinCalculate(0, maxNum); Long sum = pool.invoke(forkJoinCalculate); System.out.println(sum); long endTime = System.currentTimeMillis(); System.out.println("time:" + (endTime - startTime) + "ms"); } /*** * stream顺序流:计算 maxNum的累加和 */ @Test void test03() { long startTime = System.currentTimeMillis(); Long optional = LongStream.rangeClosed(0L, maxNum).sequential().reduce(0, Long::sum); long endTime = System.currentTimeMillis(); System.out.println(optional); System.out.println("time:" + (endTime - startTime) + "ms"); } /*** * stream并行流:计算 maxNum的累加和 */ @Test public void test04() { long startTime = System.currentTimeMillis(); Long optional = LongStream.rangeClosed(0L, maxNum).parallel().reduce(0, Long::sum); long endTime = System.currentTimeMillis(); System.out.println(optional); System.out.println("time:" + (endTime - startTime) + "ms"); } }
类型/计算累加数值 | 一千万 | 一亿 | 十亿 | 一百亿 | 一千亿 |
---|---|---|---|---|---|
Test01(for循环) | 8ms | 50ms | 462ms | ||
Test02(fork join框架) | 20ms | 58ms | 249ms | 2189ms | 17887ms |
Test03(stream顺序流) | 85ms | 119ms | 693ms | 5370ms | 52361ms |
Test03(stream并行流) | 22ms | 35ms | 193ms | 1508ms | 14776ms |
Stream并行流在计算多任务中,当线程的上下文切换的时间小于线程运行时间时,会充分提高cpu多核的利用率,效果更好。