计算机技术发展迅猛,不管是在软件还是硬件方面都发展的非常快,电脑的CPU也在更新换代,强劲的CPU可以承担更多的任务。如果程序一直使用同步编程的话,那么将会浪费CPU资源。举个列子,一个CPU有10个通道,如果所有程序都走一个通道,那么剩余9个通道都是空闲的,那这9个通道都浪费掉了。
如果使用异步编程,那么其它9个通道都可以利用起来了,程序的吞吐量也上来了。也就是说要充分利用CPU资源,使其忙碌起来,而异步编程无疑是让其忙碌的一种方式。
在CompletableFuture出来之前,我们可以用Future接口进行异步编程,Future配合线程池一起工作,它把任务交给线程池,线程池中处理完毕后通过Future.get()方法来获取结果,Future.get()可以理解为一个回调操作,在回调之前我们还可以做其他事情。
下面一个例子用来模拟小明借图书场景:
public class FutureTest extends TestCase { // 申明一个线程池 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); public void testBook() { String bookName = "《飘》"; System.out.println("小明去图书馆借书"); Future<String> future = threadPoolExecutor.submit(() -> { // 模拟图书管理员找书花费时间 long minutes = (long) (Math.random() * 10) + 1; System.out.println("图书管理员花费了" + minutes + "分钟,找到了图书" + bookName); Thread.sleep((long) (Math.random() * 2000)); return bookName; }); // 等待过程中做其他事情 this.playPhone(); try { String book = future.get(); System.out.println("小明拿到了图书" + book); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } private void playPhone() { System.out.println("小明在玩手机等待图书"); } }
这是一个典型的Future使用方式,其中 future.get()
方法是阻塞的,程序运行会停留在这一行,我们的程序不可能一直等待下去,这个时候可以用 future.get(long timeout, TimeUnit unit)
方法,给定一个等待时间,如果超过等待时间还是没有拿到数据则抛出一个TimeoutException异常,我们可以catch这个异常,然后对异常情况做出处理。
现在假设小明最多等待2分钟,那么代码可以这么写:
String book = future.get(2, TimeUnit.MINUTES);
现在有这么一种情况,假设图书管理员找到书本之后,还需要交给助理,让助理录入图书信息,录完信息才把书交给小明。助理录入的过程也是异步的,也就是说,我们要实现多个异步进行流水线这样的功能。
可以发现Future用来处理多异步流水线非常困难,这个时候CompletableFuture就派上用场了,CompletableFuture自带流水线特性,就好比Collection对应的Stream。
下面来看下CompletableFuture的基本用法,我们将上面的例子使用CompletableFuture实现:
public class CompletableFutureTest extends TestCase { public void testBook() { String bookName = "《飘》"; System.out.println("小明去图书馆借书"); CompletableFuture<String> future = new CompletableFuture<>(); new Thread(() -> { // 模拟图书管理员找书花费时间 long minutes = (long) (Math.random() * 10) + 1; System.out.println("图书管理员花费了" + minutes + "分钟,找到了图书" + bookName); try { Thread.sleep((long) (Math.random() * 2000)); } catch (InterruptedException e) { e.printStackTrace(); } future.complete(bookName); }).start(); // 等待过程中做其他事情 this.playPhone(); try { String book = future.get(); System.out.println("小明拿到了图书" + book); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } private void playPhone() { System.out.println("小明在玩手机等待图书"); } }
其中 future.complete(bookName);
的意思是将结果返回,然后调用future.get()的地方就能获取到数据。
CompletableFuture还提供了一个静态方法 CompletableFuture.supplyAsync(Supplier)
用来快速创建任务,参数Supplier用来返回任务结果。
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟图书管理员找书花费时间 long minutes = (long) (Math.random() * 10) + 1; System.out.println("图书管理员花费了" + minutes + "分钟,找到了图书《飘》"); try { Thread.sleep((long) (Math.random() * 2000)); } catch (InterruptedException e) { e.printStackTrace(); } return "《飘》"; });
如果不需要返回结果可以使用 CompletableFuture.runAsync(Runnable)
方法:
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> System.out.println("running"))
接下来,我们使用CompletableFuture完成上面说到的需求: 图书管理员找到书本之后,还需要交给助理,让助理录入图书信息
我们需要用到 CompletableFuture.thenCompose(Function)
方法,用法如下
public void testBook3() { String bookName = "《飘》"; System.out.println("小明去图书馆借书"); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 模拟图书管理员找书花费时间 long minutes = (long) (Math.random() * 10) + 1; System.out.println("图书管理员花费了" + minutes + "分钟,找到了图书"+bookName); try { Thread.sleep((long) (Math.random() * 2000)); } catch (InterruptedException e) { e.printStackTrace(); } return bookName; }) // thenCompose,加入第二个异步任务 .thenCompose((book/*这里的参数是第一个异步返回结果*/) -> CompletableFuture.supplyAsync(()-> { System.out.println("助理录入图书信息"); return book; })); // 等待过程中做其他事情 this.playPhone(); try { String book = future.get(); System.out.println("小明拿到了图书" + book); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
thenApply(Function)
的意思是对CompletableFuture返回结果做进一步处理,然后返回一个新的结果,它的参数使用的是Function,意味着可以使用lambda表达式,表达式会提供一个参数,然后需要一个返回结果。
public void testThenApply() throws ExecutionException, InterruptedException { int i = 0; CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> i + 1) // 将相加后的结果转成字符串 // v就是上面i+1后的结果 // 等同于:.thenApply((v) -> String.valueOf(v)) .thenApply(String::valueOf); String str = future.get(); System.out.println("String value: " + str); }
如果不需要返回结果,可以用 thenAccept(Consumer<? super T> action)
thenApply(),thenAccept()的区别是,thenApply提供参数,需要返回值,thenAccept只提供参数不需要返回值。
thenCompose()
的用法如下:
completableFuture1.thenCompose((completableFuture1_result) -> completableFuture2)
这段代码的意思是将completableFuture1中返回的结果带入到completableFuture2中去执行,然后返回completableFuture2中的结果,下面是一个简单的实例:
public void testThenCompose() throws ExecutionException, InterruptedException { int i=0; CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> i + 1) .thenCompose((j) -> CompletableFuture.supplyAsync(() -> j + 2)); Integer result = future.get(); }
打印:
result:3
thenCombine()
方法是将两个CompletableFuture任务结果组合起来
public void testThenCombine() throws ExecutionException, InterruptedException { int i=0; CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> i + 1) .thenCombine(CompletableFuture.supplyAsync(() -> i + 2), (result1, result2) -> { System.out.println("第一个CompletableFuture结果:" + result1); System.out.println("第二个CompletableFuture结果:" + result2); return result1 + result2; }); Integer total = future.get(); System.out.println("总和:" + total); }
打印:
第一个CompletableFuture结果:1 第二个CompletableFuture结果:2 总和:3
假设有一组CompletableFuture对象,现在需要这些CompletableFuture任务全部执行完毕,然后再接着做某些事情。针对这个需求,我们可以使用 CompletableFuture.join()
方法。
public void testJoin() { List<CompletableFuture> futures = new ArrayList<>(); System.out.println("100米跑步比赛开始"); for (int i = 0; i < 10; i++) { final int num = i + 1; futures.add(CompletableFuture.runAsync(() -> { int v = (int)(Math.random() * 10); try { Thread.sleep(v); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(num + "号选手到达终点,用时:" + (10 + v) + "秒"); })); } CompletableFuture<Double>[] futureArr = futures.toArray(new CompletableFuture[futures.size()]); CompletableFuture.allOf(futureArr).join(); System.out.printf("所有选手到达终点"); }
打印:
100米跑步比赛开始 3号选手到达终点,用时:16秒 1号选手到达终点,用时:15秒 2号选手到达终点,用时:11秒 5号选手到达终点,用时:13秒 4号选手到达终点,用时:15秒 8号选手到达终点,用时:10秒 6号选手到达终点,用时:18秒 10号选手到达终点,用时:12秒 7号选手到达终点,用时:19秒 9号选手到达终点,用时:18秒 所有选手到达终点
如果需要在任务处理完毕后做一些处理,可以使用 whenComplete(BiConsumer)
或 whenCompleteAsync(BiConsumer)
String bookName = "《飘》"; CompletableFuture<String> future = CompletableFuture. supplyAsync(() -> {System.out.println("图书管理员开始找书");return bookName;}) .thenApply((book) -> {System.out.println("找到书本,助理开始录入信息"); return book;}) .whenCompleteAsync(((book, throwable) -> { System.out.println("助理录入信息完毕,通知小明来拿书"); })); String book = future.get(); System.out.println("小明拿到书" + book);
打印:
图书管理员开始找书 找到书本,助理开始录入信息 助理录入信息完毕,通知小明来拿书 小明拿到书《飘》
对异常的处理通常分为两步,第一步抛出异常,第二步捕获异常,首先我们来看下CompletableFuture如何抛出异常。
CompletableFuture抛出异常有两种方式,第一种方式,如果CompletableFuture是直接new出来的对象,必须使用 future.completeExceptionally(e)
抛出异常,如果采用 throw new RuntimeException(e);
方式抛出异常,调用者是捕获不到的。
CompletableFuture<String> future = new CompletableFuture<>(); new Thread(()->{ String value = "http://"; try { // 模拟出错,给一个不存在的ENCODE value = URLEncoder.encode(value, "UTF-888"); future.complete(value); } catch (UnsupportedEncodingException e) { // future处理异常 future.completeExceptionally(e); // !!此方式调用者无法捕获异常 // throw new RuntimeException(e); } }).start();
第二中方式,如果CompletableFuture对象是由工厂方法(如 CompletableFuture.supplyAsync()
)创建的,可以直接 throw new RuntimeException(e)
,因为supplyAsync()封装的方法内部做了try…catch处理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { String value = "http://"; try { // 模拟出错,给一个不存在的ENCODE value = URLEncoder.encode(value, "UTF-88"); return value; } catch (UnsupportedEncodingException e) { // 这样不行,可以throw //future.completeExceptionally(e); throw new RuntimeException(e); } });
接下来我们来看下如何捕获异常,捕获异常也分为两种。
第一种,在catch中捕获:
try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { // 捕获异常1,首先会到这里来 System.out.println("捕获异常1,msg:" + e.getMessage()); }
第二种,在future.whenCompleteAsync()或future.whenComplete()方法中捕获:
future.whenCompleteAsync((value, e)->{ if (e != null) { // 捕获异常2,这里也会打印 System.out.println("捕获异常2, msg:" + e.getMessage()); } else { System.out.println("返回结果:" + value); } });
CompletableFuture的出现弥补了Future接口在某些地方的不足,比如事件监听,多任务合并,流水线操作等。同时CompletableFuture配合lambda表达式让开发者使用起来更加方面,使得开发者在异步编程上多了一种选择。