堕落的人生啊……
偶然间看到这个问题,对于标配 jdk1.8
的我们是不是分分钟拍出答案?
答曰:简单, Callable
,完美解决,下一题……
可是,身处 jdk1.4
(甚至更早)的前辈们,要怎么做才能拿到线程返回值呢?或者说,禁用 Callable
技能,怎么获取线程返回值?
嗯,这似乎是线程间通信的问题;只有 Runnable
作为武器,有些麻烦,接受挑战!
// 任务Task class Task implements Runnable{ @Getter Object result; //返回值 @Override public void run() { try { // 模拟某耗时逻辑 System.out.println(String.format("[%s] 执行中..",Thread.currentThread().getName())); TimeUnit.SECONDS.sleep(2L); } catch (InterruptedException e) { e.printStackTrace(); } // 计算得到最终结果 result = Integer.valueOf(9987); System.out.println(String.format("[%s] 执行完毕..",Thread.currentThread().getName())); } }
public static void main(String[] args) { // 使用了内部类,采用如下方式new CallbackTest callbackTest = new CallbackTest(); Task task = callbackTest.new Task(); final String threadName = "T-1"; Thread thread = new Thread(task,threadName); thread.start(); }
好, T-1
线程启动了,看样子能很好的执行任务,问题是 main方法
中怎么获取到 Task
的返回值 result
呢?
以目前的代码运行,效果绝对是T-1线程单飞,和main线程没啥联系。
我有一项能力,总是能第一时间相当最简易的方法。
public static void main(String[] args) { CallbackTest callbackTest = new CallbackTest(); Task task = callbackTest.new Task(); final String threadName = "T-1"; Thread thread = new Thread(task,threadName); thread.start(); // main线程频繁检查T-1线程 while (true){ if(task.getResult()!=null){ System.out.println(String.format("结果 task=%s",task.getResult())); break; } // 让cpu稍微冷静一下 TimeUnit.MILLISECONDS.sleep(200L); System.out.println(String.format("[main] 勤劳检查result中(result=%s)",task.getResult())); } }
运行效果,可能是这样的:
[T-1] 执行中.. [main] 勤劳检查result中(result=null) [main] 勤劳检查result中(result=null) [main] 勤劳检查result中(result=null) [main] 勤劳检查result中(result=null) [main] 勤劳检查result中(result=null) [main] 勤劳检查result中(result=null) [main] 勤劳检查result中(result=null) [main] 勤劳检查result中(result=null) [main] 勤劳检查result中(result=null) [T-1] 执行完毕.. [main] 勤劳检查result中(result=9987) 结果 task=9987
虽然已加入了对cpu而言人性化的休眠方法(sleep),但这依然不是个很好的方案。该方案极大的操劳了main线程,需要一遍遍的检查子线程的运行情况——子线程是否将最终结果赋值。
那有没有一种方式,可以在T-1运行完之后,告诉main线程呢?
作为一个老派(技术陈旧)的程序员,我首先想到的是 wait..notify组合
public static void main(String[] args){ CallbackTest callbackTest = new CallbackTest(); Task task = callbackTest.new Task(); final String threadName = "T-1"; Thread thread = new Thread(task,threadName); thread.start(); while (true){ //检查result状态,还没有赋值,则等待 if(task.getResult()==null){ System.out.println(String.format("[%s] 等待执行..",Thread.currentThread().getName())); synchronized (task){ try { task.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } if(task.getResult()!=null){ System.out.println(String.format("结果 task=%s",task.getResult())); break; } } }
class Task implements Runnable{ @Getter Object result; @Override public void run() { try { System.out.println(String.format("[%s] 执行中..",Thread.currentThread().getName())); TimeUnit.SECONDS.sleep(2L); } catch (InterruptedException e) { e.printStackTrace(); } result = Integer.valueOf(9987); //唤醒wait的对象 synchronized (this){ this.notify(); } System.out.println(String.format("[%s] 执行完毕..",Thread.currentThread().getName())); } }
改造后,执行效果如下:
[T-1] 执行中.. [main] 等待执行.. [T-1] 执行完毕.. 结果 task=9987
其实也可以使用 LockSupport
实现,和 wait / notify 类似,直接贴出完整代码吧:
public class CallbackTest { class Task implements Runnable{ @Getter Object result; // 构造函数传入调用线程(main线程) Thread runner; Task(Thread runner){ this.runner = runner; } @Override public void run() { try { System.out.println(String.format("[%s] 执行中..",Thread.currentThread().getName())); TimeUnit.SECONDS.sleep(2L); } catch (InterruptedException e) { e.printStackTrace(); } result = Integer.valueOf(9987); //唤醒main线程 synchronized (this){ LockSupport.unpark(runner); } System.out.println(String.format("[%s] 执行完毕..",Thread.currentThread().getName())); } } public static void main(String[] args) { CallbackTest callbackTest = new CallbackTest(); Task task = callbackTest.new Task(Thread.currentThread()); final String threadName = "T-1"; Thread thread = new Thread(task,threadName); thread.start(); while (true){ if(task.getResult()==null){ System.out.println(String.format("[%s] 等待执行..",Thread.currentThread().getName())); LockSupport.park(); //main线程阻塞 } if(task.getResult()!=null){ System.out.println(String.format("结果 task=%s",task.getResult())); break; } } } }
至此,我们相当于可以用自己的方式获取到Thread的返回值了,此时回顾下文章开始初的解答:
偶然间看到这个问题,对于标配`jdk1.8`的我们是不是分分钟拍出答案? 答曰:简单,`Callable`,完美解决,下一题……
当时很自然的就回答了 Callable
,先看看它是怎么用的。
public class CallbackTest { class Task implements Callable<Object> { @Override public Object call() { try { // 某耗时逻辑 System.out.println(String.format("[%s] 执行中..",Thread.currentThread().getName())); TimeUnit.SECONDS.sleep(2L); } catch (InterruptedException e) { e.printStackTrace(); } return Integer.valueOf(9987); } } public static void main(String[] args) throws ExecutionException, InterruptedException { CallbackTest callbackTest = new CallbackTest(); Task task = callbackTest.new Task(); ExecutorService es = Executors.newSingleThreadExecutor(); Future<Object> future = es.submit(task); System.out.println("结果:"+future.get()); es.shutdown(); } }
代码并不复杂,demo 中获取返回值的方式是 future.get()
,这是一个阻塞方法;在子线程执行完(return)之前会一直阻塞。没用过的开发兄弟(姐妹?)们自行科普吧,不多解释了。
Callable源码如下:
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
Callable本身就一接口,没什么玄机,玄机在 Future 或者说 FutureTask
上。
重头戏来了,看看源码是怎么实现从其它线程获取返回值的。
先瞧瞧FutureTask的江湖地位:
可以看出, FutureTask
是 Future接口
和 Runnable接口
的实现类 。
此事留个大概印象,我们来看下 FutureTask
是怎么和 Callable
关联上的?
(可对照下文,追下源码;如果实在不理解,可直接跳到本章节末尾结论处)
ExecutorService es = Executors.newSingleThreadExecutor(); Future<Object> future = es.submit(task);
以例子中的ExecutorService的submit方法作为入口,实际的实现方法为 AbstractExecutorService
的 submit
:
/* `AbstractExecutorService` */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); //注释1-构建了FutureTask execute(ftask); //注释2-最终会调用ftask的run方法,也就是调用`步骤1构建的FutureTask对象的run方法 return ftask; } ... protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); //1.1-调用FutureTask的构造函数 }
FutureTask
的构造函数: // callable是FutureTask的成员变量 private Callable<V> callable; public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; //为成员变量赋值 this.state = NEW; // ensure visibility of callable }
结论1
:经ExecutorService的穿针引线, Callable会最终赋值给FutureTask的成员变量
/* `AbstractExecutorService`的`submit` */ public <T> Future<T> submit(Callable<T> task) { ... execute(ftask); //注释2-最终会调用ftask的run方法,也就是调用`步骤1构建的FutureTask对象的run方法 ... } ↓↓↓↓↓ ↓↓↓↓↓ /* ThreadPoolExecutor的execute */ public void execute(Runnable command) { ... addWorker(null, false); //添加到workder中 ... } ↓↓↓↓↓ ↓↓↓↓↓ /* ThreadPoolExecutor的addWorker */ private boolean addWorker(Runnable firstTask, boolean core) { w = new Worker(firstTask); //`Worker`封装 final Thread t = w.thread; ... t.start(); //注释3-worker中的thread执行start方法,会调用对应Runnable的run方法 ... } ↓↓↓↓↓ ↓↓↓↓↓ /* 内部类`Work` */ final Thread thread; // 成员变量 Runnable firstTask; // 成员变量 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //赋值成员变量thread this.thread = getThreadFactory().newThread(this); //创建新线程,并赋值成员变量firstTask } // 3.1-`注释3`处的start,会执行此处的run方法,进而会调用runWorker方法 public void run() { runWorker(this); } final void runWorker(Worker w) { ... Runnable task = w.firstTask; ... task.run(); //##### 注意了,最终会调用到此处 ##### ... }
task.run()
中的task又是什么呢,就是在最开始赋值的FutureTask( 注释1 处),看下它的run方法
public void run() { ... Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); //会调用callable的call方法,这个方法中就是我们自己定义的逻辑 ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); //注释4-赋值动作 } ... }
结论2
:调用过程,经过一系列周转, 最终会调用Callable的call方法 (也就是我们的自定义逻辑)
//成员变量 private Object outcome; protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //赋值给成员变量 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
结论就是, FutureTask包装了Callable,执行期call方法后将返回值赋值给成员变量
接下来探索下返回值的获取,即 Future.get()
的实现。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 1-未完成状态,线程阻塞 s = awaitDone(false, 0L); return report(s); // 2-已完成状态,直接获取 } // 1.1-阻塞 private int awaitDone(boolean timed, long nanos) throws InterruptedException { ... LockSupport.park(this); //阻塞 ... // 2.1-返回了outcome private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
get() 的逻辑并不复杂:
outcome outcome
怎么样,是不是有点似曾相识?这和我们自己实现的那一版的逻辑是一致的!
再次看下set() 方法,找找里面的 LockSupport.unpark(Thread t)
方法,作为证据。
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); //这里看上去很可疑 } } private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (WAITERS.weakCompareAndSet(this, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); //吼吼吼,抓到你了,果然有LockSupport.unpark(t) } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
果然找到了park方法对应的unpark,证明我们的推断是正确的—— FutureTask
的核心实现思路 ,与我们自己的实现方式是一致的(尤其LockSupport版本), 即子线程未完成时阻塞,已完成时释放。
主逻辑分析完了,再来两个开胃甜点。
对比自己和源码的实现,都用LockSupport,使用的阻塞方法却不相同—— park()
vs park(Object blocker)
差别在哪?引用官方文档的解释:
The three forms of park each also support a blocker object parameter. This object is recorded while the thread is blocked to permit monitoring and diagnostic tools to identify the reasons that threads are blocked. 当线程被阻塞时记录此对象,以允许监视和诊断工具识别线程被阻塞的原因。 (Such tools may access blockers using method getBlocker(java.lang.Thread).) The use of these forms rather than the original forms without this parameter is strongly encouraged. 待有参数的park(Object blocker)是被强烈推荐的 The normal argument to supply as a blocker within a lock implementation is this.
按文档中的意思: 传入的blocker对象,相当于一个标志对象,线程阻塞时会记录下来。 下面的例子能明显看出差别
举例说明:(转自 https://www.jianshu.com/p/835... )
private static void parkVsParkBlocker() { Thread t1 = new Thread(() -> { LockSupport.park(); }, "t1"); t1.start(); Object blocker = new Object(); Thread t2 = new Thread(() -> { LockSupport.park(blocker); }, "t2"); t2.start(); LockSupport.getBlocker(t2); unpark(t1, 60); unpark(t2, 60); }
Print java stack trace of a given jvm process.
jstack jps -l | grep LockSupport | awk '{print $1}'
FutureTask作为抽象出的工具类,考虑了多线程环境下的get()的情况,这不部分在前文故意忽略了。
而并发环境下的数据统一,主要靠下面几个 volatile
关键字+ CAS
来达成。(经典模式)
// 状态,记录子线程执行情况 private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; // 记录子线程,运行Callable.call()的线程 private volatile Thread runner; // 等待节点,链表 private volatile WaitNode waiters;
volatile关键字,此处主要用于让其它线程可见(可见性);那 CAS
( C ompare A nd S weep)是做什么的?
本质上,它就是个乐观锁:
return true
return false
jdk 9
之前,主要靠Unsafe; jdk 9
开始,推出了 VarHandle
, 旨在替代 AtomicXX ,以及方便开发人员使用Unsafe的部分权能 。
以状态state的变更为例:
private volatile int state; /* 声明和赋值 */ private static final VarHandle STATE; static{ try { MethodHandles.Lookup l = MethodHandles.lookup(); //1 - 通过MethodHandles.lookup()声明MethodHandles.Lookup对象 STATE = l.findVarHandle(FutureTask.class, "state", int.class); //2 - 赋值VarHandle STATE,此时STATE和state就建立了某种联系 } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } } /* 调用 */ protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { // 3 - 当前对象,将state变量,由NEW改成COMPLETING outcome = v; STATE.setRelease(this, NORMAL); // final state finishCompletion(); } }