最近在看JUC线程池 java.util.concurrent.ThreadPoolExecutor
的源码实现,其中了解到 java.util.concurrent.Future
的实现原理。从目前 java.util.concurrent.Future
的实现来看,虽然实现了异步提交任务,但是任务结果的获取过程需要主动调用 Future#get()
或者 Future#get(long timeout, TimeUnit unit)
,而前者是阻塞的,后者在异步任务执行时间不确定的情况下有可能需要进行轮询,这两种情况和异步调用的初衷有点相违背。于是笔者想结合目前了解到的 Future
实现原理的前提下扩展出支持(监听)回调的 Future
,当然这里不会参考 Guava
增强的 ListenableFuture
。本文编写的时候使用的JDK是JDK11,其他版本可能不适合。
并发大师 Doug Lea
在设计JUC线程池的时候,提供了一个顶层执行器接口 Executor
:
public interface Executor { void execute(Runnable command); }
实际上,这里定义的方法 Executor#execute()
是整套线程池体系最核心的接口,也就是 ThreadPoolExecutor
定义的核心线程、额外创建的线程(线程池最大线程容量 - 核心线程数)都是在这个接口提交任务的时候懒创建的,也就是说 ExecutorService
接口扩展的功能都是基于 Executor#execute()
的基础进行扩展。 Executor#execute()
方法只是单纯地把任务实例 Runnable
对象投放到线程池中分配合适的线程执行,但是由于方法返回值是 void
类型,我们是无法感知任务什么时候执行完毕。这个时候就需要对 Runnable
任务实例进行包装(下面是伪代码 + 伪逻辑):
// 下面这个Wrapper和Status类是笔者虚构出来 @RequiredArgsConstructor class Wrapper implements Runnable{ private final Runnable target; private Status status = Status.of("初始化"); @Override public void run(){ try{ target.run(); status = Status.of("执行成功"); }catch(Throwable t){ status = Status.of("执行异常"); } } }
我们只需要把 new Wrapper(原始Runnable实例)
投放到线程池执行,那么通过定义好的 Status
状态记录变量就能得知异步任务执行的状态,以及什么时候执行完毕(包括正常的执行完毕和异常的执行完毕)。这里仅仅解决了任务执行的状态获取,但是 Executor#execute()
方法法返回值是 void
类型的特点使得我们无法回调 Runnable
对象执行的结果。这个时候需要定义一个可以回调执行结果的接口,其实已经有现成的接口 Callable
:
@FunctionalInterface public interface Callable<V> { V call() throws Exception; }
这里遇到了一个问题:由于 Executor#execute()
只接收 Runnable
参数,我们需要把 Callable
接口适配到 Runnable
接口,这个时候,做一次简单的委托即可:
@RequiredArgsConstructor class Wrapper implements Runnable{ private final Callable callable; private Status status = Status.of("初始化"); @Getter private Object outcome; @Override public void run(){ try{ outcome = callable.call(); status = Status.of("执行成功"); }catch(Throwable t){ status = Status.of("执行异常"); outcome = t; } } }
这里把 Callable
实例直接委托给 Wrapper
,而 Wrapper
实现了 Runnable
接口,执行结果直接存放在定义好的 Object
类型的对象 outcome
中即可。当我们感知到执行状态已经结束,就可以从 outcome
中提取到执行结果。
上面一个小结仅仅对 Future
实现做一个相对合理的虚拟推演,实际上, RunnableFuture
才是JUC中常用的复合接口,它同时实现了 Runnable
和 Future
:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
上一节提到的虚构出来的 Wrapper
类,在JUC中类似的实现是 java.util.concurrent.FutureTask
,它就是 Callable
和 Runnable
的适配器, FutureTask
实现了 RunnableFuture
接口:
public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters; // 省略其他代码 }
注意到核心属性 state
表示执行状态, outcome
承载执行结果。接着看提交 Callable
类型任务的方法 ExecutorService#submit()
:
public interface ExecutorService extends Executor { // 省略其他接口方法 <T> Future<T> submit(Callable<T> task); }
当我们通过上述 ExecutorService#submit()
方法提交 Callable
类型任务的时候,实际上做了如下的步骤:
task
的存在性,如果为 null
抛出 NullPointerException
。 Callable
类型的 task
包装为 FutureTask
实例。 FutureTask
实例放到线程池中执行,也就是调用 Executor#execute(FutureTask实例)
。 FutureTask
实例的接口实例 RunnableFuture
(实际上是返回子接口 Future
实例)。
如果我们需要获取结果,可以 Future#get()
或者 Future#get(long timeout, TimeUnit unit)
获取,调用这两个方法的时候参看 FutureTask
里面的方法实现,得知步骤如下:
state
小于等于 COMPLETING(1)
,说明任务还在执行中,获取结果的请求线程会放入 WaitNode
类型的队列中进行阻塞。 state
和把结果赋值到 outcome
之外,唤醒所有阻塞获取结果的线程,然后调用钩子方法 FutureTask#done()
(具体见源码 FutureTask#finishCompletion()
)。
其实分析了这么多,笔者想指出的结论就是:
Callable
类型任务提交到线程池中执行完毕(包括正常执行完毕和异常执行完毕)之后,都会回调钩子方法 FutureTask#done()
。这个就是我们扩展可监听 Future
的理论依据。
先做一次编码实现,再简单测试其功能。
先定义一个 Future
接口的子接口 ListenableFuture
,用于添加可监听的回调:
public interface ListenableFuture<V> extends Future<V> { void addCallback(ListenableFutureCallback<V> callback, Executor executor); }
ListenableFutureCallback
是一个函数式回调接口:
@FunctionalInterface public interface ListenableFutureCallback<V> { void callback(V value, Throwable throwable); }
对于 ListenableFutureCallback
而言,回调的结果 value
和 throwable
是互斥的。正常执行完毕的情况下 value
将会是执行结果值, throwable
为 null
;异常执行完毕的情况下, value
将会是 null
, throwable
将会是抛出的异常实例。如果更习惯于分开处理正常执行完毕的结果和异常执行完毕的结果, ListenableFutureCallback
可以这样定义:
public interface ListenableFutureCallback<V> { void onSuccess(V value); void onError(Throwable throwable); }
接着定义 ListenableExecutorService
接口继承 ExecutorService
接口:
public interface ListenableExecutorService extends ExecutorService { <T> ListenableFuture<T> listenableSubmit(Callable<T> callable); /** * 定义这个方法是因为有些时候由于任务执行时间非常短,有可能通过返回的ListenableFuture实例添加回调之前已经执行完毕,因此可以支持显式传入回调 * * @param callable callable * @param callbacks callbacks * @param executor executor * @return ListenableFuture */ <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor); }
然后添加一个执行单元适配器 ListenableFutureCallbackRunnable
,承载每次回调触发的调用(实现 Runnable
接口,从而支持异步执行):
@RequiredArgsConstructor public class ListenableFutureCallbackRunnable<V> implements Runnable { private final ListenableFutureCallback<V> callback; private final V value; private final Throwable throwable; @Override public void run() { callback.callback(value, throwable); } }
接着需要定义一个 FutureTask
的子类 ListenableFutureTask
,核心逻辑是覆盖 FutureTask#done()
方法触发回调:
// ListenableFutureTask public class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> { private final List<Execution<V>> executions = new ArrayList<>(); public ListenableFutureTask(Callable<V> callable) { super(callable); } public ListenableFutureTask(Runnable runnable, V result) { super(runnable, result); } public static <V> ListenableFutureTask<V> newTaskFor(Callable<V> callable) { return new ListenableFutureTask<>(callable); } @Override protected void done() { Iterator<Execution<V>> iterator = executions.iterator(); Throwable throwable = null; V value = null; try { value = get(); } catch (Throwable t) { throwable = t; } while (iterator.hasNext()) { Execution<V> execution = iterator.next(); ListenableFutureCallbackRunnable<V> callbackRunnable = new ListenableFutureCallbackRunnable<>(execution.getCallback(), value, throwable); // 异步回调 if (null != execution.getExecutor()) { execution.getExecutor().execute(callbackRunnable); } else { // 同步回调 callbackRunnable.run(); } } } @Override public void addCallback(ListenableFutureCallback<V> callback, Executor executor) { Execution<V> execution = new Execution<>(); execution.setCallback(callback); execution.setExecutor(executor); executions.add(execution); } } // Execution - 承载每个回调实例和对应的Executor,Executor实例为null则进行同步回调 @Data public class Execution <V>{ private Executor executor; private ListenableFutureCallback<V> callback; }
最后一步就是编写线程池 ListenableThreadPoolExecutor
,继承自 ThreadPoolExecutor
并且实现 ListenableExecutorService
接口:
public class ListenableThreadPoolExecutor extends ThreadPoolExecutor implements ListenableExecutorService { public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); } public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); } public ListenableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } @Override public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable) { if (null == callable) { throw new IllegalArgumentException("callable"); } ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable); execute(listenableFutureTask); return listenableFutureTask; } @Override public <T> ListenableFuture<T> listenableSubmit(Callable<T> callable, List<ListenableFutureCallback<T>> callbacks, Executor executor) { if (null == callable) { throw new IllegalArgumentException("callable"); } if (null == callbacks) { throw new IllegalArgumentException("callbacks"); } ListenableFutureTask<T> listenableFutureTask = ListenableFutureTask.newTaskFor(callable); for (ListenableFutureCallback<T> callback : callbacks) { listenableFutureTask.addCallback(callback, executor); } execute(listenableFutureTask); return listenableFutureTask; } }
引入 junit
,编写测试类如下:
public class ListenableFutureTest { private static ListenableExecutorService EXECUTOR; private static Executor E; @BeforeClass public static void before() { EXECUTOR = new ListenableThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactory() { private final AtomicInteger counter = new AtomicInteger(); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName(String.format("ListenableWorker-%d", counter.getAndIncrement())); return thread; } }); E = Executors.newFixedThreadPool(3); } @Test public void testListenableFuture1() throws Exception { ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> { Thread.sleep(1000); return "message"; }); future.addCallback((v, t) -> { System.out.println(String.format("Value = %s,Throwable = %s", v, t)); }, null); Thread.sleep(2000); } @Test public void testListenableFuture2() throws Exception { ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> { Thread.sleep(1000); throw new RuntimeException("exception"); }); future.addCallback((v, t) -> { System.out.println(String.format("Value = %s,Throwable = %s", v, t)); }, null); Thread.sleep(2000); } @Test public void testListenableFuture3() throws Exception { ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> { Thread.sleep(1000); return "message"; }); future.addCallback((v, t) -> { System.out.println(String.format("Value = %s,Throwable = %s", v, t)); }, E); System.out.println("testListenableFuture3 end..."); Thread.sleep(2000); } @Test public void testListenableFuture4() throws Exception { ListenableFuture<String> future = EXECUTOR.listenableSubmit(() -> { Thread.sleep(1000); throw new RuntimeException("exception"); }); future.addCallback((v, t) -> { System.out.println(String.format("Value = %s,Throwable = %s", v, t)); }, E); System.out.println("testListenableFuture4 end..."); Thread.sleep(2000); } }
执行结果:
// testListenableFuture1 Value = message,Throwable = null // testListenableFuture2 Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception // testListenableFuture3 testListenableFuture3 end... Value = message,Throwable = null // testListenableFuture4 testListenableFuture4 end... Value = null,Throwable = java.util.concurrent.ExecutionException: java.lang.RuntimeException: exception
和预期的结果一致,注意一下如果 Callable
执行抛出异常,异常被包装为 ExecutionException
,要调用 Throwable#getCause()
才能得到原始的异常实例。
本文通过了解 ThreadPoolExecutor
和 Future
的实现原理做简单的扩展,使得异步提交任务变得更加优雅和简便。强化了动手能力的同时,也能加深对并发编程的一些认知。当然,本文只是提供一个十分简陋的实现,笔者其实还想到了如对回调处理的耗时做监控、回调打上分组标签执行等等更完善的功能,等到有需要的场景再进行实现。
这里记录一下过程中的一些领悟:
Executor#execute()
(本文完 c-1-d e-a-20190702)