转载

Java多线程中任务有返回值的情形

1. 应用场景

由于 Runnable 接口中的 run() 方法无返回值,当我们需要执行的任务需要返回一个对象的话, Runnable 接口虽然可以用过定义实例变量来完成同样的效果,但并非是最佳的选择。此时,应该定义实现了 Callable 接口的类,借助于Java的线程池来达到我们的目的。需要注意的是,这里只能调用ExecutorService对象的 submit(Callable callable) 方法,该方法将返回一个 Future 对象,通过调用该对象的get()方法拿到我们需要的返回值。

/**
* IntegerSquare类用于计算并返回输入的整数的平方
*/
public class IntegerSquare implements Callable<Integer> {
  private int num; //进行平方运算的数
  
  public IntegerSqure (int num) {
    this.num = num;
  }
  
  @Override
  public Integer call() throws Exception {
    return num * num; //返回num的平方
  }
}


/**
* 使用CachedThreadPool进行测试
*/
public class IntegerSqureDemo {
  public static void main (String[] args) {
    ExecutorService threadPool = Executors.newCachedThreadPool();
    List<Future<Integer>> resultList = new ArrayList<>();
    for (int i = 0; i < 100; ++i) {
      Future<Integer> result = threadPool.submit(new IntegerSquare(i)); //submit方法返回Future对象
      resultList.add(result);
    }
    for (Future<Integer> result : resultList) {
      try {
        System.out.println(result.get()); //调用Future对象的get()方法拿到任务的返回值
      } catch (InterruptedException e) {
        System.out.println("InterruptedException");
      } catch (ExecutionException e) {
        System.out.println("ExecutionException");
      }
    }
    threadPool.shutdown();
  }
}

2. 源码分析

为了了解利用 Callable 对象和线程池拿到返回值的过程,我对源代码进行了阅读。

  • AbstractExecutorService.java
//AbstractExecutorService抽象类实现了ExecutorService接口,后者有submit方法
    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

可以看到,该方法返回了一个 Future 对象 ftask ,而这个 ftask 又是一个 RunnableFuture 的对象,那么我们先看看 RunnableFuture 这个接口的定义。

  • RunnableFuture.java
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

可以看出,该接口继承了 RunnableFuture 接口,所以 submit 方法才可以返回一个 Future 的对象。进一步我们观察一下 Future 接口的定义方式。

  • Future.java
public interface Future<V> {
   /**
   * 尝试取消任务的执行。如果这个任务已经完成执行或者已经被取消,尝试失败。
   * 当任务尚未开始执行,调用call方法并且成功,那么这个任务将不会再被执行。
   * 当任务已经开始,参数mayInterruptIfRunning将决定是否打算相关线程以达到取消任务的目的
   */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 任务在完成执行之前被取消,返回true
     */
    boolean isCancelled();

    /**
    * 任务正常完成返回true
    */
    boolean isDone();

    /**
    * 等待计算结束,拿到返回值并且返回
    */
    V get() throws InterruptedException, ExecutionException;

    /**
    * 设定等待计算的timeout,如果timeout之前能拿到返回值则返回这个值,如果超时抛出TimeoutException
    * 异常
    */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

从源码可以看出, Future 对象可以实现取消任务、拿到任务返回值的功能,支持我们在主线程中记录 Future 对象并通过该对象拿到返回值或者异常。接下来我们看看 RunnableFuture 接口的一个实现类 FutureTask ,这个类也是 AbstractExecutorService 类的 submit(Callable<T> task) 方法中调用 newTaskFor(Callable<T> callable) 所返回的具体实现类。这里我们选取主要的成员变量和方法进行分析。

  • FutureTask
public class FutureTask<V> implements RunnableFuture<V> {
    /** 传入的Callable对象 */
    private Callable<V> callable;
    /** 返回值 */
    private Object outcome; // non-volatile, protected by state reads/writes

  
    /**
    * 该方法被get方法所调用,作用是将成员变量Object类型的outcome进行类型转换并返回。
    */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    /** 使用Callable对象作为参数的构造函数 */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
    * 这就是我们在主线程中通过Future对象调用的用来拿到返回值的get方法
    */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s); //通过report方法最终拿到需要的返回值
    }
  
    /** 计算得到返回值赋给成员变量outcome (Object类型),在run方法中被调用 */
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

    /**
    * 这个run()方法作为接口被ExecutorService的execute方法所调用,与runnable对象的run方法作用相同。
    * 执行过程是,调用callable的call方法,拿到返回值,并且调用set方法将返回值赋给成员变量outcome。
    */
    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
}

上述源码并非 FutureTask 类的全部,我仅仅选取了我感兴趣的部分进行记录,主要是帮助自己有一个对整个调用过程有个初步的理解。

3. 总结

  • 针对多线程任务中,任务有返回值的情形,我们应当创建实现 Callable 接口的类,利用线程池及其 submit 方法来完成。
  • submit 方法将 Callable 对象封装成一个同时实现 Runnable 接口和 Future 接口的 RunnableFuture 对象,对外也就是主线程,暴露其 Future 接口以拿到返回值。对内暴露两种接口,通过 run 方法对 callable 对象的 call 方法进行调用,计算并拿到返回值。
  • 当前是笔者盘JDK源码的初级阶段,文中如有错误之处,欢迎各位看官批评指正。
原文  https://segmentfault.com/a/1190000022262679
正文到此结束
Loading...