Java 1.5 有了 Future
, 可谓是跨了一大步,继而 Java 1.8 新加入一个 Future 的实现 CompletableFuture
, 从此线程与线程之间可以愉快的对话了。最初两个线程间的协调我采用过 Object 的 wait()
和 notify()
, Thread 的 join()
方法,那可算是很低级的 API 了,是否很多 Java 程序都不知道它们的存在,或根本没用过它们。
如果是简单的等待所有线程完成可使用 Java 1.5 的 CountDownLatch
, 这里有一篇介绍 CountDownLatch 协调线程
, 就是实现的 waitAll(threads) 功能。而 Java 8 的 CompletableFuture
的功能就多去,可简单使用它实现异步方法。虽说 CompletableFuture
实现了 Future
接口,但它多数方法源自于 CompletionStage
, 所以还里氏代换,用 Future
来引用 CompletableFuture
实例就很牵强了; 这也是为什么 PlayFramework 自 2.5 开始直接暴露的类型是 CompletionStage
而非其他两个。
顾名思义,CompletableFuture 代表着一个 Future 完成后该干点什么,具体大致有:
有时候可以把 Future 想像成与线程是一一对应的。
CompletableFuture
有太多太多的方法,并伴有 async
与 非 async
两个版本。本文之标题所谓 浅入
, 确不敢说是深入浅出,而且要达到对 CompletableFuture
的基本了解亦非本文的目的。看完之后只能知道何以谓之 Completable
, 不触及线程间的交互。
试想一下,如过不用 Future
或 CompletableFuture
, 想要实现等待某个线程完成之后才做后续的事,可以预设一段时间 Thread.sleep(xxx)
停下来等待,这很不可靠,时间短了线程没完,长了浪费时间; 或者采用来自于 巩固 Java Future 的使用
最后一段代码的方式
AtomicReference<String> reference = new AtomicReference<>(); new Thread(() -> { //do something that is time-consuming reference.set("I'm done"); //任务完成完设置 reference 的值 }).start(); while(reference.get() == null) { //耐心的等待,直到 reference.get() 有值为止 } System.out.println("Finally, " + reference.get());
当然,上面的代码改用 Future
来写会简单些,但仍然是调用 get()
来阻塞当前线程来等待,还每次要捕获 InterruptedException, ExecutionException 或 TimeoutException 异常。要是换作 CompletableFuture
来表述的话就更为直观,并且通过回调函数来处理后续操作,让代码行文更为流畅。
请看用 CompletableFuture
稍加润色的代码
package cc.unmi; import java.util.concurrent.CompletableFuture; public class Main { public static void main(String[] args) { CompletableFuture<Double> futurePrice = getPriceAsync(); //do anything you want, 当前线程不被阻塞 System.out.println(111); //线程任务完成的话,执行回调函数,不阻塞后续操作 futurePrice.whenComplete((aDouble, throwable) -> { System.out.println(aDouble); //do something else }); System.out.println(222); } static CompletableFuture<Double> getPriceAsync() { CompletableFuture<Double> futurePrice = new CompletableFuture<>(); new Thread(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } futurePrice.complete(23.55); }).start(); return futurePrice; } }
getPriceAsync()
就是一个异步方法,调用后马上返回得到一个 futurePrice, 用 Thread.sleep(5000)
模拟成一个耗时操作,线程执行完才设置 futurePrice 为完成状态并赋予结果。
CompletableFuture
的 whenComplete()
也是异步的,所以我们能看到输出结果如下
111
222
23.55
如果我们实际使用 CompletableFuture
时不调用 Future
接口的 get()
等方法,上面的引用类型可以改成 CompletationStage
, 以免受 get()
等方法的干扰。
上面的代码依然显式的用 new Thread(...).start()
来启动线程,如今我会尽量避免用这种方式来启动线程,而是用 Java 1.5 的 ExecutorService, 所以再加以改造:
package cc.unmi; import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; public class Main { public static void main(String[] args) throws IOException { CompletionStage<Double> futurePrice = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return 23.55; }); System.out.println(111); futurePrice.thenAccept(System.out::println); System.out.println(222); System.in.read(); } }
同样的输出结果。但如果把上面的 System.in.read()
移除掉,将看不到 23.55
的输出程序就直接退出了,为什么了呢?因为 CompletableFuture.supplyAsync()
方法默认把任务提交到 ForkJoinPool
线程池中执行,而它的线程设置了 daemon
属性为 true
, 所以它阻止不了主线程的退出,才用 System.in.read()
维持主线程的执行。如果换成别的线程池类型就可不需要代码 System.in.read()
, 再变
package cc.unmi; import java.io.IOException; import java.util.concurrent.*; public class Main { public static void main(String[] args) throws IOException { ExecutorService executor = Executors.newCachedThreadPool(); CompletionStage<Double> futurePrice = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return 23.55; }, executor); System.out.println(111); futurePrice.thenAccept(System.out::println); System.out.println(222); executor.shutdown(); } }
executor.shutdown()
并不是立即关掉线程池,而是采取更温柔, 安全的方式,等线程池中没有正在执行的任务时才关闭,从而结束主程序。
如果要深入了解 CompletableFuture
的用法更应该关注它的几个静态方法,以及 CompletionStage
接口中定义的所有法。