Java方法的异步调用最容易联想到使用线程,将需要异步执行的方法放在另外一个线程中执行:
new Thread(() -> { //Do whatever }).start();
如果想准确地等待并获得返回的结果,可以使用Java中Future是用来实现异步计算,其计算结果需要通过 get方法获取,问题是在计算完成之前调用get是阻塞的,这造成了非常严格的使用限制,使异步计算毫无意义。
CompletableFuture除了实现Future接口外,还实现了CompletionStage接口。 CompletionStage是一种承诺。它承诺最终将完成计算,最棒的是CompletionStage是它提供了大量的方法,可以让你能附加方法,在其完成时对这些方法执行的回调,这样我们就可以以非阻塞的方式构建系统。
最简单的异步计算
CompletableFuture.supplyAsync(this::sendMsg);
就这么简单。
supplyAsync方法参数需要提供一个方法,这个方法是我们想要异步执行的代码-在这里这个方法是sendMsg()。
这段异步代码被提交给ForkJoinPool.commonPool()执行
附加回调
回调的优点在于:在完成异步计算时再执行什么,而无需我们主动去等待结果,在第一个例子中,我们只是通过sendMsg在自己的线程中执行来异步发送消息。现在让我们添加一个回调函数,我们会在其中通知发送消息的方式。
CompletableFuture.supplyAsync(this::sendMsg) .thenAccept(this::notify);
thenAccept是添加回调的众多方法之一。它需要一个Consumer- 在我们的例子中notify- 它在sendMsg完成时处理其结果,也就是说sendMsg方法的输出结果正好是notify方法的入参。
链接多个回调
如果要继续将值从一个回调传递到另一个回调,thenAccept则不会删除它,因为Consumer不返回任何内容。
为了保持传递值,您可以简单地使用thenApply。
thenApply接受一个Function,但也返回一个Function:
CompletableFuture.supplyAsync(this::findReceiver) .thenApply(this::sendMsg) .thenAccept(this::notify);
异步任务将首先找到一个接收器findReceiver,然后在将结果传递给最后一个回调通知notify之前向接收器发送一条消息sendMsg。
构建异步系统
如果我们继续使用thenApply上面的例子,我们最终会得到嵌套的CompletionStage:
CompletableFuture.supplyAsync(this::findReceiver) .thenApply(this::sendMsgAsync); // Returns type CompletionStage<CompletionStage<String>>
这种嵌套的返回不是我们希望的,可以使用thenCompose,它能给一个Function返回CompletionStage。这将具有像flatMap一样的展平效果。
CompletableFuture.supplyAsync(this::findReceiver) .thenCompose(this::sendMsgAsync); // Returns type CompletionStage<String>
通过这种方式,我们可以在不丢失一层的情况下继续编写新功能CompletionStage
将回调作为单独的任务
到目前为止,我们所有的回调都在与它们的前任相同的线程上执行。如果您愿意,可以ForkJoinPool.commonPool()单独提交回调,而不是使用与前一个相同的线程,可以通过使用带Async异步后缀的方法版本来完成CompletionStage.
先看看再一个线程上执行:
CompletableFuture<String> receiver = CompletableFuture.supplyAsync(this::findReceiver); receiver.thenApply(this::sendMsg); receiver.thenApply(this::sendOtherMsg);
在上面的示例中,所有内容都将在同一个线程上执行。这导致最后一条消息等待第一条消息完成。
引入异步:
ompletableFuture<String> receiver = CompletableFuture.supplyAsync(this::findReceiver); receiver.thenApplyAsync(this::sendMsg); receiver.thenApplyAsync(this::sendOtherMsg);
注意到thenApplyAsync的async后缀,每条消息都作为单独的任务提交给ForkJoinPool.commonPool(),回调方法sendMsg和sendOtherMsg执行时异步的,后者不会等待前者完成。
当一切都出错时该怎么办
当发生异常时,幸运的是,CompletableFuture有一个很好的处理方式,使用exceptionally:
CompletableFuture.supplyAsync(this::failingMsg) .exceptionally(ex -> new Result(Status.FAILED)) .thenAccept(this::notify);
exceptionally 如果先前的计算失败并带有异常,则通过采用将执行的替代函数使我们有机会进行恢复回退。
回调会继续成功执行,只是输入参数是exceptionally执行的输出结果。
以受控方式处理超时
超时是我们编码人员经常需要照顾的事情。有时我们根本无法等待计算完成。这也适用于CompletableFutures。
这已经在Java 9中通过引入两种新方法来解决,这些方法将使我们能够处理超时 - orTimeout和completeOnTimeout。
CompletableFuture.supplyAsync(this::hangingMsg) .orTimeout(1, TimeUnit.MINUTES);
如果我们的挂消息在一分钟内没有完成,TimeoutException则会抛出一条消息。然后可以通过exceptionally转到回调来处理此异常。
另一个选择是使用completeOnTimeout它,这比抛出异常更好,因为它允许我们以一种很好的控制方式恢复。
CompletableFuture.supplyAsync(this::hangingMsg) .completeOnTimeout(new Result(Status.TIMED_OUT),1, TimeUnit.MINUTES);
回调取决于多个计算
如果一个的回调输入参数是需要依赖于两次计算结果,使用thenCombine:
下面代码是除了找到接收器之外,还要在发送消息之前执行创建一些内容的繁重工作:
CompletableFuture<String> to = CompletableFuture.supplyAsync(this::findReceiver); CompletableFuture<String> text = CompletableFuture.supplyAsync(this::createContent); to.thenCombine(text, this::sendMsg);
我们启动了两个异步作业 - 查找接收器并创建一些内容。然后我们thenCombine通过定义我们想要对这两个计算的结果做些什么。
回调取决于其中一个
我们现在已经介绍了依赖于两次计算的场景。现在,当你只需要其中一个的结果时呢?
CompletableFuture<String> firstSource = CompletableFuture.supplyAsync(this::findByFirstSource); CompletableFuture<String> secondSource = CompletableFuture.supplyAsync(this::findBySecondSource); firstSource.acceptEither(secondSource, this::sendMsg);
通过acceptEither实现等待两个计算结果,然后使用第二个返回的结果。