由于 Runnable
接口中的 run()
方法无返回值,当我们需要执行的任务需要返回一个对象的话, Runnable
接口虽然可以用过定义实例变量来完成同样的效果,但并非是最佳的选择。此时,应该定义实现了 Callable
接口的类,借助于Java的线程池来达到我们的目的。需要注意的是,这里只能调用ExecutorService对象的 submit(Callable callable)
方法,该方法将返回一个 Future
对象,通过调用该对象的get()方法拿到我们需要的返回值。
/** * IntegerSquare类用于计算并返回输入的整数的平方 */ public class IntegerSquare implements Callable<Integer> { private int num; //进行平方运算的数 public IntegerSqure (int num) { this.num = num; } @Override public Integer call() throws Exception { return num * num; //返回num的平方 } } /** * 使用CachedThreadPool进行测试 */ public class IntegerSqureDemo { public static void main (String[] args) { ExecutorService threadPool = Executors.newCachedThreadPool(); List<Future<Integer>> resultList = new ArrayList<>(); for (int i = 0; i < 100; ++i) { Future<Integer> result = threadPool.submit(new IntegerSquare(i)); //submit方法返回Future对象 resultList.add(result); } for (Future<Integer> result : resultList) { try { System.out.println(result.get()); //调用Future对象的get()方法拿到任务的返回值 } catch (InterruptedException e) { System.out.println("InterruptedException"); } catch (ExecutionException e) { System.out.println("ExecutionException"); } } threadPool.shutdown(); } }
为了了解利用 Callable
对象和线程池拿到返回值的过程,我对源代码进行了阅读。
AbstractExecutorService.java
//AbstractExecutorService抽象类实现了ExecutorService接口,后者有submit方法 public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
可以看到,该方法返回了一个 Future
对象 ftask
,而这个 ftask
又是一个 RunnableFuture
的对象,那么我们先看看 RunnableFuture
这个接口的定义。
RunnableFuture.java
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
可以看出,该接口继承了 Runnable
和 Future
接口,所以 submit
方法才可以返回一个 Future
的对象。进一步我们观察一下 Future
接口的定义方式。
Future.java
public interface Future<V> { /** * 尝试取消任务的执行。如果这个任务已经完成执行或者已经被取消,尝试失败。 * 当任务尚未开始执行,调用call方法并且成功,那么这个任务将不会再被执行。 * 当任务已经开始,参数mayInterruptIfRunning将决定是否打算相关线程以达到取消任务的目的 */ boolean cancel(boolean mayInterruptIfRunning); /** * 任务在完成执行之前被取消,返回true */ boolean isCancelled(); /** * 任务正常完成返回true */ boolean isDone(); /** * 等待计算结束,拿到返回值并且返回 */ V get() throws InterruptedException, ExecutionException; /** * 设定等待计算的timeout,如果timeout之前能拿到返回值则返回这个值,如果超时抛出TimeoutException * 异常 */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
从源码可以看出, Future
对象可以实现取消任务、拿到任务返回值的功能,支持我们在主线程中记录 Future
对象并通过该对象拿到返回值或者异常。接下来我们看看 RunnableFuture
接口的一个实现类 FutureTask
,这个类也是 AbstractExecutorService
类的 submit(Callable<T> task)
方法中调用 newTaskFor(Callable<T> callable)
所返回的具体实现类。这里我们选取主要的成员变量和方法进行分析。
FutureTask
public class FutureTask<V> implements RunnableFuture<V> { /** 传入的Callable对象 */ private Callable<V> callable; /** 返回值 */ private Object outcome; // non-volatile, protected by state reads/writes /** * 该方法被get方法所调用,作用是将成员变量Object类型的outcome进行类型转换并返回。 */ @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } /** 使用Callable对象作为参数的构造函数 */ public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } /** * 这就是我们在主线程中通过Future对象调用的用来拿到返回值的get方法 */ public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); //通过report方法最终拿到需要的返回值 } /** 计算得到返回值赋给成员变量outcome (Object类型),在run方法中被调用 */ protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } /** * 这个run()方法作为接口被ExecutorService的execute方法所调用,与runnable对象的run方法作用相同。 * 执行过程是,调用callable的call方法,拿到返回值,并且调用set方法将返回值赋给成员变量outcome。 */ public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } }
上述源码并非 FutureTask
类的全部,我仅仅选取了我感兴趣的部分进行记录,主要是帮助自己有一个对整个调用过程有个初步的理解。
Callable
接口的类,利用线程池及其 submit
方法来完成。 submit
方法将 Callable
对象封装成一个同时实现 Runnable
接口和 Future
接口的 RunnableFuture
对象,对外也就是主线程,暴露其 Future
接口以拿到返回值。对内暴露两种接口,通过 run
方法对 callable
对象的 call
方法进行调用,计算并拿到返回值。