CompletableFuture是高级的多线程功能,支持自定义线程池和系统默认的线程池,是多线程,高并发里面,经常需要用到的比直接创建线程,要简单易用的方法。
CompletableFuture主要是用于异步调用,内部封装了线程池,可以将请求或者处理过程,进行异步处理。创建线程有3种方式,直接继承Thread、实现Runnable接口、实现Callable接口。以生活中的一个例子来说明异步行为:电饭煲蒸饭。
以前呀,都是大锅饭,放上米,放上水,然后需要不断地加柴火,人要看着火,具体什么时候煮熟,也得偶尔打开看看,看看开没开锅,煮没煮熟。这种就是没有任何通知方式,没有返回值的Runnable,只管煮饭,煮没煮熟需要自己判断。
一个老板发现了这个商机,说能不能做一个东西,不用人一直看着,自动就能把米饭做好,所以电饭煲就出现了。 初代电饭煲的出现,算是解放了人力,再也不用看着火了,方便了很多,自己可以去做点其他的事情,热个牛奶,剪个鸡蛋什么的,但是至于饭什么时候熟,还得自己隔一段时间就得过去看一看。这就是Future的方式,虽然任务是异步执行的,但是要想获得这个结果,还得需要自己取。
时间继续推进,这个老板又有了新的想法,每隔一段时间,看看饭熟没熟还是有点浪费我看电视的时间,这个电饭煲能不能做好了,告诉我呢,这样我就直接来吃就行了。因此就有了这种可以预约、可以定时、可以保温的高级电饭煲。这个就对应着CompletableFuture,所有事情都是可以自动完成,即可以在完成之后,回调通知,也可以自己去等待。
示例代码: public static void main(String[] args){ CompletableFuture future = CompletableFuture.supplyAsync(() -> { System.out.println("电饭煲开始做饭"); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } return "白米饭"; }).thenAccept(result -> { System.out.println("开始吃米饭"); }); System.out.println("我先去搞点牛奶和鸡蛋"); future.join(); } 结果输出: 电饭煲开始做饭 我先去搞点牛奶和鸡蛋 开始吃米饭 复制代码
这样就可以一边等待米饭煮熟,一边去做其他事情。
CompletableFuture提供了方法大约有50多个,单纯一个个记忆,是很麻烦的,因此将其划分为以下几类:
接续类 CompletableFuture 最重要的特性,没有这个的话,CompletableFuture就没意义了,用于注入回调行为。
上面的方法很多,我们没必要死记硬背,按照如下规律,会方便很多,记忆规则:
记住上面几条,基本上就可以记住大部分的方法,剩下的其他方法,就可以单独记忆了。
创建CompletableFuture,其实就是将我们要煮的米饭,委托给电饭煲;要煮米饭,我们要准备这么几件事情,其一我们要制定制作米饭的方式,其二,我们要指定电饭煲。除此之外,我们也可以委托其他的事情,最后可以通过all或者any进行组合。
// 异步任务,无返回值,采用内部的forkjoin线程池 CompletableFuture c1 = CompletableFuture .runAsync(()->{System.out.println("打开开关,开始制作,就不用管了")}); // 异步任务,无返回值,使用自定义的线程池 CompletableFuture c11 = CompletableFuture .runAsync(()->{System.out.println("打开开关,开始制作,就不用管了")},newSingleThreadExecutor()); // 异步任务,有返回值,使用内部默认的线程池 CompletableFuture<String> c2 = CompletableFuture .supplyAsync(()->{System.out.println("清洗米饭");return "干净的米饭";}); // 只要有一个完成,则完成,有一个抛出异常,则携带异常 CompletableFuture.anyOf(c1,c2); // 必须等待所有的future全部完成才可以 CompletableFuture.allOf(c1,c2); 复制代码
常用的是下面的这几种 // 不抛出异常,阻塞的等待 future.join() // 有异常则抛出异常,阻塞的等待,无限等待 future.get() // 有异常则抛出异常,最长等待1个小时,一个小时之后,如果还没有数据,则异常。 future.get(1,TimeUnit.Hours) 复制代码
3种方式: // 完成 future.complete("米饭"); // 异常 future.completeExceptionally(); // 取消,参数并没有实际意义,没任何卵用。 future.cancel(false); 复制代码
接续方式有很多种,可以总结为一下三类:
CompletableFuture future = CompletableFuture.supplyAsync(()->{ System.out.println("投放和清洗制作米饭的材料"); return "干净的没有新冠病毒的大米"; }).thenAcceptAsync(result->{ System.out.println("通电,设定模式,开始煮米饭"); }).thenRunAsync(()->{ System.out.println("米饭做好了,可以吃了"); }) 复制代码
假如蒸米饭和、热牛奶、炒菜等已经是3个不同的CompletableFuture,可以使用接续方式2,将两个或者多个CompletableFuture组合在一起使用。
CompletableFuture rice = CompletableFuture.supplyAsync(()->{ System.out.println("开始制作米饭,并获得煮熟的米饭"); return "煮熟的米饭"; }) //煮米饭的同时呢,我又做了牛奶 CompletableFuture mike = CompletableFuture.supplyAsync(()->{ System.out.println("开始热牛奶,并获得加热的牛奶"); return "加热的牛奶"; }); // 我想两个都好了,才吃早饭,thenCombineAsync有入参,有返回值 mike.thenCombineAsync(rice,(m,r)->{ System.out.println("我收获了早饭:"+m+","+r); return m+r; }) // 有入参,无返回值 mike.thenAcceptBothAsync(rice,(m,r)->{ System.out.println("我收获了早饭:"+m+","+r); }); // 无入参,入参会之 mike.runAfterBothAsync(rice,()->{ System.out.println("我收获了早饭"); }); // 或者直接连接两个CompletableFuture rice.thenComposeAsync(r->CompletableFuture.supplyAsync(()->{ System.out.println("开始煮牛奶"); System.out.println("同时开始煮米饭"); return "mike"; })) 复制代码
如果我们只想做结果处理,也没有其他的接续动作,并且我们想要判断异常的情况,那么可以用接续方式3
whenCompleteAsync:处理完成或异常,无返回值 handleAsync:处理完成或异常,有返回值 CompletableFuture.supplyAsync(()->{ System.out.println("开始蒸米饭"); return "煮熟的米饭"; }).whenCompleteAsync((rich,exception)->{ if (exception!=null){ System.out.println("电饭煲坏了,米饭没做熟"); }else{ System.out.println("米饭熟了,可以吃了"); } }) // 有返回值 CompletableFuture.supplyAsync(()->{ System.out.println("开始蒸米饭"); return "煮熟的米饭"; }).handleAsync((rich,exception)->{ if (exception!=null){ System.out.println("电饭煲坏了,米饭没做熟"); }else{ System.out.println("米饭熟了,可以吃了"); } return "准备冷一冷再吃米饭"; }) // 异常处理 CompletableFuture.supplyAsync(()->{ System.out.println("开始蒸米饭"); return "煮熟的米饭"; }).handleAsync((rich,exception)->{ if (exception!=null){ System.out.println("电饭煲坏了,米饭没做熟"); }else{ System.out.println("米饭熟了,可以吃了"); } return "准备冷一冷再吃米饭"; }).exceptionally((exception)->{ // 前置动作必须的是一个又返回值的操作,不能是那种返回值的那种 return ""; }); 复制代码
CompletableFuture用了之后,才觉得这个东西,确实好用一些,不经意间就做成了异步处理,而且支持自定义的线程池,如果结合stream,可以轻松地实现多线程并发处理。
List<CompletableFuture<YoutubeVideoEntity>> futures = subVideosList.stream() .map(item -> CompletableFuture.supplyAsync(() -> this.getRetry(item) , ThreadPoolHolder.BG_CRAWLER_POOL) ).collect(Collectors.toList()); List<YoutubeVideoEntity> videoEntities = futures.stream().map(CompletableFuture::join) .filter(item -> item != null && item.getVideoId() != null).collect(Collectors.toList()); 复制代码