// 从3个电商询价,保存到数据库,串行执行,性能很慢 int p1 = getPriceByS1(); save(p1); int p2 = getPriceByS2(); save(p2); int p3 = getPriceByS3(); save(p3);
ExecutorService pool = Executors.newFixedThreadPool(3); Future<Integer> f1 = pool.submit(() -> getPriceByS1()); Future<Integer> f2 = pool.submit(() -> getPriceByS2()); Future<Integer> f3 = pool.submit(() -> getPriceByS3()); int p1 = f1.get(); // 阻塞,如果f2.get()很快,但f1.get()很慢,依旧需要等待 pool.execute(() -> save(p1)); int p2 = f2.get(); pool.execute(() -> save(p2)); int p3 = f3.get(); pool.execute(() -> save(p3));
ExecutorService pool = Executors.newFixedThreadPool(3); Future<Integer> f1 = pool.submit(() -> getPriceByS1()); Future<Integer> f2 = pool.submit(() -> getPriceByS2()); Future<Integer> f3 = pool.submit(() -> getPriceByS3()); BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(); pool.execute(() -> { try { // 先执行完先入队 queue.put(f1.get()); } catch (Exception ignored) { } }); pool.execute(() -> { try { queue.put(f2.get()); } catch (Exception ignored) { } }); pool.execute(() -> { try { queue.put(f3.get()); } catch (Exception ignored) { } }); for (int i = 0; i < 3; i++) { int price = queue.take(); pool.execute(() -> save(price)); }
// 默认使用无界的LinkedBlockingQueue public ExecutorCompletionService(Executor executor); public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue);
ExecutorService pool = Executors.newFixedThreadPool(3); CompletionService service = new ExecutorCompletionService(pool); // 异步执行 pool.submit(() -> getPriceByS1()); pool.submit(() -> getPriceByS2()); pool.submit(() -> getPriceByS3()); for (int i = 0; i < 3; i++) { // take会阻塞线程,先执行完先消费 Object price = service.take().get(); pool.execute(() -> { try { save((Integer) price); } catch (Exception ignored) { } }); }
// 如果阻塞队列为空,线程阻塞 public Future<V> take() throws InterruptedException; // 如果阻塞队列为空,返回null public Future<V> poll(); // 等待一段时间,阻塞队列依然为空,返回null public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
支持 并行 地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了,利用CompletionService可以实现
ExecutorService pool = Executors.newFixedThreadPool(3); CompletionService<Integer> service = new ExecutorCompletionService<>(pool); List<Future<Integer>> futures = new ArrayList<>(3); futures.add(service.submit(() -> geoCoderByS1())); futures.add(service.submit(() -> geoCoderByS2())); futures.add(service.submit(() -> geoCoderByS3())); try { // 获取第一个返回(take会阻塞) Integer price = service.take().get(); } catch (Exception ignored) { // 取消所有任务 for (Future<Integer> future : futures) { future.cancel(true); } }
转载请注明出处:http://zhongmingmao.me/2019/05/16/java-concurrent-completion-service/
访问原文「 Java并发 -- CompletionService 」获取最佳阅读体验并参与讨论