某个网站的数据来自Facebook、Twitter和Google,这就需要网站与互联网上的多个Web服务通信。可是,你并不希望因为等待某些服务的响应,阻塞应用程序的运行,浪费数十亿宝贵的CPU时钟周期。比如,不要因为等待Facebook的数据,暂停对来自Twitter的数据处理。
第7章中介绍的分支/合并框架以及并行流是实现并行处理的宝贵工具;它们将一个操作切分为多个子操作,在多个不同的核、CPU甚至是机器上并行地执行这些子操作。与此相反,如果你的意图是实现并发,而非并行,或者你的主要目标是在同一个CPU上执行几个松耦合的任务,充分利用CPU的核,让其足够忙碌,从而最大化程序的吞吐量,那么你其实真正想做的是避免因为等待远程服务的返回,或者对数据库的查询,而阻塞线程的执行,浪费宝贵的计算资源,因为这种等待的时间很可能相当长。
Future接口在Java 5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不再需要等待耗时的操作完成。Future的另一个优点是它比更底层的Thread更易用。要使用Future,通常你只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService。使用Future以异步的方式执行一个耗时的操作:
线程可以在ExecutorService以并发方式调用另一个线程执行耗时操作的同时,去执行一些其他的任务。接着,如果你已经运行到没有异步操作的结果就无法继续任何有意义的工作时,可以调用它的get方法去获取操作的结果。如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回相应的结果。如果该长时间运行的操作永远不返回了会怎样?Future提供了一个无需任何参数的get方法,推荐使用重载版本的get方法,它接受一个超时的参数,可以定义线程等待Future结果的最长时间,避免无休止的等待。下图是Future异步执行线程原理图。
Future接口有一定的局限性,比如,我们很难表述Future结果之间的依赖性。因此我们引入了CompletableFuture。接下来通过一个“最佳价格查询器“的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格,来展现CompletableFuture实现异步应用。通过此例你能学到这些:
同步API和异步API:
同步操作中会为等待同步事件完成而等待1s,这种是无法接受的,对于程序体验来说是非常不好的。
Java 5引入了java.util.concurrent.Future接口表示一个异步计算(即调用线程可以继续运行,不会因为调用方法而阻塞)的结果。这意味着Future是一个暂时还不可知值的处理器,这个值在计算完成后,可以通过调用它的get方法取得。这种方式下,在进行价格查询的同时,还能执行一些其他的任务,比如查询其他商店中商品的价格,不会阻塞在那里等待第一家商店返回请求的结果。最后,如果所有有意义的工作都已经完成,所有要执行的工作都依赖于商品价格时,再调用Future的get方法。执行了这个操作后,要么获得Future中封装的值(如果异步任务已经完成),要么发生阻塞,直到该异步任务完成,期望的值能够访问。同时,如果某个商品价格计算发生异常,会将当前线程杀死,从而导致等待get方法返回结果的客户端永久地被阻塞。客户端可以使用重载版本的get方法,设置超时参数来避免。为了让客户端能了解无法提供请求商品价格的原因,你需要使用CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出。
supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行线程执行生产者方法。
CompletableFuture版本的程序似乎比并行流版本的程序还快那么一点儿。但是最后这个版本也不太令人满意。它们看起来不相伯仲,究其原因都一样:它们内部采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值。然而,CompletableFuture具有一定的优势,因为它允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。 顺序执行和并行执行的原理对比:
图11-4的上半部分展示了使用单一流水线处理流的过程,我们看到,执行的流程(以虚线标识)是顺序的。事实上,新的CompletableFuture对象只有在前一个操作完全结束之后,才能创建。与此相反,图的下半部分展示了如何先将CompletableFutures对象聚集到一个列表中(即图中以椭圆表示的部分),让对象们可以在等待其他对象完成操作之前就能启动。
目前为止,你已经知道对集合进行并行计算有两种方式:要么将其转化为并行流,利用map这样的操作开展工作,要么枚举出集合中的每一个元素,创建新的线程,在CompletableFuture内对其进行操作。后者提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。
通过在shop构成的流上采用流水线方式执行三次map操作,我们得到了结果。
代码如图:
原理图:
Java 8的CompletableFuture API提供了名为thenCompose的方法,它就是专门为这一目的而设计的,thenCompose方法允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。换句话说,你可以创建两个CompletableFutures对象,对第一个CompletableFuture对象调用thenCompose,并向其传递一个函数。当第一个 CompletableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一 个CompletableFuture的返回做输入计算出的第二个CompletableFuture对象。thenCompose方法像CompletableFuture类中的其他方法一样,也提供了一个以Async后缀结尾的版本thenComposeAsync。通常而言,名称中不带Async的方法和它的前一个任务一样,在同一个线程中运行;而名称以Async结尾的方法会将后续的任务提交到一个线程池,所以每个任务是由不同的线程处理的。
将两个CompletableFuture对象结合起来,无论他们是否存在依赖。thenCombine方法,它接收名为BiFunction的第二参数,这个参数 定义了当两个CompletableFuture对象完成计算后,结果如何合并。同thenCompose方法一样, thenCombine方法也提供有一个Async的版本。这里,如果使用thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。
代码图:
原理图:
Java 8的CompletableFuture通过thenAccept方法提供了这一功能,它接收 CompletableFuture执行完毕后的返回值做参数。thenAccept方法也提供 了一个异步版本,名为thenAcceptAsync。异步版本的方法会对处理结果的消费者进行调度, 从线程池中选择一个新的线程继续执行,不再由同一个线程完成CompletableFuture的所有任 务。因为你想要避免不必要的上下文切换,更重要的是你希望避免在等待线程上浪费时间,尽快响应CompletableFuture的completion事件,所以这里没有采用异步版本。