// 存在未对账订单 while (existUnreconciledOrders()) { // 查询未对账订单 pOrder = getPOrder(); // 查询派送订单 dOrder = getDOrder(); // 执行对账操作 Order diff = check(pOrder, dOrder); // 将差异写入差异库 save(diff); }
getPOrder()和getDOrder()最为耗时,并且两个操作没有先后顺序的依赖,可以 并行处理
// 存在未对账订单 // 存在未对账订单 while (existUnreconciledOrders()) { // 查询未对账订单 Thread t1 = new Thread(() -> { pOrder = getPOrder(); }); t1.start(); // 查询派送订单 Thread t2 = new Thread(() -> { dOrder = getDOrder(); }); t2.start(); // 等待t1和t2结束 t1.join(); t2.join(); // 执行对账操作 Order diff = check(pOrder, dOrder); // 将差异写入差异库 save(diff); }
while循环里每次都会创建新的线程,而创建线程是一个 耗时 的操作,可以考虑 线程池 来优化
Executor executor = Executors.newFixedThreadPool(2); // 存在未对账订单 while (existUnreconciledOrders()) { // 查询未对账订单 executor.execute(() -> { pOrder = getPOrder(); }); // 查询派送订单 executor.execute(() -> { dOrder = getDOrder(); }); // 采用线程池方案,线程根本就不会退出,join()已经失效 // 如何实现等待?? // 执行对账操作 Order diff = check(pOrder, dOrder); // 将差异写入差异库 save(diff); }
Executor executor = Executors.newFixedThreadPool(2); // 存在未对账订单 while (existUnreconciledOrders()) { // 计数器初始化为2 CountDownLatch latch = new CountDownLatch(2); // 查询未对账订单 executor.execute(() -> { pOrder = getPOrder(); latch.countDown(); }); // 查询派送订单 executor.execute(() -> { dOrder = getDOrder(); latch.countDown(); }); // 等待两个查询操作结束 latch.await(); // 执行对账操作 Order diff = check(pOrder, dOrder); // 将差异写入差异库 save(diff); }
// 订单队列 private Vector<Order> pos; // 派送单队列 private Vector<Order> dos; // 执行回调的线程池 private Executor executor = Executors.newFixedThreadPool(1); // 传入回调函数 private final CyclicBarrier barrier = new CyclicBarrier(2, () -> { executor.execute(this::check); }); // 回调函数 private void check() { Order p = pos.remove(0); Order d = dos.remove(0); // 执行对账操作 Order diff = check(p, d); // 差异写入差异库 save(diff); } // 两个查询操作 private void getOrders() { Thread t1 = new Thread(() -> { // 循环查询订单库 while (existUnreconciledOrders()) { pos.add(getDOrder()); try { // 等待 barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); t1.start(); Thread t2 = new Thread(() -> { // 循环查询派单库 while (existUnreconciledOrders()) { dos.add(getDOrder()); try { // 等待 barrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }); t2.start(); }
转载请注明出处:http://zhongmingmao.me/2019/05/11/java-concurrent-countdown-latch-cyclic-barrier/
访问原文「 Java并发 -- CountDownLatch + CyclicBarrier 」获取最佳阅读体验并参与讨论