我们知道,Java 里把 Promise 叫作 CompletableFuture,相比那个只能用于线程同步的 Future,CompletableFuture 新增了很多方法用于串联异步事件,比如常用的一些:
thenApply
:拿到结果后对其 apply 一个函数,返回一个新的值 thenApply(T -> R)
thenCompose
:拿到结果后其 apply 一个函数,返回一个新的 CompletableFuture thenCompose(T -> CompletableFuture<R>)
thenAccept
:拿到结果后消费它,不需要返回结果 thenAccept(Consumer<T>)
如果不引入任何第三方库,CompletableFuture 仍是目前 Java 上最好的异步编程方式。之前一直觉得这个东西难用,直到我想明白一件事,证明了 CompletableFuture 虽然麻烦了点但是能做任何事情,然后用它的时候心里就没有这么膈应了。
本文会以一个例子来讲解:如何把 任意函数 转换成异步调用风格。用不用 CompletableFuture 倒是其次的,这项技术更多的是在于编程本身。
这篇文章不会谈论太多 CompletableFuture 的用法,你可以参考 Javadoc 或者 这篇文章 。
首先来(极不严谨地)说明一件事情, 为什么 CompletableFuture 是足够用的 ,换句话说,证明 CompletableFuture 能表达一切计算流程 。
如果你有一些函数式编程的基础,比如会一点 Haskell,这就是一句话的事情:CompletableFuture 其实是一个 Monad —— 因为它的 thenCompose
实现了 Monad 的 >>=
操作符。既然 Monad 能用来表示任何计算过程,CompletableFuture 当然也能。
class Applicative m => Monad (m :: * -> *) where (>>=) :: m a -> (a -> m b) -> m b -- thenCompose 实现了它 (>>) :: m a -> m b -> m b return :: a -> m a fail :: String -> m a {-# MINIMAL (>>=) #-} -- 这是在说:只要实现 (>>=) 就够了
其实想想也很明白,Monad 表示一个带 context 的计算过程,比如可能抛异常之类的(纯函数是不会抛异常的)。CompletableFuture 也一样,他包裹一串计算过程并且处理异常。
如果看不懂上面的也没关系,我们用另一种方式再说明一下:
任何程序的流程控制都可以用 if
和 goto
来组合起来。无论是 for
还是 while
循环,desurge 之后不过就是 if
和 goto
的组合。 通过 thenCompose
就可以表达 if
和 goto
:
这里说的不够严谨,其实 if 也是 surge,最终会变成条件跳转指令。
cf.thenCompose(v -> { if (v < 100) { return doStage1(); // doStage1() 返回一个 CompletableFuture,决定下一步做什么,相当于 goto } else { return doStage2(); // 同上 } })
你看这个例子, if
和 goto
都有了,所以无论程序的控制流多复杂,我们都能组合出来。怎么组合?别急,下面我们就来讲这个。
我们从一个普通的函数开始。考虑到复杂性和完整性,我们用 Merge 2 Sorted Streams 作为演示,如果你不清楚这个是干嘛的,可以先做一下 这道算法题 。
下面是最普通的实现,输入两个数组,输出一个数组:
Stream merge(Stream inputA, Stream inputB) { List<Integer> results = new ArrayList<>(); Integer headA = inputA.next(); Integer headB = inputB.next(); while (headA != null || headB != null) { if (headA == null || headB != null && headA > headB) { results.add(headB); headB = inputB.next(); } else { results.add(headA); headA = inputA.next(); } } return new Stream(results); } class Stream { private final Queue<Integer> numbers; public Stream(List<Integer> numbers) { this.numbers = new LinkedList<>(numbers); } public Integer { return numbers.poll(); } }
这个实现有什么问题呢?作为算法足够 OK。但是从工程意义上说,如果输入的 Stream 很大,包含 million 级的元素,那更好的方式是把 Stream 的输入输出作为 Iterator,只在 next()
的时候计算下一个需要的元素。这样内存占用是常数级的,完全不用担心数据量过大呢!
为了看清一步一步的变化过程,我们先假装 Java 有 Generator 语法 。标记为 Generator 的函数不再是一个函数,而是类似一个 Iterator。一旦调用 next()
,“函数”代码运行到 yield
返回一个值,然后函数似乎 停在 了这里。下次 next()
,“函数”又接着刚刚的地方运行。
如果有 Generator 的话,函数应该长下面这样,注意 [yield]
:
Stream merge(Stream inputA, Stream inputB) { Integer headA = inputA.next(); Integer headB = inputB.next(); while (headA != null || headB != null) { if (headA == null || headB != null && headA > headB) { [yield] headB; headB = inputB.next(); } else { [yield] headA; headA = inputA.next(); } } [yield] null; }
哇,这个函数几乎没有改动,真是太方便了!(然而并没有卵用)
现在我们回到现实:Java 并没有 Generator 语法,所以我们要人肉实现一个 Generator。
为了通用性,首先做一个 desurge,把 while 循环改成 if
和 goto
的组合,这太简单了:
Stream merge(Stream inputA, Stream inputB) { Integer headA = inputA.next(); Integer headB = inputB.next(); WHILE_LOOP: if (headA != null || headB != null) { if (headA == null || headB != null && headA > headB) { [yield] headB; headB = inputB.next(); } else { [yield] headA; headA = inputA.next(); } goto WHILE_LOOP; // again,假设 Java 也有 goto } [yield] null; }
下一步是去掉 yield
,刚刚说到 Generator 的每次 next()
似乎会让函数 停在 一个地方,如何实现 停在 一个地方?记下来呗!加一个标记 状态 的变量,这个状态会告诉我下次 next()
的时候从哪里继续运行。
首先画出函数的控制流图,然后做一件事:想象所有的 yield
之后都有一个断点,我们在断点处切开,标记它为某个 State,这样下次 next()
的时候就能从断点继续。
下图的 S0 ~ S2 是我标记好的断点,S0 就是起始位置,S1 是两个 yield result
之后断下来的地方(恰好是同一个地方),S2 是 yield null
之后断下来的地方。
我们按照图中的 State
标记机械地把它切开,就得到了下面这个类,它就是由 merge()
变换得到的 Generator:
class Merger implements Iterator<Integer> { // Arguments final Iterator inputA; final Iterator inputB; // Internal states private int state = 0; // 我们加上的状态变量 private Integer headA; // 变换前的局部变量,因为跨了多次 next() 调用,不能再是局部变量了 private Integer headB; // 同上 public Merger(Iterator inputA, Iterator inputB) { this.inputA = inputA; this.inputB = inputB; } public Integer next() { for (;;) { // 这个循环是有用的,往下看几行 switch (state) { case 0: headA = inputA.next(); headB = inputB.next(); state = 1; break; // 这里就用上了外层的循环 case 1: if (headA != null || headB != null) { if (headA == null || headB != null && headA > headB ) { final int result = headB; headB = inputB.next(); state = 1; // 可以省略 return result; // 变换前是 yield result } else { final int result = headA; headA = inputA.next(); state = 1; // 可以省略 return result; // 变换前是 yield result } } else { state = 2; return null; // 变换前是 yield null } case 2: // Generator 已经终结了(变换前:函数已经走到底了) throw new IllegalStateException("Generator has been exhausted!"); default: throw new AssertionError("Unreachable!"); } } } }
别急,最后我们会简化这些充满废话的代码。
阶段性总结一下:到现在为止,我们做了一件伟大的事情—— 把一个函数变成了 Iterator,函数已经不再是函数,而是一个状态机,这个状态记录了下次调用 next()
需要从哪继续 。
套用一下术语:“从哪继续”就是 Continuation ,把 Continuation 搞出来的这个过程称为 CPS 变换 。
呃…… 说好的 CompletableFuture 呢?离 CompletableFuture 只有一步之遥了!
先从接口下手。想象两个 Stream Input 都是从 IO 拿到的数据,所以每次 next()
其实背后都是一次 IO,应该把它用 CompletableFuture 包成异步的,接口大概长这样:
interface AsyncIterator<T> { CompletableFuture<T> next(); }
类似刚刚引入 Generator 一样,我们再假装有 await
关键字。 await
关键字表示异步地等待结果返回,有了它,函数就魔法般的暂停在等待异步 IO 的地方:
Stream merge(Stream inputA, Stream inputB) { Integer headA = inputA.next(); Integer headB = inputB.next(); WHILE_LOOP: if (headA != null || headB != null) { if (headA == null || headB != null && headA > headB) { Integer result = headB; headB = [await] inputB.next(); // await 会魔法般地等待 next() 完成再继续运行 [yield] result; } else { Integer result = headA; headA = [await] inputA.next(); [yield] result; } goto WHILE_LOOP; } [yield] null; }
因为 await
也会暂停这个“函数”,所以和刚刚对 yield
的处理一样,我们想象 await
这里有一个断点,我们也要为它设置 State 标记:
糟糕!这状态数有点多啊!好在 Java 8 提供了 Lambda 表达式,和 CompletableFuture 搭配食用口味更佳。图中的大多数状态都可以借助 Lambda 表达式来实现,节约了不少代码:
class Merger implements AsyncIterator<Integer> { // Arguments final Stream inputA; final Stream inputB; // Internal states private int state = 0; private Integer headA; private Integer headB; public Merger(Stream inputA, Stream inputB) { this.inputA = inputA; this.inputB = inputB; } public CompletableFuture<Integer> next() { switch (state) { case 0: return inputA.next().thenCompose(a -> { // State 1 在这里! headA = a; return inputB.next(); }).thenCompose(b -> { // State 2 在这里! headB = b; state = 3; return next(); // 相当于原来的外层循环 }); case 3: if (headA != null || headB != null) { if (headA == null || headB != null && headA > headB) { final Integer result = headB; return inputB.next().thenCompose(b -> { // State 4 在这里! headB = b; state = 3; // 可以省略 return CompletableFuture.completedFuture(result); }); } else { final Integer result = headA; return inputA.next().thenCompose(a -> { // State 5 在这里! headA = a; state = 3; // 可以省略 return CompletableFuture.completedFuture(result); }); } } else { state = 6; return CompletableFuture.completedFuture(null); } case 6: throw new IllegalStateException("Generator has been exhausted!"); default: throw new AssertionError("Unreachable!"); } } }
上面我们只用了 thenCompose
,理论上这是 OK 的,但是实际上 CompletableFuture 有上百个方法,最合适的才是坠吼的。
thenApply
; thenCombine
等待两个 CompletableFuture 都完成了再去调用 BiFunction (T, U) -> R
来消费。 思考题:有兴趣的读者可以思考一下 thenCombine
的实现。
整理一下上面的代码,比如这样:
static class Merger { // States enum State { START, ITERATING, DONE } // Arguments final Stream inputA; final Stream inputB; // Internal states private State state = State.START; private Integer headA; private Integer headB; public Merger(Stream inputA, Stream inputB) { this.inputA = inputA; this.inputB = inputB; } private CompletableFuture<Integer> next() { switch (state) { case START: // 这里做了小小的优化:这两个 next() 可以并行等待 return inputA.next().thenCombine(inputB.next(), (a, b) -> { headA = a; headB = b; state = State.ITERATING; return (Void)null; }).thenCompose(__ -> next()); case ITERATING: if (headA != null || headB != null) { if (headA == null || headB != null && headA > headB) { final Integer result = headB; return inputB.next().thenApply(b -> { // thenCompose 某个值 <=> thenApply headB = b; return result; }); } else { final Integer result = headA; return inputA.next().thenApply(a -> { // 同上 headA = a; return result; }); } } else { state = State.DONE; return CompletableFuture.completedFuture(null); } case DONE: throw new IllegalStateException("Generator has been exhausted!"); default: throw new AssertionError("Unreachable!"); } } }
任何函数都可以用 CompletableFuture 实现异步化,最通用的方式如下:
yield
(返回下一个结果)和 await
(等待输入值)来标记断点; yield
和 await
处断开,断开处标记为状态; 这一刻,我们都是(人肉)编译器。