文章异常啰嗦且绕弯。
JDK 版本 : OpenJDK 11.0.1
IDE : idea 2018.3
FutureTask 是 jdk 中默认的 Future 实现类,常与 Callable 结合进行多线程并发操作。
import java.util.concurrent.*; public class FutureTaskDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { //创建一个线程池 ExecutorService pool = Executors.newFixedThreadPool(1); try{ //创建一个要执行的 Callable 对象 //此处其实 Runnable 对象也可以,但是通常不会那样做 Callable<String> task = () -> { //休眠三秒 TimeUnit.SECONDS.sleep(3); //返回一个字符串 return "hello"; }; //用 FutureTask 对象去包装 Callable FutureTask<String> futureTask = new FutureTask<>(task); //此处将 FutureTask 对象丢进线程池里 pool.submit(futureTask); //注意,此处的 futureTask 本质上是作为 Runnable 被丢进池子里的 //所以也可以用线程池的 execute(...) 方法 //pool.execute(futureTask) //还有一种更常见的执行方式是直接使用 Thread //new Thread(futureTask).start(); //获取结果 //注意,如果没有获取到的话此处会阻塞线程直到获取到为止 String result = futureTask.get(); //还有一种限时策略的结果获取 //超时的情况下会抛出异常 //String result = futureTask.get(1,TimeUnit.SECONDS); System.out.println(result); }finally { //关闭连接池 pool.shutdown(); } } }
回到 Demo 中的创建代码:
FutureTask<String> futureTask = new FutureTask<>(task);
追踪 FutureTask 的构造器:
//FutureTask.class public FutureTask(Callable<V> callable) { //有效性判断,不能为空 if (callable == null) throw new NullPointerException(); //记录下 callable 对象 this.callable = callable; //state 是一个 int 类型的对象,是一个 //NEW = 0 this.state = NEW; }
FutureTask 本身是 Runnable 的子类,其在被 ThreadPoolExecutor 或者 Thread 对象消费的时候也是被当做 Runnable 的实现类的。
所以其本身的核心逻辑就必然在 run() 方法中:
//FutureTask.class public void run() { //先判断状态,如果状态不是 NEW 就会直接返回 //RUNNER 是一个 VarHandler 类型的变量,指向了 FutureTask 中的 thread 变量,用于储存当前的线程 //但是如果 thread 已经不为 null,此处也会直接返回 //这两种返回条件都意味着此 FutureTask 的 run() 方法已经执行过了 if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return; try { //获取 callable Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //执行 callable 的业务逻辑 result = c.call(); //ran 为成功标识 ran = true; } catch (Throwable ex) { //出错的情况下 result = null; ran = false; //不成功的情况下存入 exception setException(ex); } //如果成功的话会在此处进行操作 if (ran) set(result); } } finally { //置空 runner = null; int s = state; if (s >= INTERRUPTING) //如果此 FutreTask 的状态是中断状态,会在此处不断调用 Thread.yield() 空转 handlePossibleCancellationInterrupt(s); } }
此处有两个关键方法,即为 setException(...) 和 set(...):
//FutureTask.class protected void setException(Throwable t) { //用 CAS 操作比较并更新状态值 if (STATE.compareAndSet(this, NEW, COMPLETING)) { //outcome 是一个 Object 对象,用于存储 callable 的返回值 //此处由于报错了,所以储存的是错误对象 outcome = t; //EXCEPTIONAL = 3 STATE.setRelease(this, EXCEPTIONAL); //最后清理工作,主要用于唤醒等待线程和执行 callable finishCompletion(); } } //FutureTask.class protected void set(V v) { //基本逻辑和 setException(...) 方法雷同,只是 STATE 和 outcome 的储存值不同 if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = v; STATE.setRelease(this, NORMAL); finishCompletion(); } }
再来看 finishCompletion() 方法:
//FutureTask.class private void finishCompletion() { //WaitNode 是 FutureTask 的静态内部类 //其本质上是单向链表的节点表示类,用于存放想要获取 Callable 的返回值但是被阻塞的线程的线程对象 for (WaitNode q; (q = waiters) != null;) { //此处使用 CAS 将 q 从 WAITERS 里去除 if (WAITERS.weakCompareAndSet(this, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { //此处置空线程对象,帮助 GC q.thread = null; //唤醒线程 LockSupport.unpark(t); } //接着往下遍历 WaitNode next = q.next; if (next == null) break; q.next = null; q = next; } break; } } //此方法是空的 done(); //置空 callable callable = null; }
之前提到过在 FutureTask 的 get(...) 方法中会阻塞线程,直到 Callable 执行完毕并能够获取返回值的时候才会结束阻塞。
所以 finishCompletion() 方法的主体其实就是去唤醒被阻塞的线程。
回到 Demo 中的创建代码:
String result = futureTask.get();
追踪 get() 方法:
//step 1 //FutureTask.class public V get() throws InterruptedException, ExecutionException { //此处先判断状态值,如果非 COMPLETING,即为还没完成,就会调用 awaitDone(...) 方法阻塞线程 int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); //返回结果 return report(s); } //step 2 //FutureTask.class private V report(int s) throws ExecutionException { //获取需要返回的对象 Object x = outcome; //如果是正常结束的就直接返回对象即可 if (s == NORMAL) return (V)x; //出错的情况下,抛异常 if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
再来看一下阻塞线程的 awaitDone(...) 方法:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { //循环的次数 long startTime = 0L; //节点对象 WaitNode q = null; //链表队列标识,代表该线程是否被加入链表中,初始为 false 代表未被加入 boolean queued = false; for (;;) { int s = state; if (s > COMPLETING) { //如果 Callable 的执行已经完成 if (q != null) q.thread = null; return s; }else if (s == COMPLETING) //Callable 的执行刚刚完成,后续工作还没做 Thread.yield(); else if (Thread.interrupted()) { //线程被中断了,会抛出错误 removeWaiter(q); throw new InterruptedException(); } else if (q == null) { //进入此处的判断证明 Callable 还未完成,所以会创建等待节点 //此处的 timed 传入为 false,不会在此返回 if (timed && nanos <= 0L) return s; q = new WaitNode(); //新建节点 }else if (!queued) //queued 初始为 false,进入此处的时候会将上一个判断条件中新建的 q 加入到链表的首节点中 //并且 queued 变成 true queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); else if (timed) { //如果此操作是限时的,那么这里需要判断时间 final long parkNanos; if (startTime == 0L) { startTime = System.nanoTime(); if (startTime == 0L) startTime = 1L; parkNanos = nanos; } else { long elapsed = System.nanoTime() - startTime; if (elapsed >= nanos) { removeWaiter(q); return state; } parkNanos = nanos - elapsed; } if (state < COMPLETING) //此处挂起线程,时间为 parkNanos //本例中传入为 0L,所以是永久挂起 LockSupport.parkNanos(this, parkNanos); }else //永久挂起线程 LockSupport.park(this); } }
FutureTask 和 ThreadLocal 一样,都是 java.util.current 包中的小工具,封装不复杂,理解即可。
本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充