在jdk5中,我们通过使用Future和Callable,可以在任务执行完毕后得到任务执行结果。可以使用isDone检测计算是否完成,使用cancle停止执行任务,使用阻塞方法get阻塞住调用线程来获取返回结果,使用阻塞方式获取执行结果,有违异步编程的初衷,而且Future的异常只能自己内部处理。
jdk8中加入了实现类CompletableFuture<T>,用于异步编程。底层做任务使用的是ForkJoin, 顾名思义,是将任务的数据集分为多个子数据集,而每个子集,都可以由独立的子任务来处理,最后将每个子任务的结果汇集起来。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。 从api文档看,它实现了2个接口 CompletionStage<T>, Future<T>,CompletableFuture<T>拥有Future的所有特性。 CompletionStage支持lambda表达式,接口的方法的功能都是在某个阶段得到结果后要做的事情。 CompletableFuture内置lambda表达式,支持异步回调,结果转换等功能,它有以下Future实现不了的功能
合并两个相互独立的异步计算的结果。
等待异步任务的所有任务都完成。
等待异步任务的其中一个任务完成就返回结果。
任务完成后调用回调方法
任务完成的结果可以用于下一个任务。
任务完成时发出通知
提供原生的异常处理api
首先说下获取结果方式 CompletableFuture获取结果的方式有如下4个方法:
1:get 阻塞获取结果,实现Future的get接口,显式抛出异常
2:getNow(T valueIfAbsent) 获取执行结果,如果当前任务未执行完成,则返回valueIfAbsent
3: join 执行完成后返回执行结果,或者抛出unchecked异常
4: T get(long timeout, TimeUnit unit) 在有限时间内获取数据
以下是CompletableFuture的创建对象以及api的使用
public static <U> CompletableFuture<U> completedFuture(U value)
静态方法,返回一个已经计算好的CompletableFuture 比如
@Testpublic void testStatic() { CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("test"); //判断cf是否 执行完毕 assertTrue(completableFuture.isDone()); //getNow获取结果,如果获取不到,返回默认值null assertEquals("test", completableFuture.getNow(null)); }
completableFuture 还能主动结束运算,并显示处理异常,如下是异步执行的代码
@Test public void testActive() { CompletableFuture<String> completableFuture = new CompletableFuture(); new Thread(() -> { try { String string = null; string.length(); Thread.currentThread().sleep(2000); // 通知完成计算 ,并将结果complete返回 completableFuture.complete("complete"); } catch (Exception e) { // 处理异常 在获取结果地方可以捕获到异常 completableFuture.completeExceptionally(e); } }).start(); try { // 同步等待返回结果 如果thread内部未发生异常并执行了complete方法,将得到字符串“complete”的结果 System.out.println(completableFuture.join()); } catch (Exception e) { //捕获线程内部的异常 捕获空指针异常 System.out.println("发生异常了" + e.getMessage()); } }
CompletableFuture主要有以下四个工厂方式创建对象的静态方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { return asyncSupplyStage(asyncPool, supplier); } public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) { return asyncSupplyStage(screenExecutor(executor), supplier); } public static CompletableFuture<Void> runAsync(Runnable runnable) { return asyncRunStage(asyncPool, runnable); } public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) { return asyncRunStage(screenExecutor(executor), runnable); }
Supplier是java8函数式编程的一个接口,是一个生产者,可以不接收参数。只有一个get方法返回一个泛型实例。 很明显,Async结尾的都是可以异步执行,runAsync 接收一个Runnable函数式接口类型参数,不返回结算结果。supplyAsync接收一个函数式接口类型Supplier ,可以返回计算结果。以上方法如果不指定执行任务的线程池Executor ,则默认使用ForkJoinPool.commonPoolcommonPool执行任务。这些接口都支持lambda实现异步的操作。 以下是SupplyAsync异步执行的简单示例
@Testpublic void testSupplyAsync() { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { //执行耗时任务 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "glz"; }); //获取结果 System.out.println(cf.join()); }
上面的方法,执行任务是异步操作。但是调用线程还在等待结果。我们还可以给cf添加回调方法,在任务执行完成后使用cf的结果再做下一步操作,转换。所以 执行以下方法时,cf已经计算完毕。
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
从参数类型可以看到,这是接收一个cf计算的结果T,经过处理后返回参数类型为U的cf。 其中第一个方法是在cf完成的线程中调用。而带Async将在与调用者cf不同的线程中异步调用。
@Test public void testThenApply() { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { // 执行耗时任务 try { Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return "123"; }); // 这里cf的计算结果传个thenApply作为参数,执行字符串转int的方法,并返回一个cf对象cf1 CompletableFuture<Integer> cf1 = cf.thenApply(Integer::parseInt); // cf1的计算结果作为参数x传给thenApply,返回一个心得cf对象 cf2. CompletableFuture<Double> cf2 = cf1.thenApply(x -> x * 0.01); // 获取最终结果 System.out.println(cf2.join()); //如果回调函数比较耗时,可以使用异步的方法thenApplyAsync}
public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
入参是Consumer ,执行Consumer 后没有返回结果,所以称为消耗。
@Test public void thenAccept(){ CompletableFuture.supplyAsync(() -> "gong").thenAccept(x -> System.out.println(s+" lz")); }
结果是 gong lz
在执行cf后,如果得到的结果对下一步没有影响,也就是说下一步的操作并不关心上一步的结果,最终也不返回值,可以使用thenRun 参数传递一个Runnable.
public CompletionStage<Void> thenRun(Runnable action);public CompletionStage<Void> thenRunAsync(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor); @Test public void thenRun() { CompletableFuture.supplyAsync(() -> "hello").thenRun(() -> System.out.println("hello world")); }
public <U> CompletionStage<U> thenCompose (Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn); public <U> CompletionStage<U> thenComposeAsync (Function<? super T, ? extends CompletionStage<U>> fn, Executor executor);
thenCompose方法,可以将2个独立的任务进行流水线操作 。将当前cf的计算结果作为参数传递给后面的cf
@Test public void testCompose() { CompletableFuture<String> cf = CompletableFuture.completedFuture("hello") .thenCompose(result -> CompletableFuture.supplyAsync(() -> { System.out.println(result); return "result"; })); System.out.println(cf.join()); }
可以将2个完全不相干的对象的结果整合起来,2项任务可以同时执行,比如一个对外的接口服务,既查询数据库中要查询数据的总量,也要返回具体某一页的数据,可以一个cf负责执行查询总条数count的sql,一个查询一页数据。BiFunction是合并结果数据的函数
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
其中T是调用thenCombine的cf的结果数据,U是other的结果,v就是合并的结果类型。
@Test public void testCombine() { CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); System.out.println("cf1 is doning"); } catch (InterruptedException e) { e.printStackTrace(); } // 返回结果 return "hello"; }); CompletableFuture<String> result = cf1.thenCombine(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(500); System.out.println("cf2 is doning"); } catch (InterruptedException e) { e.printStackTrace(); }
ystem.out.println(result.join());
return "world"; }), (x, y) -> x + y);//合并2个操作结果
S
}
<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block) CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)
对于2个cf,我们只想在他们执行完成时,消耗执行结果,但是不做数据返回,,我们只是希望当完成时得到通知. 此方法与thenCombine相似,只不过返回 CompletableFuture<Void> ,只做消耗处理
@Test public void testAcceptBoth(){ CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100"); CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100); CompletableFuture<Void> future = future1.thenAcceptBoth(future2, (s, i) -> System.out.println(Double.parseDouble(s + i))); try { future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
针对两个CompletionStage,将计算最快的那个CompletionStage的结果用来作为下一步的消耗。 此方法接受Consumer只对结果进行消耗.
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
@Test public void acceptEither() { CompletableFuture.supplyAsync(() -> { try { // 如果不加sleep,可能打印hello Thread.currentThread().sleep(10L); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }).acceptEither(CompletableFuture.supplyAsync(() -> "world"), result -> { System.out.println(result); }); }
针对两个CompletionStage,将计算的快的那个CompletionStage的结果用来作为下一步的转换操作。
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
fn是对 调用applyToEither的调用者和 other 2个计算最快的那个结果进行处理,传入t类型数据,返回一个CompletionStage
@Test public void applyToEither() { double result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "0.001"; }).applyToEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "0.002"; }), s -> Double.valueOf(s)).join(); System.out.println(result); } //由于返回0.002的cf睡眠时间比较短,先执行完毕,优先返回结果,所以2个cf最先返回0.002.最终result就是0.002
2个cf都执行完后,执行操作Runnable,Runnable不关心2个cf的执行结果
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor); @Test public void runAfterBoth() { CompletableFuture.supplyAsync(() -> "m").runAfterBothAsync(CompletableFuture.supplyAsync(() -> "n"), () -> System.out.println("hello world")); }
以上介绍的都是2个future的组合使用。cf还提供allOf,参数是cf数组,当数组中所有的cf都执行完成时,返回一个CompletableFuture<Void>。调用返回的cf的join方法阻塞等待cf数组中所有cf执行完成。 anyOf是当cf数组中任意一个cf执行完成后,就返回一个cf。
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs); public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
读者可自行编写示例代码
在上面手工创建cf对象中,介绍过异常的处理,同样使用工厂创建的cf也具有异常管理机制,读者可自行举一反三。
本文简单介绍了cf的使用方法,读者可参阅java8实战这本书,更深入学习CompletableFuture的应用场景。
参考: java8实战