转载

Java 8新特性之CompletableFuture:组合式异步编程

随着多核处理器的出现,提升应用程序的处理速度最有效的方式就是可以编写出发挥多核能力的软件,我们已经可以通过切分大型的任务,让每个子任务并行运行,使用线程的方式,分支/合并框架(Java 7) 和并行流(Java 8)来实现。

现在很多大型的互联网公司都对外提供了API服务,比如百度的地图,微博的新闻,天气预报等等。很少有网站或网络应用汇以完全隔离的方式工作,而是采用混聚的方式:它会使用来自多个源的内容,将这些内容聚合在一起,方便用户使用。

比如实现一个功能,你需要在微博中搜索某个新闻,然后根据当前坐标获取天气预报。这些调用第三方信息的时候,不想因为等待搜索新闻时,放弃对获取天气预报的处理,于是我们可以使用 分支/合并框架 及并行流 来并行处理,将他们切分为多个子操作,在多个不同的核、CPU甚至是机器上并行的执行这些子操作。

相反,如果你想实现并发,而不是并行,或者你的主要目标是在同一个CPU上执行几个松耦合的任务,充分利用CPU的核,让其足够忙碌,从而最大化程序的吞吐量,那么你其实真正想做的是避免因为等待远程服务的返回,或者对数据库的查询,而阻塞线程的执行,浪费宝贵的计算资源,因为这种等待时间可能会很长。Future接口,尤其是它的新版实现CompletableFuture是处理这种情况的利器。

Future接口

Future接口在java 5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些可能会耗时的操作把调用线程解放出来,让它能继续执行其他工作,不用一直等待耗时的操作完成,比如:你拿了一袋子衣服到洗衣店去洗衣服,洗衣店会给你张发票,告诉你什么时候会洗好,然后你就可以去做其他的事了。Future的另一个优点是它比更底层的Thread更容易使用。使用Future只需要讲耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService就可以了。 Java 8之前使用Future的例子:

public static void main(String[] args) {

//创建Executor-Service,通过他可以向线程池提交任务

ExecutorService executor = Executors.newCachedThreadPool();

//向executor-Service提交 Callable对象

Future<Double> future = executor.submit(new Callable<Double>() {

@Override

public Double call() throws Exception {

//异步的方式执行耗时的操作

return doSomeLongComputation();

}

});

//异步时,做其他的事情

doSomethingElse();

try{

//获取异步操作的结果,如果被阻塞,无法得到结果,那么最多等待1秒钟之后退出

Double result = future.get(1, TimeUnit.SECONDS);

System.out.print(result);

} catch (InterruptedException e) {

System.out.print("计算抛出一个异常");

} catch (ExecutionException e) {

System.out.print("当前线程在等待过程中被中断");

} catch (TimeoutException e) {

System.out.print("future对象完成之前已过期");

}

}

public static Double doSomeLongComputation() throws InterruptedException {

Thread.sleep(1000);

return 3 + 4.5;

}

public static void doSomethingElse(){

System.out.print("else");

}

这种方式可以再ExecutorService以并发的方式调用另外一个线程执行耗时的操作的同时,去执行一些其他任务。接着到已经没有任务运行时,调用它的get方法来获取操作的结果,如果操作完成,就会返回结果,否则会阻塞你的线程,一直到操作完成,返回响应的结果。

CompletableFuture

在java 8 中引入了CompletableFuture类,它实现了Future接口,使用了Lambda表达式以及流水线的思想,通过下面这个例子进行学习,比如:我们要做一个商品查询,根据折扣来获取价格。

public class Shop {

public double getPrice(String product) throws InterruptedException {

//查询商品的数据库,或链接其他外部服务获取折扣

Thread.sleep(1000);

return new Random().nextDouble() * product.charAt(0) + product.charAt(1);

}

}

当调用这个方法时,它会阻塞进程,等待事件完成。

将同步方法转换成异步方法

public Future<Double> getPriceAsync(String product){

//创建CompletableFuture对象

CompletableFuture<Double> futurePrice = new CompletableFuture<>();

new Thread (()->{

try {

//在另一个线程中执行计算

double price = getPrice(product);

//需要长时间计算的任务结束并得出结果时,设置future的返回值

futurePrice.complete(price);

} catch (InterruptedException e) {

e.printStackTrace();

}

}).start();

return futurePrice;

}

然后可以这样调用:

System.out.println("begin");

Future<Double> futurePrice = shop.getPriceAsync("ss");

System.out.println("doSomething");

System.out.println(futurePrice.get());

System.out.println("end");

begin

doSomething

171.47509091822835

end

这个例子中,首先会调用接口 立即返回一个Future对象,在这种方式下,在查询价格的同时,还可以处理其他任务。最后所有的工作都已经完成,然后再调用future的get方法。获得Future中封装的值,要么发生阻塞,直到该任务异步任务完成,期望的值能够返回。

错误处理

如果没有意外,这个代码工作的会非常正常。但是如果计算价格的过程中发生了错误,那么get会永久的被阻塞。这时可以使用重载的get方法,让它超过一个时间后就强制返回。应该尽量在代码中使用这种方式来防止程序永久的等待下去。超时会引发TimeoutException。但是这样会导致你无法知道具体什么原因导致Future无法返回,这时需要使用CompletableFUture的completeExceptionally方法将导致CompletableFuture内发生的问题抛出。

public Future<Double> getPriceAsync(String product){

//创建CompletableFuture对象

CompletableFuture<Double> futurePrice = new CompletableFuture<>();

new Thread (()->{

try {

double price = getPrice(product);

futurePrice.complete(price);

} catch (Exception ex) {

//抛出异常

futurePrice.completeExceptionally(ex);

}

}).start();

return futurePrice;

}

调用时:

System.out.println("begin");

Future<Double> futurePrice = shop.getPriceAsync("ss");

System.out.println("doSomething");

try {

System.out.println(futurePrice.get(1, TimeUnit.SECONDS));

} catch (TimeoutException e) {

System.out.print(e);

}

System.out.println("end");

设置超时时间,然后会将错误信息打印出来。

工厂方法supplyAsync创建CompletableFuture

使用工厂方法可以一句话来创建getPriceAsync方法

public Future<Double> getPriceAsync(String product) {

return CompletableFuture.supplyAsync(() -> getPrice(product));

}

supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后悔读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,也可以调用supplyAsync方法的重载版本,传入第二个参数指定不同的线程执行生产者方法。 工厂方法返回的CompletableFuture对象也提供了同样的错误处理机制。

阻塞优化

例如现在有一个商品列表,然后输出一个字符串 商品名,价格 。

List<Shop> shops = Arrays.asList(

new Shop("one"),

new Shop("two"),

new Shop("three"),

new Shop("four"));

long start = System.nanoTime();

List<String> str = shops.stream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList());

System.out.print(str);

long end = System.nanoTime();

System.out.print((end - start) / 1000000);

[one price: 161.83, two price: 126.04, three price: 153.20, four price: 166.06]

4110

每次调用getPrice方法都会阻塞1秒钟,对付这种我们可以使用并行流来进行优化:

List<String> str = shops.parallelStream().map(shop -> String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))).collect(toList());

1137

明显速度提升了,现在对四个商品查询 实现了并行,所以只耗时1秒多点,下面我们尝试CompletableFuture:

List<CompletableFuture<String>> str2 = shops.stream().map(shop->

CompletableFuture.supplyAsync(

()->String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName())))).collect(toList());

我们使用工厂方法supplyAsync创建CompletableFuture对象,使用这种方式我们会得到一个List<CompletableFuture<String>>,列表中的每一个ComplatableFuture对象在计算完成后都会包含商品的名称。但是我们要求返回的是List<String>,所以需要等待所有的future执行完毕,再将里面的值提取出来,填充到列表中才能返回。

List<String> str3 =str2.stream().map(CompletableFuture::join).collect(toList());

为了返回List<String> 需要对str2添加第二个map操作,对List中的所有future对象执行join操作,一个接一个的等待他们的运行结束。CompletableFuture类中的join和Future接口中的get方法有相同的含义,并且声明在Future接口中,唯一的不同是join不会抛出任何检测到的异常。

1149

现在使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上一个接一个的防治两个map操作。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。因此每个创建CompletableFuture对象只能在前一个操作结束之后,再join返回计算结果。

更好的解决方式

并行流的版本工作的非常好,那是因为他可以并行处理8个任务,获取操作系统线程数量:

System.out.print(Runtime.getRuntime().availableProcessors());

但是如果列表是9个呢?那么执行结果就会2秒。因为他最多只能让8个线程处于繁忙状态。 但是使用CompletableFuture允许你对执行器Executor进行配置,尤其是线程池的大小,这是并行流API无法实现的。

定制执行器

//创建一个线程池,线程池的数目为100何商店数目二者中较小的一个值

final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),

new ThreadFactory() {

@Override

public Thread newThread(Runnable r) {

Thread t = new Thread(r);

t.setDaemon(true); //使用守护线程 ---这种方式不会阻止程序的关停

return t;

}

});

这个线程池是一个由守护线程构成的线程池,Java程序无法终止或退出正在运行中的线程,所以最后剩下的那个线程会由于一直等待无法发生的事件而引发问题。与此相反,如果将线程标记为守护进程,意味着程序退出时它也会被回收。这二者之间没有性能上的差异。现在可以将执行器作为第二个参数传递给supplyAsync方法了。

CompletableFuture.supplyAsync(

()->String.format("%s price: %.2f", shop.getName(), shop.getPrice(shop.getName()))

,executor)

这时,执行9个商品时,执行速度只有1秒。 执行18个商品时也是1秒。这种状态会一直持续,直到商店的数目达到我们之前计算的阀值。 处理需要大量使用异步操作的情况时,这几乎是最有效的策略。

对多个异步任务进行流水线操作

我们在商品中增加一个枚举Discount.Code 来代表每个商品对应不同的折扣率,创建枚举如下:

public class Discount {

public enum Code{

NONE(0),

SILVER(5),

GOLD(10),

PLATINUM(15),

DIAMOND(20);

private final int value;

Code(int value){

this.value = value;

}

}

}

现在我们修改 getPrice方法的返回格式为:ShopName:price:DiscountCode 使用  : 进行分割的返回值。

public String getPrice(String product){

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

Double price = new Random().nextDouble() * product.charAt(0) + product.charAt(1);

Discount.Code code = Discount.Code.values()[new Random().nextInt(Discount.Code.values().length)];

return String.format("%s:%.2f:%s",name,price,code);

}

返回值: one:120.10:GOLDD

将返回结果封装到 Quote 类中:

public class Quote {

private final String shopName;

private final double price;

private final Discount.Code discountCode;

public Quote(String shopName, double price, Discount.Code code) {

this.shopName = shopName;

this.price = price;

this.discountCode = code;

}

public static Quote parse(String s) {

String[] split = s.split(":");

String shopName = split[0];

double price = Double.parseDouble(split[1]);

Discount.Code discountCode = Discount.Code.valueOf(split[2]);

return new Quote(shopName, price, discountCode);

}

public String getShopName() {

return shopName;

}

public double getPrice() {

return price;

}

public Discount.Code getDiscountCode() {

return discountCode;

}

}

parse方法 通过getPrice的方法 返回的字符串 会返回Quote对象,此外 Discount服务还提供了一个applyDiscount方法,它接收一个Quote对象,返回一个字符串,表示该Quote的shop中的折扣价格:

public class Discount {

public enum Code{..

}

public static String applyDiscount(Quote quote){

return quote.getShopName() + "price :" + Discount.apply(quote.getPrice() ,quote.getDiscountCode());

}

public static double apply(double price,Code code){

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

return price * (100 - code.value) / 100;

}

}

Discount中 也模拟了远程操作 睡了1秒钟,首先我们尝试最直接的方式:

List<String> str = shops.stream()

.map(shop->shop.getPrice("hhhhh")) //获取 one:120.10:GOLDD 格式字符串

.map(Quote::parse) //转换为 Quote 对象

.map(Discount::applyDiscount) //返回 Quote的shop中的折扣价格

.collect(toList());

System.out.print(str);

8146

首先,我们调用getPrice远程方法将shop对象转换成了一个字符串。每个1秒

然后,我们将字符串转换为Quote对象。

最后,我们将Quote对象 调用 远程 Discount服务获取折扣,返回折扣价格。每个1秒

顺序执行4个商品是4秒,然后又调用了Discount服务又4秒 所以是8秒。 虽然我们现在把流转换为并行流 性能会很好 但是数量大于8时也很慢。相反,使用自定义CompletableFuture执行器能够更充分的利用CPU资源。

List<CompletableFuture<String>> priceFutures = shops.stream()

//异步获取每个shop中的价格

.map(shop -> CompletableFuture.supplyAsync(

() -> shop.getPrice("hhhhh", executor)

))

//Quote对象存在时,对其返回值进行转换

.map(future -> future.thenApply(Quote::parse))

//使用另一个异步任务构造期望的future,申请折扣

.map(future -> future.thenCompose(quote ->

CompletableFuture.supplyAsync(

() -> Discount.applyDiscount(quote), executor)

))

.collect(toList());

//等待流中的所有Future执行完毕,提取各自的返回值

List<String> str = priceFutures.stream().map(CompletableFuture::join).collect(toList());

System.out.print(str);

2126

使用的这三个map跟同步没有太大的区别,但是使用了CompletableFuture类提供的特性,在需要的地方把他们变成了异步操作。

thenApply方法:当第一个Future运行结束,返回CompletableFuture<String>对象转换为CompleTableFuture<Quote>对象。

thenCompose方法:将两个异步操作进行流水线,当第一个操作完成时,将其结果作为参数传递给第二个操作。换句话说,你可以创建两个CompletableFuture对象,对第一个对象调用thenCompose,并向其传递一个函数。

这个方法也有Async版本:thenComposeAsync,通常带后缀的版本是讲任务移交到一个新线程,不带后缀的在当前线程执行。对于这个例子我们没有加上后缀,因为对于最终结果,或者大致的时间而言都没有多少差别,少了很多线程切换的开销。

合并两个CompletableFuture,无论是否依赖

与上面不同,第二个CompletableFuture无需等待第一个CompletableFuture运行结束。而是,将两个完全不相干的CompletableFuture对象整合起来,不希望等到第一个任务完全结束才开始第二个任务。

这种情况应该使用thenCombine方法,它接受名为BiFunction的第二个参数,这个参数定义了当两个CompletableFuture对象完成计算后,结果如何合并。同thenCompose方法一样,thenCombine方法也提供了一个Async的版本。使用thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。

回到这个例子,比如说我们现在需要第三个CompletableFuture来获取汇率,展示美元。当前两个CompletableFuture计算出结果,并由BiFunction方法完全合并后,由它来最终将诶书这一任务:

Future<Double> futurePriceUSD = CompletableFuture.supplyAsync(()->shops.get(0).getPrice("gg"))

.thenCombine(

CompletableFuture.supplyAsync(

()-> 0.66 //远程服务获取 汇率

),(price,rate) -> price * rate

);

这里 第一个参数price 是 getPrice的返回值 double , 第二个参数 rate 是第二个工厂方法返回的0.66 偷了个懒, 最后是他们的结果进行乘法操作 返回最终结果。

响应CompletableFuture的completion事件

在本章中,所有的延迟例子都是延迟1秒钟,但是在现实世界中,有时可能更糟。到目前为止,你所实现的方法必须等待所有的商品返回时才能现实商品的价格。而你希望的效果是,只要有商品返回商品价格就在第一时间显示出来,不用等待那些还没有返回的商品。

CompletableFuture[] futures = shops.stream()

.map(shop -> CompletableFuture.supplyAsync(

() -> shop.getPrice("hhhhh", executor)

))

.map(future -> future.thenApply(Quote::parse))

.map(future -> future.thenCompose(quote ->

CompletableFuture.supplyAsync(

() -> Discount.applyDiscount(quote), executor)

))

//在每个CompletableFuture上注册一个操作,该操作会在CompletableFuture完成后使用它的返回值。

//使用thenAccept将结果输出,它的参数就是 CompletableFuture的返回值。

.map(f -> f.thenAccept(System.out::println))

//你可以把构成的Stream的所有CompletableFuture<void>对象放到一个数组中,等待所有的任务执行完成

.toArray(size -> new CompletableFuture[size]);

//allOf方法接受一个CompletableFuture构成的数组,数组中所有的COmpletableFuture对象执行完成后,

//它返回一个COmpletableFuture<Void>对象。所以你需要哦等待最初Stream中的所有CompletableFuture对象执行完毕,

//对allOf方法返回的CompletableFuture执行join操作

CompletableFuture.allOf(futures).join();

Connected to the target VM, address: '127.0.0.1:62278', transport: 'socket'

8twoprice :113.31

threeprice :108.15

oneprice :137.844

Disconnected from the target VM, address: '127.0.0.1:62278', transport: 'socket'

fourprice :119.2725

3768

还有一个方法anyOf,对于CompletableFuture对象数组中有任何一个执行完毕就不在等待时使用。

小结:

1.执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度。

2.你应该尽可能的为客户提供异步API。使用CompletableFuture类提供的特性,能够轻松的实现这一目标。

3.CompletableFuture类还提供了异常管理的机制,然给你有机会抛出/管理异步任务执行中发生的异常。

4.将同步API的调用封装到一个CompletableFuture中,你能够以异步的方式使用其结果。

5.如果异步任务之间互相独立,或者他们之间某一些的结果是另一些的输入,你可以讲这些异步任务合并成一个。

6.你可以为CompletableFuture注册一个回调函数,在Future执行完毕或者他们计算的结果可用时,针对性的执行一些程序。

7.你可以决定在什么时候将诶书程序的运行,是等待由CompletableFuture对象构成的列表中所有的对象都执行完毕,还是只要其中任何一个首先完成就终止程序的运行。

Linux公社的RSS地址 : https://www.linuxidc.com/rssFeed.aspx

本文永久更新链接地址: https://www.linuxidc.com/Linux/2018-11/155194.htm

原文  https://www.linuxidc.com/Linux/2018-11/155194.htm
正文到此结束
Loading...