如果你有一个阻塞的方法,比如 Thread.sleep(1000),而又不想阻塞当前线程 A,只需要把该方法包装成一个任务由另一个线程 B 执行即可。
ExecutorService pool = Executors.newFixedThreadPool(3); Future<Integer> future = pool.submit(() -> { Thread.sleep(1000); return 1; }); 复制代码
如果你需要在任务结束之后执行其他逻辑,一种方式是 A 线程先通过调用 future.get()
获取值,然后执行其他代码;但是 get 方法本身也是一个阻塞方法,在这期间 A 线程阻塞。
另外一种方法是 B 线程执行完任务后,继续执行后续逻辑。Netty 中的 Future,io.netty.util.concurrent.Future,通过回调方法 Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
实现了该功能。
Promise 接口继承了 Future 接口,在增加了 listener 的情况下,提供了 Promise<V> setSuccess(V result)
方法,可以在任务中手动设置返回值,并立即通知 listeners。
private static NioEventLoopGroup loopGroup = new NioEventLoopGroup(8); public void methodA() { Promise promise = methodA("ceee...eeeb"); promise.addListener(future -> { // 1 Object ret = future.get(); // 4. 此时可以直接拿到结果 // 后续逻辑由 B 线程执行 System.out.println(ret); }); // A 线程不阻塞,继续执行其他代码... } public Promise<ResponsePacket> methodB(String name) { Promise<ResponsePacket> promise = new DefaultPromise<>(loopGroup.next()); loopGroup.schedule(() -> { // 2 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("scheduler thread: " + Thread.currentThread().getName()); promise.setSuccess("hello " + name); // 3 }, 0, TimeUnit.SECONDS); return promise; } 复制代码
简单的使用 Promise 包括:
promise.addListener() loopGroup.schedule() promise.setSuccess()
// class: DefaultPromise public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { // 1. 增加 listener addListener0(listener); } if (isDone()) { // 2. 如果任务执行完了,通知所有 listener notifyListeners(); } return this; } 复制代码
继续看 addListener0:
private Object listeners; private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) { // 1. 添加第 1 个 listener 时,直接赋值即可 if (listeners == null) { listeners = listener; } // 3. 添加第 3 个以及更多 listener 时,直接加入数组即可 else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } // 2. 添加第 2 个 listener 时,listeners 类型更改为 DefaultFutureListeners,内部实现为一个数组 else { listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener); } } 复制代码
由于可以添加多个 listener,很容易想到通过一个数组保存所有 listener。而实现类里面 listeners 类型为 Object,可能是考虑到大部分都只有 1 个 listener,节省内存空间。
将任务加入队列,由线程池执行。
// class: DefaultPromise public Promise<V> setSuccess(V result) { if (setSuccess0(result)) { // 如果设置成功,返回;否则抛异常 return this; } throw new IllegalStateException("complete already: " + this); } private boolean setSuccess0(V result) { // 设置 result return setValue0(result == null ? SUCCESS : result); } private boolean setValue0(Object objResult) { // cas 操作 if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { notifyListeners(); } return true; } return false; } private synchronized boolean checkNotifyWaiters() { /** * 有些线程不是通过增加 listener 的方式获取结果,而是通过 promise.get() 方法获取, * 那么这些线程为阻塞状态;当设置了 result 后,需要唤醒这些线程 */ if (waiters > 0) { notifyAll(); } return listeners != null; // 只要存在 listener,就返回 true } 复制代码
继续查看 notifyListeners:
// class: DefaultPromise private void notifyListeners() { EventExecutor executor = executor(); if (executor.inEventLoop()) { final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get(); final int stackDepth = threadLocals.futureListenerStackDepth(); // TODO 嵌套监听 if (stackDepth < MAX_LISTENER_STACK_DEPTH) { threadLocals.setFutureListenerStackDepth(stackDepth + 1); try { // 1. 如果是 promise 绑定的线程,直接执行 notifyListenersNow(); } finally { threadLocals.setFutureListenerStackDepth(stackDepth); } return; } } // 2. 否则,加入任务调度, 因此 listener 方法最终还是由 promise 绑定的线程执行的 safeExecute(executor, new Runnable() { @Override public void run() { notifyListenersNow(); } }); } private void notifyListenersNow() { Object listeners; synchronized (this) { if (notifyingListeners || this.listeners == null) { return; } notifyingListeners = true; listeners = this.listeners; this.listeners = null; } for (;;) { // 依次通知所有 listener if (listeners instanceof DefaultFutureListeners) { notifyListeners0((DefaultFutureListeners) listeners); } else { notifyListener0(this, (GenericFutureListener<?>) listeners); } synchronized (this) { if (this.listeners == null) { notifyingListeners = false; return; } // 通知原先的 listeners 时,有可能有新的 listener 在此期间注册, 也需要通知到 listeners = this.listeners; this.listeners = null; } } } private static void notifyListener0(Future future, GenericFutureListener l) { try { l.operationComplete(future); // 执行 listener 中的方法 } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } } } 复制代码