在介绍Netty的ChannelFuture之前,我们先来看看JDK中的Future是如何实现的。总的来说就是任务提交的时候会使用装饰器模式,将任务包装成一个FutureTask。当执行器执行该Task的时候,不仅仅会执行用户提交的任务,还会执行装饰器添加的额外操作,例如在执行之前记录当前执行线程、执行完成后将任务结果保存在FutureTask对象内部等。
详细请看本人过去整理的随笔:
ChannelFuture是在Future基础上的完善,它支持添加监听器,在任务完成后自动执行相关操作。
这个其实就是观察者模式,个人认为就是在前面的C部分添加监听的方法调用,即执行线程执行完成后,如果发现该任务上有监听器,则该执行线程还会调用监听器接口。
话不多说,我们来看看它代码到底是怎么写的,跟Future的实现有何异同。相关实现关键内容在DefaultPromise类中,相对于前面的FutureTask。
private volatile Object result; private final EventExecutor executor; private Object listeners; private short waiters; private boolean notifyingListeners;
从对象属性中我们可以得知,该对象没有保存用户任务。实际上个人任务这是它跟Future最大的不同,Future所引用的本质上是一个任务,而ChannelFuture所引用的只有任务状态和任务结果。所以这个DefaultPromise对象不会被作为任务提交到执行器中。
private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { if (checkNotifyWaiters()) { notifyListeners(); } return true; } return false; }
当任务执行完成,通过setValue将值传入DefaultPromise对象时,唤醒等待的线程,并触发监听器。
public Promise<V> await() throws InterruptedException { if (isDone()) { return this; } if (Thread.interrupted()) { throw new InterruptedException(toString()); } checkDeadLock(); synchronized (this) { while (!isDone()) { incWaiters(); try { wait(); } finally { decWaiters(); } } } return this; } private synchronized boolean checkNotifyWaiters() { if (waiters > 0) { notifyAll(); } return listeners != null; }
从上述代码可知,这里的线程等待与唤醒方式使用的内置的wait()和notify()方法,而FutureTask的等待队列是单独实现的。
注:这里尚不清楚这两种实现方式之间的优劣。
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) { checkNotNull(listener, "listener"); synchronized (this) { addListener0(listener); } //添加监听器时,如果发现已经完成,则直接调用触发监听器 if (isDone()) { notifyListeners(); } return this; }
如果任务已经完成,则直接触发监听器。防止出现"调用setLisener的时候,任务已经完成,导致监听器不被触发"。
public boolean cancel(boolean mayInterruptIfRunning) { if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { if (checkNotifyWaiters()) { notifyListeners(); } return true; } return false; }
从代码可以看出,它的实现与FutureTask有所不同,它并不会尝试调用执行线程的interrupt()方法来中断线程,只是将等待线程唤醒,并触发监听器。所以这个“取消”操作并不会影响代码的实际执行。
事实上“中断”也只是一种协作方式,它只是设置中断状态并将线程唤醒(如果该线程正处于挂起状态),如果用户代码中没有对中断状态进行判断,也没有使用wait()、sleep()等方法,中断操作也是不会实际“打断”代码执行的。
详细可看: 【杂谈】线程中断——Interrupt