提高应用性能的时候很容易就会想起异步,异步去处理一些任务这样主线程可以尽快响应。
通过阅读本篇文章你将了解到:
查询所有商店某个商品的价格并返回,并且查询商店某个商品的价格的API为同步 一个Shop类,提供一个名为getPrice的同步方法
public class Shop { private Random random = new Random(); /** * 根据产品名查找价格 * */ public double getPrice(String product) { return calculatePrice(product); } /** * 计算价格 * * @param product * @return * */ private double calculatePrice(String product) { delay(); //random.nextDouble()随机返回折扣 return random.nextDouble() * product.charAt(0) + product.charAt(1); } /** * 通过睡眠模拟其他耗时操作 * */ private void delay() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } 复制代码
查询商品的价格为同步方法,并通过sleep方法模拟其他操作。这个场景模拟了当需要调用第三方API,但第三方提供的是同步API,在无法修改第三方API时如何设计代码调用提高应用的性能和吞吐量,这时候可以使用CompletableFuture类
Completable是Future接口的实现类,在JDK1.8中引入
使用new方法
CompletableFuture<Double> futurePrice = new CompletableFuture<>(); 复制代码
使用CompletableFuture#completedFuture静态方法创建
public static <U> CompletableFuture<U> completedFuture(U value) { return new CompletableFuture<U>((value == null) ? NIL : value); } 复制代码
参数的值为任务执行完的结果,一般该方法在实际应用中较少应用
使用 CompletableFuture#supplyAsync静态方法创建 supplyAsync有两个重载方法:
//方法一 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } //方法二 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } 复制代码
使用CompletableFuture#runAsync静态方法创建 runAsync有两个重载方法
//方法一 public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } //方法二 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); } 复制代码
说明:
结果的获取:对于结果的获取CompltableFuture类提供了四种方式
//方式一 public T get() //方式二 public T get(long timeout, TimeUnit unit) //方式三 public T getNow(T valueIfAbsent) //方式四 public T join() 复制代码
说明:
示例:
public class AcquireResultTest { public static void main(String[] args) throws ExecutionException, InterruptedException { //getNow方法测试 CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(60 * 1000 * 60 ); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }); System.out.println(cp1.getNow("hello h2t")); //join方法测试 CompletableFuture<Integer> cp2 = CompletableFuture.supplyAsync((()-> 1 / 0)); System.out.println(cp2.join()); //get方法测试 CompletableFuture<Integer> cp3 = CompletableFuture.supplyAsync((()-> 1 / 0)); System.out.println(cp3.get()); } } 复制代码
说明:
异常处理:使用静态方法创建的CompletableFuture对象无需显示处理异常,使用new创建的对象需要调用completeExceptionally方法设置捕获到的异常,举例说明:
CompletableFuture completableFuture = new CompletableFuture(); new Thread(() -> { try { //doSomething,调用complete方法将其他方法的执行结果记录在completableFuture对象中 completableFuture.complete(null); } catch (Exception e) { //异常处理 completableFuture.completeExceptionally(e); } }).start(); 复制代码
店铺为一个列表:
private static List<Shop> shopList = Arrays.asList( new Shop("BestPrice"), new Shop("LetsSaveBig"), new Shop("MyFavoriteShop"), new Shop("BuyItAll") ); 复制代码
private static List<String> findPriceSync(String product) { return shopList.stream() .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))) //格式转换 .collect(Collectors.toList()); } 复制代码
private static List<String> findPriceAsync(String product) { List<CompletableFuture<String>> completableFutureList = shopList.stream() //转异步执行 .map(shop -> CompletableFuture.supplyAsync( () -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))) //格式转换 .collect(Collectors.toList()); return completableFutureList.stream() .map(CompletableFuture::join) //获取结果不会抛出异常 .collect(Collectors.toList()); } 复制代码
Find Price Sync Done in 4141 Find Price Async Done in 1033 复制代码
异步执行效率 提高四倍
在JDK1.8以前,通过调用线程池的submit方法可以让任务以异步的方式运行,该方法会返回一个Future对象,通过调用get方法获取异步执行的结果:
private static List<String> findPriceFutureAsync(String product) { ExecutorService es = Executors.newCachedThreadPool(); List<Future<String>> futureList = shopList.stream().map(shop -> es.submit(() -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))).collect(Collectors.toList()); return futureList.stream() .map(f -> { String result = null; try { result = f.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return result; }).collect(Collectors.toList()); } 复制代码
既生瑜何生亮,为什么仍需要引入CompletableFuture?
对于简单的业务场景使用Future完全没有,但是想将多个异步任务的计算结果组合起来,后一个异步任务的计算结果需要前一个异步任务的值等等,使用Future提供的那点API就囊中羞涩,处理起来不够优雅,这时候还是让CompletableFuture以 声明式 的方式优雅的处理这些需求
对前面计算结果进行处理,无法返回新值
提供了三个方法:
//方法一 public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) //方法二 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) //方法三 public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) 复制代码
说明:
示例:
public class WhenCompleteTest { public static void main(String[] args) { CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "hello"); CompletableFuture<String> cf2 = cf1.whenComplete((v, e) -> System.out.println(String.format("value:%s, exception:%s", v, e))); System.out.println(cf2.join()); } } 复制代码
将前面计算结果的的CompletableFuture传递给thenApply,返回thenApply处理后的结果。可以认为通过thenApply方法实现 CompletableFuture<T>
至 CompletableFuture<U>
的转换。白话一点就是将CompletableFuture的计算结果作为thenApply方法的参数,返回thenApply方法处理后的结果
提供了三个方法:
//方法一 public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) { return uniApplyStage(null, fn); } //方法二 public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn) { return uniApplyStage(asyncPool, fn); } //方法三 public <U> CompletableFuture<U> thenApplyAsync( Function<? super T,? extends U> fn, Executor executor) { return uniApplyStage(screenExecutor(executor), fn); } 复制代码
说明:
public class ThenApplyTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenApplyTest::randomInteger).thenApply((i) -> i * 8); System.out.println(result.get()); } public static Integer randomInteger() { return 10; } } 复制代码
这里将前一个CompletableFuture计算出来的结果扩大八倍
thenApply也可以归类为对结果的处理,thenAccept和thenApply的区别就是没有返回值
提供了三个方法:
//方法一 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { return uniAcceptStage(null, action); } //方法二 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { return uniAcceptStage(asyncPool, action); } //方法三 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor) { return uniAcceptStage(screenExecutor(executor), action); } 复制代码
说明:
public class ThenAcceptTest { public static void main(String[] args) { CompletableFuture.supplyAsync(ThenAcceptTest::getList).thenAccept(strList -> strList.stream() .forEach(m -> System.out.println(m))); } public static List<String> getList() { return Arrays.asList("a", "b", "c"); } } 复制代码
将前一个CompletableFuture计算出来的结果打印出来
thenCompose方法可以将两个异步操作进行流水操作
提供了三个方法:
//方法一 public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(null, fn); } //方法二 public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn) { return uniComposeStage(asyncPool, fn); } //方法三 public <U> CompletableFuture<U> thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) { return uniComposeStage(screenExecutor(executor), fn); } 复制代码
说明:
Function<? super T, ? extends CompletionStage<U>> fn
参数 => 当前CompletableFuture计算结果的执行 public class ThenComposeTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenComposeTest::getInteger) .thenCompose(i -> CompletableFuture.supplyAsync(() -> i * 10)); System.out.println(result.get()); } private static int getInteger() { return 666; } private static int expandValue(int num) { return num * 10; } } 复制代码
执行流程图:
thenCombine方法将两个无关的CompletableFuture组合起来,第二个Completable并不依赖第一个Completable的结果
提供了三个方法:
//方法一 public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(null, other, fn); } //方法二 public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) { return biApplyStage(asyncPool, other, fn); } //方法三 public <U,V> CompletableFuture<V> thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) { return biApplyStage(screenExecutor(executor), other, fn); } 复制代码
说明:
示例:
public class ThenCombineTest { private static Random random = new Random(); public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> result = CompletableFuture.supplyAsync(ThenCombineTest::randomInteger).thenCombine( CompletableFuture.supplyAsync(ThenCombineTest::randomInteger), (i, j) -> i * j ); System.out.println(result.get()); } public static Integer randomInteger() { return random.nextInt(100); } } 复制代码
将两个线程计算出来的值做一个乘法在返回 执行流程图:
方法介绍:
//allOf public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { return andTree(cfs, 0, cfs.length - 1); } //anyOf public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { return orTree(cfs, 0, cfs.length - 1); } 复制代码
说明:
示例:
public class AllOfTest { public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future1 = CompletableFuture.supplyAsync(() -> { System.out.println("hello"); return null; }); CompletableFuture<Void> future2 = CompletableFuture.supplyAsync(() -> { System.out.println("world"); return null; }); CompletableFuture<Void> result = CompletableFuture.allOf(future1, future2); System.out.println(result.get()); } } 复制代码allOf方法没有返回值,适合没有返回值并且需要前面所有任务执行完毕才能执行后续任务的应用场景
public class AnyOfTest { private static Random random = new Random(); public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("hello"); return "hello";}); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { randomSleep(); System.out.println("world"); return "world"; }); CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2); System.out.println(result.get()); } private static void randomSleep() { try { Thread.sleep(random.nextInt(10)); } catch (InterruptedException e) { e.printStackTrace(); } } } 复制代码两个线程都会将结果打印出来,但是get方法只会返回最先完成任务的结果。该方法比较适合只要有一个返回值就可以继续执行其他任务的应用场景
很多方法都提供了异步实现【带async后缀】,但是需小心谨慎使用这些异步方法,因为异步意味着存在上下文切换,可能性能不一定比同步好。如果需要使用异步的方法, 先做测试 ,用测试数据说话!!!
存在IO密集型的任务可以选择CompletableFuture,IO部分交由另外一个线程去执行。Logback、Log4j2异步日志记录的实现原理就是新起了一个线程去执行IO操作,这部分可以以CompletableFuture.runAsync(()->{ioOperation();})的方式去调用,有关Logback异步日志记录的原理可以参考这篇文章Logback异步日志记录。如果是CPU密集型就不推荐使用了推荐使用并行流
supplyAsync执行任务底层实现:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) { if (f == null) throw new NullPointerException(); CompletableFuture<U> d = new CompletableFuture<U>(); e.execute(new AsyncSupply<U>(d, f)); return d; } 复制代码
底层调用的是线程池去执行任务,而CompletableFuture中默认线程池为ForkJoinPool
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 复制代码
ForkJoinPool线程池的大小取决于CPU的核数。之前写的 为什么阿里巴巴要禁用Executors创建线程池? 文章中提及过,CPU密集型任务线程池大小配置为CPU核心数就可以了,但是IO密集型,线程池的大小由**CPU数量 * CPU利用率 * (1 + 线程等待时间/线程CPU时间)**确定。而CompletableFuture的应用场景就是IO密集型任务,因此默认的ForkJoinPool一般无法达到最佳性能,我们需自己根据业务创建线程池
最后附: 示例代码 ,欢迎 Fork 与 Star