转载

Java异步编程-CompletableFuture

引言

最近工作中需要用到异步编程实现一个多数据集并发异步查询的功能,以前也尝试使用过 CompletableFuture ,但没有深入原理探究,也没有归纳总结,今天正好有时间,集中学习下。

本文结构为:痛点->解决方案->应用场景->流程示例-->归纳总结

异步编程

在Java8以前,异步编程的实现大概是下边这样子的:

class App {

        ExecutorService executor = ...
        ArchiveSearcher searcher = ...

        void showSearch(final String target) throws InterruptedException {
            Future<String> future = executor.submit(new Callable<String>() {
                public String call() {
                    return searcher.search(target);
                }
            });
            displayOtherThings(); // do other things while searching
            try {
                displayText(future.get()); // use future
            } catch (ExecutionException ex) {
                cleanup();
                return;
            }
        }
    }

future.get()方法会阻塞直到计算完成,然后返回结果。

这样一来,如果我们需要在计算完成后执行后续操作,则只有两个选择:

  1. 阻塞,等待执行完成。但这样一来异步就变成同步。
  2. 轮询,直到 isDone() 返回 true 。但这样会极大消耗计算资源。

为了解决这个痛点,Java8新增了 CompletableFuture 类,顾名思义这是一个"可完成"的 Future

CompletableFuture

CompletableFuture<T> 实现了 Future<T> , CompletionStage<T> ,这样保证了我们可以继续使用 Future 的方法,同时复用了 CompletionStage 的各种功能。

下面就从一些常见的应用场景入手,逐个分析最适用的解决方案。当然有时对于部分场景,会有多个可选方案,这时候就需要从扩展性、可维护性、性能以及易读性等方面综合考虑,做出选择。

创建CompletableFuture

  • 如果不希望取得计算结果,则可以使用 Runnable 对象,并调用 runAsync 方法:
public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
    
    static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        e.execute(new AsyncRun(d, f));
        return d;
    }

runAsync 方法将返回一个 CompletableFuture<Void> 对象, CompletableFuture 不包含任何类型的返回结果。

  • 如果希望取得计算结果,则可以使用 Supplier<U> 对象,并调用 supplyAsync 方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    
    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                     Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }

supplyAsync 方法将返回一个 CompletableFuture<U> 对象,该 CompletableFuture 会包含一个由泛型 U 指定类型的计算结果。

单CompletableFuture后置处理

  • 如果不希望接收前置CompletableFuture结果,且不返回处理结果,那么可以使用 Runnable 对象,并调用 thenRun 方法:
public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }
    
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }
    
    private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.uniRun(this, f, null)) {
            UniRun<T> c = new UniRun<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenRun 方法将会返回一个不包含返回值的 CompletableFuture

thenRun 方法和 thenRunAsync 方法的区别是, thenRun 会在前置 CompletableFuture 的线程内执行,而 thenRunAsync 会在一个新线程中执行。

  • 如果希望接收前置CompletableFuture结果,但不返回处理结果,则可以使用 Consumer<? super T> 对象,并调用 thenAccept 方法:
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }
    private CompletableFuture<Void> uniAcceptStage(Executor e,
                                                   Consumer<? super T> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.uniAccept(this, f, null)) {
            UniAccept<T> c = new UniAccept<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenAccept 方法将会返回一个不包含返回值的 CompletableFuture

thenAccept 方法和 thenAcceptAsync 方法的区别同 thenRun 方法和 thenRunAsync 方法。

  • 如果希望接收前置CompletableFuture结果,并返回处理结果,则只能使用 Function<? super T,? extends U> 对象,并调用 thenApply 方法:
public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }
    
    private <V> CompletableFuture<V> uniApplyStage(
        Executor e, Function<? super T,? extends V> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<V> d =  new CompletableFuture<V>();
        if (e != null || !d.uniApply(this, f, null)) {
            UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenApply 方法将会返回一个包含 U 类型返回值的 CompletableFuture

thenApply 方法和 thenApplyAsync 方法的区别同 thenRun 方法和 thenRunAsync 方法。

  • 如果希望接收前置CompletableFuture结果,并返回包含 CompletableFuture 的处理结果,则可以使用 Function<? super T, ? extends CompletionStage<U>> 对象,并调用 thenCompose 方法:
public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(null, fn);
    }

    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(asyncPool, fn);
    }
    
    private <V> CompletableFuture<V> uniComposeStage(
        Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
        ...
        Object r; Throwable x;
        if (e == null && (r = result) != null) {
            // try to return function result directly
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    return new CompletableFuture<V>(encodeThrowable(x, r));
                }
                r = null;
            }
            try {
                @SuppressWarnings("unchecked") T t = (T) r;
                CompletableFuture<V> g = f.apply(t).toCompletableFuture();
                Object s = g.result;
                if (s != null)
                    return new CompletableFuture<V>(encodeRelay(s));
                CompletableFuture<V> d = new CompletableFuture<V>();
                UniRelay<V> copy = new UniRelay<V>(d, g);
                g.push(copy);
                copy.tryFire(SYNC);
                return d;
            } catch (Throwable ex) {
                return new CompletableFuture<V>(encodeThrowable(ex));
            }
        }
        CompletableFuture<V> d = new CompletableFuture<V>();
        UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
        return d;
    }

thenCombine 方法将会返回一个包含 CompletableFuture 类型返回值的 CompletableFuture

thenCompose 方法和 thenComposeAsync 方法的区别同 thenRun 方法和 thenRunAsync 方法。

双CompletableFuture联合处理

  • 如果希望当前置 CompletableFuture 和指定的 CompletableFuture 全部完成后触发,但不希望接收前边两个 CompletableFuture 的输出结果 TU ,且处理完成后不返回结果,则可以使用 Runnable ,并调用 runAfterBoth 方法
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action) {
        return biRunStage(null, other, action);
    }

    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                     Runnable action) {
        return biRunStage(asyncPool, other, action);
    }

    private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o,
                                               Runnable f) {
        CompletableFuture<?> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.biRun(this, b, f, null)) {
            BiRun<T,?> c = new BiRun<>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

runAfterBoth 方法会返回一个不包含返回值的 CompletableFuture

runAfterBoth 方法和 runAfterBothAsync 方法的区别同 thenRun 方法和 thenRunAsync 方法。

  • 如果希望当前置 CompletableFuture 和指定的 CompletableFuture 全部完成后触发,并接收前边两个 CompletableFuture 的输出结果 TU ,但处理完成后不返回结果,则可以使用 BiConsumer<? super T, ? super U> ,并调用 thenAcceptBoth 方法:
public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(null, other, action);
    }

    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(asyncPool, other, action);
    }

    private <U> CompletableFuture<Void> biAcceptStage(
        Executor e, CompletionStage<U> o,
        BiConsumer<? super T,? super U> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.biAccept(this, b, f, null)) {
            BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenAcceptBoth 方法会返回一个不包含返回值的 CompletableFuture

thenAcceptBoth 方法和 thenAcceptBothAsync 方法的区别同 thenRun 方法和 thenRunAsync 方法。

  • 如果希望当前置 CompletableFuture 和指定的 CompletableFuture 全部完成后触发,并接收前边两个 CompletableFuture 的输出结果 TU ,且处理完成后返回 V 作为处理结果,则可以使用 BiFunction<? super T,? super U,? extends V> ,并调用 thenCombine 方法:
public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }

    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }

    private <U,V> CompletableFuture<V> biApplyStage(
        Executor e, CompletionStage<U> o,
        BiFunction<? super T,? super U,? extends V> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.biApply(this, b, f, null)) {
            BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenCombine 方法会返回一个包含 V 类型返回值的 CompletableFuture

thenCombine 方法和 thenCombineAsync 方法的区别同 thenRun 方法和 thenRunAsync 方法。

  • 如果希望当前置 CompletableFuture 和指定的 CompletableFuture 任意一个完成后触发,但不希望接收前边两个 CompletableFuture 的输出结果 TU ,且处理完成后不返回结果,则可以使用 Runnable ,并调用 runAfterEither 方法
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                                  Runnable action) {
        return orRunStage(null, other, action);
    }

    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action) {
        return orRunStage(asyncPool, other, action);
    }

    private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o,
                                               Runnable f) {
        CompletableFuture<?> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.orRun(this, b, f, null)) {
            OrRun<T,?> c = new OrRun<>(e, d, this, b, f);
            orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

runAfterEither 方法将返回一个不包含返回值的 CompletableFuture

runAfterEither 方法和 runAfterEitherAsync 方法的区别同 thenRun 方法和 thenRunAsync 方法。

  • 如果希望当前置 CompletableFuture 和指定的 CompletableFuture 任意一个完成时触发,并接收前边两个 CompletableFuture 的输出结果 TU ,但处理完成后不返回结果,则可以使用 Consumer<? super T> ,并调用 acceptEither 方法:
public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(null, other, action);
    }

    public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(asyncPool, other, action);
    }

    private <U extends T> CompletableFuture<Void> orAcceptStage(
        Executor e, CompletionStage<U> o, Consumer<? super T> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.orAccept(this, b, f, null)) {
            OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f);
            orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

acceptEither 方法将返回一个不包含返回值的 CompletableFuture

acceptEither 方法和 acceptEitherAsync 方法的区别同 thenRun 方法和 thenRunAsync 方法。

  • 如果希望当前置 CompletableFuture 和指定的 CompletableFuture 任意一个完成后触发,并接收前边两个 CompletableFuture 的输出结果 TU ,处理完成后返回 V 作为处理结果,则可以使用 Function<? super T, U> ,并调用 applyToEither 方法:
public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }

    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(asyncPool, other, fn);
    }

    private <U extends T,V> CompletableFuture<V> orApplyStage(
        Executor e, CompletionStage<U> o,
        Function<? super T, ? extends V> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.orApply(this, b, f, null)) {
            OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
            orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

applyToEither 方法将返回一个包含 V 类型返回值的 CompletableFuture

applyToEither 方法和 applyToEitherAsync 方法的区别同 thenRun 方法和 thenRunAsync 方法。

多CompletableFuture联合处理

  • 如果希望若干 CompletableFuture 全部执行完成后再触发,则应该将所有待等待的 CompletableFuture 任务全部传入 allOf 方法:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }
    
    /** Recursively constructs a tree of completions. */
    static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
                                           int lo, int hi) {
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  andTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  andTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        if (!d.biRelay(a, b)) {
            BiRelay<?,?> c = new BiRelay<>(d, a, b);
            a.bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

allOf 方法将会返回一个不包含返回值的 CompletableFuture ,方便后置操作。

  • 如果希望若干 CompletableFuture 中任意一个执行完成后就触发,则应该将所有待等待的 CompletableFuture 任务全部传入 anyOf 方法:
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        return orTree(cfs, 0, cfs.length - 1);
    }
    
    /** Recursively constructs a tree of completions. */
    static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
                                            int lo, int hi) {
        CompletableFuture<Object> d = new CompletableFuture<Object>();
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  orTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  orTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        if (!d.orRelay(a, b)) {
            OrRelay<?,?> c = new OrRelay<>(d, a, b);
            a.orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

anyOf 方法将返回率先完成的 CompletableFuture 的返回结果,方便后置操作。

CompletableFuture完成处理

  • 如果希望接收前置CompletableFuture结果和可能发生的异常,但不打算返回处理结果,则可以使用 BiConsumer<? super T, ? super Throwable> 对象,并调用 whenComplete 方法:
public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }
    
    private CompletableFuture<T> uniWhenCompleteStage(
        Executor e, BiConsumer<? super T, ? super Throwable> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<T> d = new CompletableFuture<T>();
        if (e != null || !d.uniWhenComplete(this, f, null)) {
            UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

whenComplete 方法将会返回一个不包含返回值的 CompletableFuture

whenComplete 方法和 whenCompleteAsync 方法的区别同 thenRun 方法和 thenRunAsync 方法。

  • 如果希望接收前置CompletableFuture结果和可能发生的异常,并打算返回处理结果,则可以使用 BiFunction<? super T, Throwable, ? extends U> 对象,并调用 handle 方法:
public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(null, fn);
    }

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(asyncPool, fn);
    }
    
    private <V> CompletableFuture<V> uniHandleStage(
        Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.uniHandle(this, f, null)) {
            UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

handle 方法将会返回一个包含 U 类型返回值的 CompletableFuture

handle 方法和 handleAsync 方法的区别同 thenRun 方法和 thenRunAsync 方法。

CompletableFuture异常处理

  • 如果希望处理前置CompletableFuture可能发生的异常,并打算返回处理结果,则可以使用 Function<Throwable, ? extends T> 对象,并调用 exceptionally 方法:
public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) {
        return uniExceptionallyStage(fn);
    }
    
    private CompletableFuture<T> uniExceptionallyStage(
        Function<Throwable, ? extends T> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<T> d = new CompletableFuture<T>();
        if (!d.uniExceptionally(this, f, null)) {
            UniExceptionally<T> c = new UniExceptionally<T>(d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

exceptionally 方法将会返回一个包含 T 类型返回值的 CompletableFuture

流程示例

public static void main(String[] args) {
        System.out.println(testCF());
    }



    private static String testCF() {
        Map<String, Object> results = new HashMap<>();
        List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();

        System.out.println(Thread.currentThread()
            + "testCF start ...... ");

        for (int i = 1; i <= 5; i++) {

            int finalI = i;
            CompletableFuture<Void> completableFuture = CompletableFuture
                .supplyAsync(
                    () -> {
                        System.out.println(Thread.currentThread()
                            + "supplyAsync Job" + finalI);
                        try {
                            Thread.sleep(finalI * 2000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
//                        if (true) {
//                            throw new RuntimeException("测试异常" + finalI);
//                        }
                        return "Job" + finalI;
                    })
                .thenAcceptAsync(
                    (queryResult) -> {
                        System.out.println(Thread.currentThread()
                            + "thenAcceptAsync " + queryResult);
                        results.put(String.valueOf(finalI), queryResult);
                    })
                .whenCompleteAsync(
                    (aVoid, throwable) -> {
                        if (Objects.nonNull(throwable)) {
                            System.err.println(Thread.currentThread()
                                + "whenCompleteAsync Job" + throwable.getMessage());
                        }
                        System.out.println(Thread.currentThread()
                            + "whenCompleteAsync Job" + finalI);
                    })
                .exceptionally(
                    throwable -> {
                        System.err.println("exceptionally " + throwable.getMessage());
                        throw new DaportalException(throwable.getMessage());
                    });

            completableFutures.add(completableFuture);
        }

        System.out.println(Thread.currentThread()
            + "for loop completed  ...... ");

        CompletableFuture[] voidCompletableFuture = {};
        return CompletableFuture
            .allOf(completableFutures.toArray(voidCompletableFuture))
            .handle(
                (aVoid, throwable) -> {
                    System.out.println(Thread.currentThread()
                        + "handle ...... ");
                    if (Objects.nonNull(throwable)) {
                        System.err.println("handle " + throwable.getMessage());
                        throw new DaportalException(throwable.getMessage());
                    }
                    try {
                        return StringUtil.MAPPER.writeValueAsString(results);
                    } catch (JsonProcessingException e) {
                        throw new DaportalException(e.getMessage());
                    }
                })
            .join();

    }

执行输出:

Thread[main,5,main]testCF start ...... 
Thread[ForkJoinPool.commonPool-worker-1,5,main]supplyAsync Job1
Thread[ForkJoinPool.commonPool-worker-2,5,main]supplyAsync Job2
Thread[ForkJoinPool.commonPool-worker-3,5,main]supplyAsync Job3
Thread[main,5,main]for loop completed  ...... 



Thread[ForkJoinPool.commonPool-worker-1,5,main]thenAcceptAsync Job1
Thread[ForkJoinPool.commonPool-worker-1,5,main]supplyAsync Job4




Thread[ForkJoinPool.commonPool-worker-2,5,main]whenCompleteAsync Job1
Thread[ForkJoinPool.commonPool-worker-2,5,main]supplyAsync Job5




Thread[ForkJoinPool.commonPool-worker-3,5,main]thenAcceptAsync Job2
Thread[ForkJoinPool.commonPool-worker-3,5,main]thenAcceptAsync Job3
Thread[ForkJoinPool.commonPool-worker-3,5,main]whenCompleteAsync Job2
Thread[ForkJoinPool.commonPool-worker-3,5,main]whenCompleteAsync Job3








Thread[ForkJoinPool.commonPool-worker-1,5,main]thenAcceptAsync Job4
Thread[ForkJoinPool.commonPool-worker-1,5,main]whenCompleteAsync Job4








Thread[ForkJoinPool.commonPool-worker-2,5,main]thenAcceptAsync Job5
Thread[ForkJoinPool.commonPool-worker-1,5,main]whenCompleteAsync Job5
Thread[ForkJoinPool.commonPool-worker-1,5,main]handle ...... 
{"1":"Job1","2":"Job2","3":"Job3","4":"Job4","5":"Job5"}


Process finished with exit code 0

换行越多代表时间间隔越长

下面是整个流程中各个异步线程的工作情况:

Java异步编程-CompletableFuture

总结

以上所有 CompletableFuture 方法都将返回一个 CompletableFuture ,以此来支持链式编程。

所有带 async 后缀的方法会重新入栈并等待调度,并使用CAS乐观锁提升并发性能:

/** Returns true if successfully pushed c onto stack. */
    final boolean tryPushStack(Completion c) {
        Completion h = stack;
        lazySetNext(c, h);
        return UNSAFE.compareAndSwapObject(this, STACK, h, c);// CAS乐观锁
    }

所有带 async 后缀的方法都会包含一个加 Executor 参数的重载方法,用于指定外部线程池,默认 commonPool 大小是3(这一点可以从后续的“流程示例”得出):

private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    
    ......
    
    static Executor screenExecutor(Executor e) {
        if (!useCommonPool && e == ForkJoinPool.commonPool())
            return asyncPool;
        if (e == null) throw new NullPointerException();
        return e;
    }
原文  https://segmentfault.com/a/1190000021288472
正文到此结束
Loading...