/**
 * Recursive fork/join task that computes the n-th Fibonacci number.
 *
 * <p>Each task splits into two subtasks ({@code n - 1} and {@code n - 2}) until
 * {@code n <= 1}, at which point {@code n} itself is returned.
 */
class Fibonacci extends RecursiveTask<Integer> {
    private final int n;

    /**
     * @param n index of the Fibonacci number to compute
     */
    public Fibonacci(int n) {
        this.n = n;
    }

    /**
     * Computes the Fibonacci number for {@code n}.
     *
     * @return {@code n} when {@code n <= 1}, otherwise the sum of the two subtask results
     */
    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;
        }
        // Fork only one subtask; the other is computed directly inside the
        // current task, so no extra task is created and scheduled for it.
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        // Run f2 here, then wait for the forked subtask and combine the results.
        return f2.compute() + f1.join();
    }

    /**
     * Demo entry point: computes Fibonacci(30) on a dedicated pool and prints it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Create the fork/join thread pool
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            // Create the divide-and-conquer task
            Fibonacci fibonacci = new Fibonacci(30);
            // Start the task and print its result
            System.out.println(pool.invoke(fibonacci)); // 832040
        } finally {
            pool.shutdown();
        }
    }
}
/**
 * Fork/join word-count task over a slice of an array of text lines.
 *
 * <p>The task covers the half-open index range {@code [start, end)} of {@code fc}.
 * A range of one line is counted directly; larger ranges are split in half, the
 * first half is forked and the second half is computed in the current task, and
 * the two partial counts are merged.
 */
class MapReduce extends RecursiveTask<Map<String, Long>> {
    private final String[] fc;
    private final int start;
    private final int end;

    /**
     * @param fc    lines of text to count words in
     * @param start index of the first line handled by this task (inclusive)
     * @param end   index one past the last line handled by this task (exclusive)
     */
    public MapReduce(String[] fc, int start, int end) {
        this.fc = fc;
        this.start = start;
        this.end = end;
    }

    /**
     * Counts word occurrences in lines {@code [start, end)}.
     *
     * @return a map from word to its number of occurrences; empty for an empty range
     */
    @Override
    protected Map<String, Long> compute() {
        int size = end - start;
        if (size <= 0) {
            // An empty range would otherwise split into two empty ranges forever.
            return new HashMap<>();
        }
        if (size == 1) {
            return calc(fc[start]);
        }
        // Midpoint computed from the range size so start + end cannot overflow.
        int mid = start + size / 2;
        // Fork a recursive task for the first half of the data
        MapReduce mr1 = new MapReduce(fc, start, mid);
        mr1.fork();
        // Process the second half recursively inside the current task
        MapReduce mr2 = new MapReduce(fc, mid, end);
        // Compute the second half, then join the forked half and merge both results
        return merge(mr2.compute(), mr1.join());
    }

    /**
     * Counts the words of a single line, splitting on runs of whitespace.
     *
     * @param line the line to tokenize
     * @return a map from word to its number of occurrences in {@code line}
     */
    private Map<String, Long> calc(String line) {
        Map<String, Long> result = new HashMap<>();
        for (String word : line.split("\\s+")) {
            // Leading whitespace (or an empty line) produces an empty token; skip it.
            if (!word.isEmpty()) {
                result.merge(word, 1L, Long::sum);
            }
        }
        return result;
    }

    /**
     * Merges two partial word counts into a new map; neither input is modified.
     *
     * @param r1 first partial result
     * @param r2 second partial result
     * @return a new map holding the summed counts of both inputs
     */
    private Map<String, Long> merge(Map<String, Long> r1, Map<String, Long> r2) {
        Map<String, Long> result = new HashMap<>(r1);
        r2.forEach((word, count) -> result.merge(word, count, Long::sum));
        return result;
    }

    /**
     * Demo entry point: counts the words of a few sample lines and prints each count.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String[] fc = {"hello world", "hello me", "hello fork", "hello join", "fork join in world"};
        ForkJoinPool pool = new ForkJoinPool(3);
        try {
            MapReduce mapReduce = new MapReduce(fc, 0, fc.length);
            Map<String, Long> result = pool.invoke(mapReduce);
            result.forEach((word, count) -> System.out.println(word + " : " + count));
        } finally {
            pool.shutdown();
        }
    }
}
转载请注明出处:http://zhongmingmao.me/2019/05/17/java-concurrent-fork-join/
访问原文「 Java并发 -- Fork + Join 」获取最佳阅读体验并参与讨论