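Recently at work I needed asynchronous programming to implement concurrent asynchronous queries across multiple datasets. I had tried CompletableFuture before, but never dug into how it works or wrote up what I learned, so with some free time today I decided to study it properly.
This article is organized as: pain points -> solution -> use cases -> flow example -> summary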
Before Java 8, asynchronous programming looked roughly like this:

    class App {
        ExecutorService executor = ...
        ArchiveSearcher searcher = ...

        void showSearch(final String target) throws InterruptedException {
            Future<String> future = executor.submit(new Callable<String>() {
                public String call() {
                    return searcher.search(target);
                }
            });
            displayOtherThings(); // do other things while searching
            try {
                displayText(future.get()); // use future
            } catch (ExecutionException ex) {
                cleanup();
                return;
            }
        }
    }
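The future.get() method blocks until the computation completes and then returns the result.
As a result, if we need to run a follow-up action once the computation completes, there are only two options: block the current thread on get(), or poll isDone() until it returns true. Polling in particular burns a great deal of computing resources. To solve this pain point, Java 8 added the CompletableFuture class. As the name suggests, it is a Future that can be "completed".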
CompletableFuture<T> implements both Future<T> and CompletionStage<T>, so we can keep using the Future methods while also reusing everything that CompletionStage offers.
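As a small illustration of this dual role, the sketch below registers a follow-up through the CompletionStage API, completes the future by hand, and then reads it back through the plain Future interface. The class and method names are made up for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

class DualRoleDemo {
    static String demo() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        // CompletionStage side: attach a follow-up instead of blocking or polling
        CompletableFuture<String> upper = cf.thenApply(String::toUpperCase);
        // "Completable": the result can be supplied explicitly
        cf.complete("done");
        // Future side: the familiar accessors are still available
        Future<String> asFuture = upper;
        return asFuture.isDone() + ":" + upper.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true:DONE
    }
}
```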
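The sections below start from common use cases and look at the best-fitting solution for each one. Some scenarios have several viable options; in those cases, weigh extensibility, maintainability, performance and readability before choosing.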
To run a task asynchronously when no result is needed, pass a Runnable object to the runAsync method:

    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }

    static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        e.execute(new AsyncRun(d, f));
        return d;
    }

The runAsync method returns a CompletableFuture<Void> object; this CompletableFuture carries no result of any type.
To run a task asynchronously and obtain its result, pass a Supplier<U> object to the supplyAsync method:

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }

The supplyAsync method returns a CompletableFuture<U> object, which carries a computed result of the type given by the type parameter U.
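A minimal usage sketch of the two factory methods follows. The class name, the flag and the computed value are illustrative only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class AsyncStartDemo {
    static final AtomicBoolean ran = new AtomicBoolean(false);

    // No result: the returned future only signals completion
    static CompletableFuture<Void> fireAndForget() {
        return CompletableFuture.runAsync(() -> ran.set(true));
    }

    // With result: the future carries the value produced by the Supplier
    static CompletableFuture<Integer> compute() {
        return CompletableFuture.supplyAsync(() -> 6 * 7);
    }

    public static void main(String[] args) {
        Void nothing = fireAndForget().join(); // a Void future always yields null
        int answer = compute().join();
        System.out.println(ran.get() + " " + nothing + " " + answer); // true null 42
    }
}
```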
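To run a follow-up action after the preceding stage completes, without receiving its result and without returning one, pass a Runnable object to the thenRun method:

    public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }

    private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.uniRun(this, f, null)) {
            UniRun<T> c = new UniRun<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

The thenRun method returns a CompletableFuture that carries no result.
The difference between thenRun and thenRunAsync is where the action runs. thenRun runs it on the thread that completes the preceding CompletableFuture, or on the calling thread if that stage has already completed. thenRunAsync always hands the action to an executor, by default asyncPool (normally ForkJoinPool.commonPool()), so it runs on a pool thread, which is not necessarily a newly created one.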
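The threading difference can be observed directly. The sketch below is an illustration with made-up names: it completes the source future from a thread named "completer" and records which thread ran each follow-up. It assumes both follow-ups are registered before the source completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ThenRunThreadDemo {
    /** Returns { thread that ran thenRun, thread that ran thenRunAsync }. */
    static String[] observe() {
        CompletableFuture<String> source = new CompletableFuture<>();
        AtomicReference<String> syncThread = new AtomicReference<>();
        AtomicReference<String> asyncThread = new AtomicReference<>();

        // Both actions are registered while the source is still incomplete
        CompletableFuture<Void> sync = source.thenRun(
                () -> syncThread.set(Thread.currentThread().getName()));
        CompletableFuture<Void> async = source.thenRunAsync(
                () -> asyncThread.set(Thread.currentThread().getName()));

        // Complete the source from a dedicated, named thread
        Thread completer = new Thread(() -> source.complete("x"), "completer");
        completer.start();
        CompletableFuture.allOf(sync, async).join();
        return new String[] { syncThread.get(), asyncThread.get() };
    }

    public static void main(String[] args) {
        String[] t = observe();
        System.out.println("thenRun      ran on " + t[0]); // the completing thread
        System.out.println("thenRunAsync ran on " + t[1]); // an executor thread
    }
}
```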
To consume the result of the preceding stage without returning a result, pass a Consumer<? super T> object to the thenAccept method:

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }

    private CompletableFuture<Void> uniAcceptStage(Executor e,
                                                   Consumer<? super T> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.uniAccept(this, f, null)) {
            UniAccept<T> c = new UniAccept<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

The thenAccept method returns a CompletableFuture that carries no result.
thenAccept and thenAcceptAsync differ in the same way as thenRun and thenRunAsync.
To consume the result of the preceding stage and return a new result, pass a Function<? super T,? extends U> object to the thenApply method:

    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }

    private <V> CompletableFuture<V> uniApplyStage(
        Executor e, Function<? super T,? extends V> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.uniApply(this, f, null)) {
            UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

The thenApply method returns a CompletableFuture that carries a result of type U.
thenApply and thenApplyAsync differ in the same way as thenRun and thenRunAsync.
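To see how thenApply and thenAccept fit together, here is a small sketch of a chain that transforms a value twice and then consumes it. The class name and the sink list are illustrative.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class TransformConsumeDemo {
    static List<String> run(String input) {
        List<String> sink = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> done = CompletableFuture
                .supplyAsync(() -> input)          // CompletableFuture<String>
                .thenApply(String::length)         // CompletableFuture<Integer>
                .thenApply(n -> "length=" + n)     // CompletableFuture<String>
                .thenAccept(sink::add);            // CompletableFuture<Void>
        done.join();
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(run("hello")); // [length=5]
    }
}
```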
If the follow-up step itself returns a CompletableFuture and the next stage should work on the result of that CompletableFuture, pass a Function<? super T, ? extends CompletionStage<U>> object to the thenCompose method:

    public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(null, fn);
    }

    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(asyncPool, fn);
    }

    private <V> CompletableFuture<V> uniComposeStage(
        Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
        ...
        Object r; Throwable x;
        if (e == null && (r = result) != null) {
            // try to return function result directly
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    return new CompletableFuture<V>(encodeThrowable(x, r));
                }
                r = null;
            }
            try {
                @SuppressWarnings("unchecked") T t = (T) r;
                CompletableFuture<V> g = f.apply(t).toCompletableFuture();
                Object s = g.result;
                if (s != null)
                    return new CompletableFuture<V>(encodeRelay(s));
                CompletableFuture<V> d = new CompletableFuture<V>();
                UniRelay<V> copy = new UniRelay<V>(d, g);
                g.push(copy);
                copy.tryFire(SYNC);
                return d;
            } catch (Throwable ex) {
                return new CompletableFuture<V>(encodeThrowable(ex));
            }
        }
        CompletableFuture<V> d = new CompletableFuture<V>();
        UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
        return d;
    }

The thenCompose method returns a CompletableFuture<U> that carries the result of the CompletableFuture produced by the function. The result is flattened, not nested as CompletableFuture<CompletableFuture<U>>.
thenCompose and thenComposeAsync differ in the same way as thenRun and thenRunAsync.
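A short sketch of chaining two dependent asynchronous calls with thenCompose. The two lookup methods are hypothetical stand-ins invented for this example, not part of any real API.

```java
import java.util.concurrent.CompletableFuture;

class ComposeDemo {
    // Hypothetical async lookup: derives a fake id from the name
    static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    // Hypothetical async lookup that needs the id from the first call
    static CompletableFuture<String> findEmail(int userId) {
        return CompletableFuture.supplyAsync(() -> "user" + userId + "@example.com");
    }

    static CompletableFuture<String> emailOf(String name) {
        // thenApply here would yield CompletableFuture<CompletableFuture<String>>;
        // thenCompose flattens it to CompletableFuture<String>
        return findUserId(name).thenCompose(ComposeDemo::findEmail);
    }

    public static void main(String[] args) {
        System.out.println(emailOf("alice").join()); // user5@example.com
    }
}
```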
If the action should fire only after both the current CompletableFuture and another given CompletableFuture have completed, without receiving their results T and U and without returning a result, use a Runnable and call the runAfterBoth method:

    public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action) {
        return biRunStage(null, other, action);
    }

    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                     Runnable action) {
        return biRunStage(asyncPool, other, action);
    }

    private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o,
                                               Runnable f) {
        CompletableFuture<?> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.biRun(this, b, f, null)) {
            BiRun<T,?> c = new BiRun<>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

The runAfterBoth method returns a CompletableFuture that carries no result.
runAfterBoth and runAfterBothAsync differ in the same way as thenRun and thenRunAsync.
If the action should fire only after both the current CompletableFuture and another given CompletableFuture have completed, and should receive their results T and U but return nothing, use a BiConsumer<? super T, ? super U> and call the thenAcceptBoth method:

    public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(null, other, action);
    }

    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(asyncPool, other, action);
    }

    private <U> CompletableFuture<Void> biAcceptStage(
        Executor e, CompletionStage<U> o,
        BiConsumer<? super T,? super U> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.biAccept(this, b, f, null)) {
            BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

The thenAcceptBoth method returns a CompletableFuture that carries no result.
thenAcceptBoth and thenAcceptBothAsync differ in the same way as thenRun and thenRunAsync.
If the action should fire only after both the current CompletableFuture and another given CompletableFuture have completed, should receive their results T and U, and should return a V as its own result, use a BiFunction<? super T,? super U,? extends V> and call the thenCombine method:

    public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }

    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }

    private <U,V> CompletableFuture<V> biApplyStage(
        Executor e, CompletionStage<U> o,
        BiFunction<? super T,? super U,? extends V> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.biApply(this, b, f, null)) {
            BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

The thenCombine method returns a CompletableFuture that carries a result of type V.
thenCombine and thenCombineAsync differ in the same way as thenRun and thenRunAsync.
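The three "both" variants differ only in what the action receives and returns. The sketch below places them side by side; the price and currency futures are invented sample data.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class BothDemo {
    static CompletableFuture<Integer> price() {
        return CompletableFuture.supplyAsync(() -> 100);
    }

    static CompletableFuture<String> currency() {
        return CompletableFuture.supplyAsync(() -> "CNY");
    }

    // thenCombine: receives both results and returns a new one
    static String combined() {
        return price().thenCombine(currency(), (p, c) -> p + " " + c).join();
    }

    // thenAcceptBoth: receives both results, returns nothing
    static String accepted() {
        AtomicReference<String> seen = new AtomicReference<>();
        price().thenAcceptBoth(currency(), (p, c) -> seen.set(c + p)).join();
        return seen.get();
    }

    // runAfterBoth: receives nothing, returns nothing
    static boolean ranAfterBoth() {
        AtomicBoolean ran = new AtomicBoolean(false);
        price().runAfterBoth(currency(), () -> ran.set(true)).join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(combined());     // 100 CNY
        System.out.println(accepted());     // CNY100
        System.out.println(ranAfterBoth()); // true
    }
}
```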
If the action should fire as soon as either the current CompletableFuture or another given CompletableFuture completes, without receiving a result and without returning one, use a Runnable and call the runAfterEither method:

    public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                                  Runnable action) {
        return orRunStage(null, other, action);
    }

    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action) {
        return orRunStage(asyncPool, other, action);
    }

    private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o,
                                               Runnable f) {
        CompletableFuture<?> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.orRun(this, b, f, null)) {
            OrRun<T,?> c = new OrRun<>(e, d, this, b, f);
            orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

The runAfterEither method returns a CompletableFuture that carries no result.
runAfterEither and runAfterEitherAsync differ in the same way as thenRun and thenRunAsync.
If the action should fire as soon as either the current CompletableFuture or another given CompletableFuture completes, and should receive the result of whichever finished first (both stages must produce a value compatible with T) but return nothing, use a Consumer<? super T> and call the acceptEither method:

    public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(null, other, action);
    }

    public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(asyncPool, other, action);
    }

    private <U extends T> CompletableFuture<Void> orAcceptStage(
        Executor e, CompletionStage<U> o, Consumer<? super T> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.orAccept(this, b, f, null)) {
            OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f);
            orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

The acceptEither method returns a CompletableFuture that carries no result.
acceptEither and acceptEitherAsync differ in the same way as thenRun and thenRunAsync.
If the action should fire as soon as either the current CompletableFuture or another given CompletableFuture completes, should receive the result of whichever finished first (both stages must produce a value compatible with T), and should return a U as its own result, use a Function<? super T, U> and call the applyToEither method:

    public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }

    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(asyncPool, other, fn);
    }

    private <U extends T,V> CompletableFuture<V> orApplyStage(
        Executor e, CompletionStage<U> o,
        Function<? super T, ? extends V> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.orApply(this, b, f, null)) {
            OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
            orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

The applyToEither method returns a CompletableFuture that carries a result of type U (named V in the private orApplyStage helper).
applyToEither and applyToEitherAsync differ in the same way as thenRun and thenRunAsync.
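A minimal sketch of the "either" behaviour: only one of the two sources ever completes, so the function is applied to that one. The names primary and replica are hypothetical, and primary is deliberately left incomplete to keep the outcome deterministic.

```java
import java.util.concurrent.CompletableFuture;

class EitherDemo {
    static String fastest() {
        // Never completed in this sketch, standing in for a slow source
        CompletableFuture<String> primary = new CompletableFuture<>();
        CompletableFuture<String> replica = CompletableFuture.supplyAsync(() -> "replica");
        // The function receives a single value: the result of whichever finished first
        return primary.applyToEither(replica, source -> "answered by " + source).join();
    }

    public static void main(String[] args) {
        System.out.println(fastest()); // answered by replica
    }
}
```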
If the action should fire only after several CompletableFuture tasks have all completed, pass every CompletableFuture to wait for to the allOf method:

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }

    /** Recursively constructs a tree of completions. */
    static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
                                           int lo, int hi) {
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  andTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  andTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        if (!d.biRelay(a, b)) {
            BiRelay<?,?> c = new BiRelay<>(d, a, b);
            a.bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

The allOf method returns a CompletableFuture<Void> that carries no result, which makes it convenient to attach follow-up actions.
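Because the future returned by allOf carries no value, the individual results have to be read from the original futures once it completes. One common way to do that is sketched below, with illustrative job names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class AllOfDemo {
    static List<String> queryAll(int n) {
        List<CompletableFuture<String>> futures = IntStream.rangeClosed(1, n)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> "Job" + i))
                .collect(Collectors.toList());
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // Every future is complete here, so join() returns immediately
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(queryAll(3)); // [Job1, Job2, Job3]
    }
}
```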
If the action should fire as soon as any one of several CompletableFuture tasks completes, pass every CompletableFuture to wait for to the anyOf method:

    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        return orTree(cfs, 0, cfs.length - 1);
    }

    /** Recursively constructs a tree of completions. */
    static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
                                            int lo, int hi) {
        CompletableFuture<Object> d = new CompletableFuture<Object>();
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  orTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  orTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        if (!d.orRelay(a, b)) {
            OrRelay<?,?> c = new OrRelay<>(d, a, b);
            a.orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

The anyOf method returns a CompletableFuture<Object> that completes with the result of whichever CompletableFuture finishes first, which makes it convenient to attach follow-up actions.
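Since the inputs may have different result types, the value arrives as an Object and usually needs a type check or cast. A small sketch, with one input left incomplete on purpose so the winner is deterministic:

```java
import java.util.concurrent.CompletableFuture;

class AnyOfDemo {
    static String firstOf() {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed here
        CompletableFuture<Integer> fast = CompletableFuture.completedFuture(42);
        // The static type is Object; the runtime type is that of the winner
        Object winner = CompletableFuture.anyOf(slow, fast).join();
        return winner.getClass().getSimpleName() + ":" + winner;
    }

    public static void main(String[] args) {
        System.out.println(firstOf()); // Integer:42
    }
}
```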
To run an action once the preceding stage finishes, whether it succeeded or failed, and to receive both its result and its exception, pass a BiConsumer<? super T, ? super Throwable> object to the whenComplete method:

    public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }

    private CompletableFuture<T> uniWhenCompleteStage(
        Executor e, BiConsumer<? super T, ? super Throwable> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<T> d = new CompletableFuture<T>();
        if (e != null || !d.uniWhenComplete(this, f, null)) {
            UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

The whenComplete method returns a CompletableFuture<T> that carries the same result or exception as the preceding stage; the action only observes the outcome and cannot replace it.
whenComplete and whenCompleteAsync differ in the same way as thenRun and thenRunAsync.
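The pass-through behaviour can be checked with a short sketch (names are illustrative): the value survives whenComplete unchanged, and a failure still surfaces at join() as a CompletionException.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;

class WhenCompleteDemo {
    // The callback only observes; the original value is returned unchanged
    static String passThrough(List<String> log) {
        return CompletableFuture.supplyAsync(() -> "payload")
                .whenComplete((value, error) -> log.add("value=" + value))
                .join();
    }

    // whenComplete does not swallow the exception
    static boolean failureStillPropagates() {
        CompletableFuture<String> failed = CompletableFuture
                .<String>supplyAsync(() -> { throw new IllegalStateException("boom"); })
                .whenComplete((value, error) -> { /* observe only */ });
        try {
            failed.join();
            return false;
        } catch (CompletionException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> log = new CopyOnWriteArrayList<>();
        System.out.println(passThrough(log) + " " + log); // payload [value=payload]
        System.out.println(failureStillPropagates());     // true
    }
}
```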
To handle both the result and the exception of the preceding stage and return a new result, pass a BiFunction<? super T, Throwable, ? extends U> object to the handle method:

    public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(null, fn);
    }

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(asyncPool, fn);
    }

    private <V> CompletableFuture<V> uniHandleStage(
        Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.uniHandle(this, f, null)) {
            UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

The handle method returns a CompletableFuture that carries a result of type U.
handle and handleAsync differ in the same way as thenRun and thenRunAsync.
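Unlike whenComplete, handle can turn a failure into a normal value. A minimal sketch, with an illustrative method name:

```java
import java.util.concurrent.CompletableFuture;

class HandleDemo {
    static String safeParse(String text) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(text))
                // Exactly one of value / error is meaningful on each call
                .handle((value, error) -> error == null ? "ok:" + value : "failed")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(safeParse("12")); // ok:12
        System.out.println(safeParse("x"));  // failed
    }
}
```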
To recover only when the preceding stage failed, by supplying a replacement value, pass a Function<Throwable, ? extends T> object to the exceptionally method:

    public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) {
        return uniExceptionallyStage(fn);
    }

    private CompletableFuture<T> uniExceptionallyStage(
        Function<Throwable, ? extends T> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<T> d = new CompletableFuture<T>();
        if (!d.uniExceptionally(this, f, null)) {
            UniExceptionally<T> c = new UniExceptionally<T>(d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

The exceptionally method returns a CompletableFuture that carries a result of type T.
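A sketch of exceptionally used as a fallback: the function is skipped on success and supplies a default on failure. The method name and the fallback value are illustrative.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallyDemo {
    static int parseOrDefault(String text, int fallback) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(text))
                .exceptionally(error -> fallback) // invoked only if parsing threw
                .join();
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("7", -1));    // 7
        System.out.println(parseOrDefault("oops", -1)); // -1
    }
}
```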
    public static void main(String[] args) {
        System.out.println(testCF());
    }

    // DaportalException and StringUtil.MAPPER belong to the author's project;
    // MAPPER is presumably a Jackson ObjectMapper.
    private static String testCF() {
        // ConcurrentHashMap, because several pool threads write results concurrently
        Map<String, Object> results = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> completableFutures = new ArrayList<>();
        System.out.println(Thread.currentThread() + "testCF start ...... ");
        for (int i = 1; i <= 5; i++) {
            int finalI = i;
            CompletableFuture<Void> completableFuture = CompletableFuture
                .supplyAsync(
                    () -> {
                        System.out.println(Thread.currentThread() + "supplyAsync Job" + finalI);
                        try {
                            Thread.sleep(finalI * 2000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        // if (true) {
                        //     throw new RuntimeException("test exception " + finalI);
                        // }
                        return "Job" + finalI;
                    })
                .thenAcceptAsync(
                    (queryResult) -> {
                        System.out.println(Thread.currentThread() + "thenAcceptAsync " + queryResult);
                        results.put(String.valueOf(finalI), queryResult);
                    })
                .whenCompleteAsync(
                    (aVoid, throwable) -> {
                        if (Objects.nonNull(throwable)) {
                            System.err.println(Thread.currentThread() + "whenCompleteAsync Job" + throwable.getMessage());
                        }
                        System.out.println(Thread.currentThread() + "whenCompleteAsync Job" + finalI);
                    })
                .exceptionally(
                    throwable -> {
                        System.err.println("exceptionally " + throwable.getMessage());
                        throw new DaportalException(throwable.getMessage());
                    });
            completableFutures.add(completableFuture);
        }
        System.out.println(Thread.currentThread() + "for loop completed ...... ");
        return CompletableFuture
            .allOf(completableFutures.toArray(new CompletableFuture[0]))
            .handle(
                (aVoid, throwable) -> {
                    System.out.println(Thread.currentThread() + "handle ...... ");
                    if (Objects.nonNull(throwable)) {
                        System.err.println("handle " + throwable.getMessage());
                        throw new DaportalException(throwable.getMessage());
                    }
                    try {
                        return StringUtil.MAPPER.writeValueAsString(results);
                    } catch (JsonProcessingException e) {
                        throw new DaportalException(e.getMessage());
                    }
                })
            .join();
    }
Output:

    Thread[main,5,main]testCF start ...... 
    Thread[ForkJoinPool.commonPool-worker-1,5,main]supplyAsync Job1
    Thread[ForkJoinPool.commonPool-worker-2,5,main]supplyAsync Job2
    Thread[ForkJoinPool.commonPool-worker-3,5,main]supplyAsync Job3
    Thread[main,5,main]for loop completed ...... 
    Thread[ForkJoinPool.commonPool-worker-1,5,main]thenAcceptAsync Job1
    Thread[ForkJoinPool.commonPool-worker-1,5,main]supplyAsync Job4
    Thread[ForkJoinPool.commonPool-worker-2,5,main]whenCompleteAsync Job1
    Thread[ForkJoinPool.commonPool-worker-2,5,main]supplyAsync Job5
    Thread[ForkJoinPool.commonPool-worker-3,5,main]thenAcceptAsync Job2
    Thread[ForkJoinPool.commonPool-worker-3,5,main]thenAcceptAsync Job3
    Thread[ForkJoinPool.commonPool-worker-3,5,main]whenCompleteAsync Job2
    Thread[ForkJoinPool.commonPool-worker-3,5,main]whenCompleteAsync Job3
    Thread[ForkJoinPool.commonPool-worker-1,5,main]thenAcceptAsync Job4
    Thread[ForkJoinPool.commonPool-worker-1,5,main]whenCompleteAsync Job4
    Thread[ForkJoinPool.commonPool-worker-2,5,main]thenAcceptAsync Job5
    Thread[ForkJoinPool.commonPool-worker-1,5,main]whenCompleteAsync Job5
    Thread[ForkJoinPool.commonPool-worker-1,5,main]handle ...... 
    {"1":"Job1","2":"Job2","3":"Job3","4":"Job4","5":"Job5"}

    Process finished with exit code 0
(In the original listing, more blank lines between entries indicated a longer time gap.)
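The following shows what each asynchronous thread was doing over the whole flow:
[Figure: per-thread timeline of the asynchronous flow]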
All of the CompletableFuture methods above return a CompletableFuture, which is what makes method chaining possible.
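Every method with the Async suffix always pushes its Completion onto the stack and waits to be scheduled (the e != null branch in the *Stage methods above). The push itself uses an optimistic CAS instead of a lock, which helps concurrent performance:

    /** Returns true if successfully pushed c onto stack. */
    final boolean tryPushStack(Completion c) {
        Completion h = stack;
        lazySetNext(c, h);
        return UNSAFE.compareAndSwapObject(this, STACK, h, c); // optimistic CAS
    }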
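Every method with the Async suffix also has an overload that takes an additional Executor parameter for supplying an external thread pool. Without it, the work runs on asyncPool, normally ForkJoinPool.commonPool(), whose parallelism defaults to the number of available processors minus one. On the machine used for the flow example it was 3, which is why only worker-1 to worker-3 appear in that output:

    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    ......
    static Executor screenExecutor(Executor e) {
        if (!useCommonPool && e == ForkJoinPool.commonPool())
            return asyncPool;
        if (e == null) throw new NullPointerException();
        return e;
    }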
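For blocking work such as database queries, supplying a dedicated pool is often preferable to sharing the common pool. The sketch below passes a custom executor to supplyAsync; the pool size and the thread name "query-worker" are arbitrary choices for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

class CustomExecutorDemo {
    static String threadUsedWithCustomPool() {
        // Every thread of this pool is named "query-worker"
        ExecutorService pool = Executors.newFixedThreadPool(
                2, task -> new Thread(task, "query-worker"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadUsedWithCustomPool()); // query-worker
        // Machine dependent: the parallelism of the shared common pool
        System.out.println(ForkJoinPool.getCommonPoolParallelism());
    }
}
```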