Java 8 的 CompletableFuture
private static void create() { // 新建一个已经完成的 CompletableFuture.<em>completedFuture</em>(""); // JDK 9+ // CompletableFuture.failedFuture(new Exception()); // 类似 SettableFuture.create() 新建一个壳子 CompletableFuture<String> hello = new CompletableFuture<>(); // 然后在合适的时机使它完成 hello.complete(""); hello.completeExceptionally(new Exception()); // 异步运行 CompletableFuture.<em>runAsync</em>(() -> System.<em>out</em>.println("CompletableFuture<Void>")); // 异步计算 CompletableFuture.<em>supplyAsync</em>(() -> "CompletableFuture<String>"); }
private static void then() { CompletableFuture.<em>completedFuture</em>("") // 完成后执行一个 Function 进行数据转换 类似 Stream.map .thenApply(String::length) // 联合另一个 CompletableFuture,都完成后执行一个 BiFunction 进行数据转换 .thenCombine(CompletableFuture.<em>completedFuture</em>(1), (length, num) -> "len+num=" + length + num) // 当两个都完成后执行一个动作 返回新的 CompletableFuture<Void> .thenAcceptBoth(CompletableFuture.<em>completedFuture</em>(2), (s, num) -> System.<em>out</em>.printf("%s,%d/n", s, num)) .thenApply(v -> "s") // 将完成后的结果转换为另一个 CompletableFuture 类似 Stream.flatMap .thenCompose(s -> CompletableFuture.<em>completedFuture</em>("compose:" + s)) // 当出现异常时 处理异常并返回异常时的值 .exceptionally(e -> "之前是什么类型这里就需要返回什么类型") // 完成后执行的动作 返回新的 CompletableFuture<Void> .thenAccept(System.<em>out</em>::println) // 类似 Accept 完成后执行的动作,返回新的 CompletableFuture<Void> .whenComplete((result, ex) -> { }) // 类似 Apply 完成后执行的动作,返回新的 CompletableFuture<U> .handle((result, ex) -> "new") ; }
该 CompletableFuture 在哪个线程完成的,它之后紧接着的 then 操作就在这个线程运行。
private static void thread() { CompletableFuture<String> f = new CompletableFuture<>(); CompletableFuture<Long> other = new CompletableFuture<>(); System.<em>out</em>.println("out: " + Thread.<em>currentThread</em>().getName()); f .thenApply(s -> { // 这里的线程是给 f 设置结果的线程 System.<em>out</em>.println("thenApply: " + Thread.<em>currentThread</em>().getName()); return s; }) .thenApplyAsync(s -> { // Async 结尾的方法不传 线程池则是 ForkJoinPoll.commonPoll System.<em>out</em>.println(Thread.<em>currentThread</em>().getName()); return s; }) .thenAccept(s -> { // 和上个操作是同一个线程 System.<em>out</em>.println(s + " " + Thread.<em>currentThread</em>().getName()); }) .thenCompose(v -> other) .thenApply(num -> { // 执行线程 取决于哪个 future 后完成 System.<em>out</em>.println(num + " .thenCompose.thenApply: " + Thread.<em>currentThread</em>().getName()); return num + 1; }) ; new Thread(() -> { // sleep(50); f.complete("Ha"); }, "MyThread-1").start(); new Thread(() -> { <em>sleep</em>(50); other.complete(1L); }, "MyThread-2").start(); } private static void sleep(long mills) { try { Thread.<em>sleep</em>(mills); } catch (InterruptedException e) { e.printStackTrace(); } }
如果是已完成的 Future, 调用各种 then 方法,就直接执行了;如果还没完成,则需要把各种 Function/Consumer 等需要执行的回调动作保存起来,待完成后再执行。
private static void debug() { // 实际开发一般都是一条链走到底,这里为了 Debug 好对比哪个实例是哪个 故分开写变量 CompletableFuture<String> hello = new CompletableFuture<>(); CompletableFuture<Void> print = hello.thenAccept(System.<em>out</em>::println); CompletableFuture<String> upper = hello.thenApply(String::toUpperCase); CompletableFuture<Void> v1 = upper.thenAccept(System.<em>out</em>::println); CompletableFuture<Void> v2 = print.thenCombine(upper, (aVoid, s) -> s.toCharArray()) .thenCompose(chars -> CompletableFuture.<em>completedFuture</em>(chars.length)) .thenAccept(System.<em>out</em>::println); hello.complete("Hello"); }
参考链接: