所谓异步其实就是实现一个无需等待被调用函数的返回值而让操作继续运行的方法
CompletableFuture<String> noArgsFuture = new CompletableFuture<>();
runAsync
方法可以在后台执行异步计算,但是此时并没有返回值。持有一个 Runnable
对象。
CompletableFuture noReturn = CompletableFuture.runAsync(()->{ //执行逻辑,无返回值 });
此时我们看到返回的是 CompletableFuture<T>
此处的 T
就是你想要的返回值的类型。其中的 Supplier<T>
是一个简单的函数式接口。
CompletableFuture<String> hasReturn = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { return "hasReturn"; } });
此时可以使用 lambda
表达式使上面的逻辑更加清晰
CompletableFuture<String> hasReturnLambda = CompletableFuture.supplyAsync(TestFuture::get); private static String get() { return "hasReturnLambda"; }
异步任务也是有返回值的,当我们想要用到异步任务的返回值时,我们可以调用 CompletableFuture
的 get()
阻塞,直到有异步任务执行完有返回值才往下执行。
我们将上面的 get()
方法改造一下,使其停顿十秒时间。
private static String get() { System.out.println("Begin Invoke getFuntureHasReturnLambda"); try { Thread.sleep(10000); } catch (InterruptedException e) { } System.out.println("End Invoke getFuntureHasReturnLambda"); return "hasReturnLambda"; }
然后进行调用
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> funtureHasReturnLambda = (CompletableFuture<String>) getFuntureHasReturnLambda(); System.out.println("Main Method Is Invoking"); funtureHasReturnLambda.get(); System.out.println("Main Method End"); }
可以看到输出如下,只有调用 get()
方法的时候才会阻塞当前线程。
Main Method Is Invoking Begin Invoke getFuntureHasReturnLambda End Invoke getFuntureHasReturnLambda Main Method End
除了等待异步任务返回值以外,我们也可以在任意时候调用 complete()
方法来自定义返回值。
CompletableFuture<String> funtureHasReturnLambda = (CompletableFuture<String>) getFuntureHasReturnLambda(); System.out.println("Main Method Is Invoking"); new Thread(()->{ System.out.println("Thread Is Invoking "); try { Thread.sleep(1000); funtureHasReturnLambda.complete("custome value"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread End "); }).run(); String value = funtureHasReturnLambda.get(); System.out.println("Main Method End value is "+ value);
我们可以发现输出是新起线程的输出值,当然这是因为我们的异步方法设置了等待10秒,如果此时异步方法等待1秒,新起的线程等待10秒,那么输出的值就是异步方法中的值了。
Main Method Is Invoking Begin Invoke getFuntureHasReturnLambda Thread Is Invoking Thread End Main Method End value is custome value
如果有一个异步任务的完成需要依赖前一个异步任务的完成,那么该如何写呢?是调用 get()
方法获得返回值以后然后再执行吗?这样写有些麻烦, CompletableFuture
为我们提供了方法来完成我们想要顺序执行一些异步任务的需求。 thenApply
、 thenAccept
、 thenRun
这三个方法。这三个方法的区别就是。
方法名 | 是否可获得前一个任务的返回值 | 是否有返回值 |
---|---|---|
thenApply
|
能获得 | 有 |
thenAccept
|
能获得 | 无 |
thenRun
|
不可获得 | 无 |
所以一般来说 thenAccept
、 thenRun
这两个方法在调用链的最末端使用。接下来我们用真实的例子感受一下。
//thenApply 可获取到前一个任务的返回值,也有返回值 CompletableFuture<String> seqFutureOne = CompletableFuture.supplyAsync(()-> "seqFutureOne"); CompletableFuture<String> seqFutureTwo = seqFutureOne.thenApply(name -> name + " seqFutureTwo"); System.out.println(seqFutureTwo.get()); //thenAccept 可获取到前一个任务的返回值,但是无返回值 CompletableFuture<Void> thenAccept = seqFutureOne .thenAccept(name -> System.out.println(name + "thenAccept")); System.out.println("-------------"); System.out.println(thenAccept.get()); //thenRun 获取不到前一个任务的返回值,也无返回值 System.out.println("-------------"); CompletableFuture<Void> thenRun = seqFutureOne.thenRun(() -> { System.out.println("thenRun"); }); System.out.println(thenRun.get());
返回的信息如下
seqFutureOne seqFutureTwo seqFutureOnethenAccept ------------- null ------------- thenRun null
我们可以发现这三个方法都带有一个后缀为 Async
的方法,例如 thenApplyAsync
。那么带 Async
的方法和不带此后缀的方法有什么不同呢?我们就以 thenApply
和 thenApplyAsync
两个方法进行对比,其他的和这个一样的。
这两个方法区别就在于谁去执行这个任务,如果使用 thenApplyAsync
,那么执行的线程是从 ForkJoinPool.commonPool()
中获取不同的线程进行执行,如果使用 thenApply
,如果 supplyAsync
方法执行速度特别快,那么 thenApply
任务就是主线程进行执行,如果执行特别慢的话就是和 supplyAsync
执行线程一样。接下来我们通过例子来看一下,使用 sleep
方法来反应 supplyAsync
执行速度的快慢。
//thenApply和thenApplyAsync的区别 System.out.println("-------------"); CompletableFuture<String> supplyAsyncWithSleep = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } return "supplyAsyncWithSleep Thread Id : " + Thread.currentThread(); }); CompletableFuture<String> thenApply = supplyAsyncWithSleep .thenApply(name -> name + "------thenApply Thread Id : " + Thread.currentThread()); CompletableFuture<String> thenApplyAsync = supplyAsyncWithSleep .thenApplyAsync(name -> name + "------thenApplyAsync Thread Id : " + Thread.currentThread()); System.out.println("Main Thread Id: "+ Thread.currentThread()); System.out.println(thenApply.get()); System.out.println(thenApplyAsync.get()); System.out.println("-------------No Sleep"); CompletableFuture<String> supplyAsyncNoSleep = CompletableFuture.supplyAsync(()->{ return "supplyAsyncNoSleep Thread Id : " + Thread.currentThread(); }); CompletableFuture<String> thenApplyNoSleep = supplyAsyncNoSleep .thenApply(name -> name + "------thenApply Thread Id : " + Thread.currentThread()); CompletableFuture<String> thenApplyAsyncNoSleep = supplyAsyncNoSleep .thenApplyAsync(name -> name + "------thenApplyAsync Thread Id : " + Thread.currentThread()); System.out.println("Main Thread Id: "+ Thread.currentThread()); System.out.println(thenApplyNoSleep.get()); System.out.println(thenApplyAsyncNoSleep.get());
我们可以看到输出为
------------- Main Thread Id: Thread[main,5,main] supplyAsyncWithSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]------thenApply Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main] supplyAsyncWithSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main]------thenApplyAsync Thread Id : Thread[ForkJoinPool.commonPool-worker-1,5,main] -------------No Sleep Main Thread Id: Thread[main,5,main] supplyAsyncNoSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]------thenApply Thread Id : Thread[main,5,main] supplyAsyncNoSleep Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]------thenApplyAsync Thread Id : Thread[ForkJoinPool.commonPool-worker-2,5,main]
可以看到 supplyAsync
方法执行速度慢的话 thenApply
方法执行线程和 supplyAsync
执行线程相同,如果 supplyAsync
方法执行速度快的话,那么 thenApply
方法执行线程和 Main
方法执行线程相同。
将两个 CompletableFuture
组合到一起有两个方法
thenCompose() thenCombine()
我们定义两个异步任务,假设第二个定时任务需要用到第一个定时任务的返回值。
public static CompletableFuture<String> getTastOne(){ return CompletableFuture.supplyAsync(()-> "topOne"); } public static CompletableFuture<String> getTastTwo(String s){ return CompletableFuture.supplyAsync(()-> s + " topTwo"); }
我们利用 thenCompose()
方法进行编写
CompletableFuture<String> thenComposeComplet = getTastOne().thenCompose(s -> getTastTwo(s)); System.out.println(thenComposeComplet.get());
输出就是
topOne topTwo
如果还记得前面的 thenApply()
方法的话,应该会想这个利用 thenApply()
方法也是能够实现类似的功能的。
//thenApply CompletableFuture<CompletableFuture<String>> thenApply = getTastOne() .thenApply(s -> getTastTwo(s)); System.out.println(thenApply.get().get());
但是我们发现返回值是嵌套返回的一个类型,而想要获得最终的返回值需要调用两次 get()
例如我们此时需要计算两个异步方法返回值的和。求和这个操作是必须是两个异步方法得出来值的情况下才能进行计算,因此我们可以用 thenCombine()
方法进行计算。
CompletableFuture<Integer> thenComposeOne = CompletableFuture.supplyAsync(() -> 192); CompletableFuture<Integer> thenComposeTwo = CompletableFuture.supplyAsync(() -> 196); CompletableFuture<Integer> thenComposeCount = thenComposeOne .thenCombine(thenComposeTwo, (s, y) -> s + y); System.out.println(thenComposeCount.get());
此时 thenComposeOne
和 thenComposeTwo
都完成时才会调用传给 thenCombine
方法的回调函数。
在上面我们用 thenCompose()
和 thenCombine()
两个方法将两个 CompletableFuture
组装起来,如果我们想要将任意数量的 CompletableFuture
组合起来呢?可以使用下面两个方法进行组合。
allOf()
:等待所有 CompletableFuture
完后以后才会运行回调函数 anyOf()
:只要其中一个 CompletableFuture
完成,那么就会执行回调函数。注意此时其他的任务也就不执行了。 接下来演示一下两个方法的用法
//allOf() CompletableFuture<Integer> one = CompletableFuture.supplyAsync(() -> 1); CompletableFuture<Integer> two = CompletableFuture.supplyAsync(() -> 2); CompletableFuture<Integer> three = CompletableFuture.supplyAsync(() -> 3); CompletableFuture<Integer> four = CompletableFuture.supplyAsync(() -> 4); CompletableFuture<Integer> five = CompletableFuture.supplyAsync(() -> 5); CompletableFuture<Integer> six = CompletableFuture.supplyAsync(() -> 6); CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(one, two, three, four, five, six); voidCompletableFuture.thenApply(v->{ return Stream.of(one,two,three,four, five, six) .map(CompletableFuture::join) .collect(Collectors.toList()); }).thenAccept(System.out::println); CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.runAsync(() -> { try { Thread.sleep(1000); } catch (Exception e) { } System.out.println("1"); });
我们定义了6个 CompletableFuture
等待所有的 CompletableFuture
等待所有任务完成以后然后将其值输出。
anyOf()
的用法
CompletableFuture<Void> voidCompletableFuture1 = CompletableFuture.runAsync(() -> { try { Thread.sleep(1000); } catch (Exception e) { } System.out.println("voidCompletableFuture1"); }); CompletableFuture<Void> voidCompletableFutur2 = CompletableFuture.runAsync(() -> { try { Thread.sleep(2000); } catch (Exception e) { } System.out.println("voidCompletableFutur2"); }); CompletableFuture<Void> voidCompletableFuture3 = CompletableFuture.runAsync(() -> { try { Thread.sleep(3000); } catch (Exception e) { } System.out.println("voidCompletableFuture3"); }); CompletableFuture<Object> objectCompletableFuture = CompletableFuture .anyOf(voidCompletableFuture1, voidCompletableFutur2, voidCompletableFuture3); objectCompletableFuture.get();
这里我们定义了3个 CompletableFuture
进行一些耗时的任务,此时第一个 CompletableFuture
会率先完成。打印结果如下。
voidCompletableFuture1
我们了解了 CompletableFuture
如何异步执行,如何组合不同的 CompletableFuture
,如何顺序执行 CompletableFuture
。那么接下来还有一个重要的一步,就是在执行异步任务时发生异常的话该怎么办。我们先写个例子。
CompletableFuture.supplyAsync(()->{ //发生异常 int i = 10/0; return "Success"; }).thenRun(()-> System.out.println("thenRun")) .thenAccept(v -> System.out.println("thenAccept")); CompletableFuture.runAsync(()-> System.out.println("CompletableFuture.runAsync"));
执行结果为,我们发现只要执行链中有一个发生了异常,那么接下来的链条也就不执行了,但是主流程下的其他 CompletableFuture
还是会运行的。
CompletableFuture.runAsync
我们可以使用 exceptionally
进行异常的处理
//处理异常 CompletableFuture<String> exceptionally = CompletableFuture.supplyAsync(() -> { //发生异常 int i = 10 / 0; return "Success"; }).exceptionally(e -> { System.out.println(e); return "Exception has Handl"; }); System.out.println(exceptionally.get());
打印如下,可以发现其接收值是异常信息,也能够返回自定义返回值。
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero Exception has Handl
调用 handle()
方法也能够捕捉到异常并且自定义返回值,他和 exceptionally()
方法不同一点是 handle()
方法无论发没发生异常都会被调用。例子如下
System.out.println("-------有异常-------"); CompletableFuture.supplyAsync(()->{ //发生异常 int i = 10/0; return "Success"; }).handle((response,e)->{ System.out.println("Exception:" + e); System.out.println("Response:" + response); return response; }); System.out.println("-------无异常-------"); CompletableFuture.supplyAsync(()->{ return "Sucess"; }).handle((response,e)->{ System.out.println("Exception:" + e); System.out.println("Response:" + response); return response; });
打印如下,我们可以看到在没有发生异常的时候 handle()
方法也被调用了
-------有异常------- Exception:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero Response:null -------无异常------- Exception:null Response:Sucess