new Thread(()-> doXX() ).start(); 复制代码
private static int result; public static void main(String[] args) throws InterruptedException { new Thread(() -> { System.out.println("处理业务逻辑"); result = 1000; }).start(); Thread.sleep(1000); System.out.println(result); } 复制代码
while(result == null){ } 复制代码
public static void main(String[] args) throws InterruptedException { CallableThread callableThread = new CallableThread(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "ccccc"; }); callableThread.start(); System.out.println("开始时间 " + LocalDateTime.now()); System.out.println(callableThread.get()); System.out.println("结束时间 " + LocalDateTime.now()); } class CallableThread<T> extends Thread { private Task<T> task; private T result; private volatile boolean finished = false; public CallableThread(Task<T> task) { this.task = task; } @Override public void run() { synchronized (this) { result = task.call(); finished = true; notifyAll(); } } public T get() { synchronized (this) { while (!finished) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return result; } } } @FunctionalInterface interface Task<T> { T call(); } 复制代码
public static void main(String[] args) throws InterruptedException { MyRunnable<String> myRunnable = new MyRunnable(() -> { // 模拟耗时的业务操作 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "我是结果"; }); System.out.println("开始时间 " + LocalDateTime.now()); new Thread(myRunnable).start(); System.out.println("result: " + myRunnable.get()); System.out.println("结束时间 " + LocalDateTime.now()); } class MyRunnable<T> implements Runnable { private Task<T> task; private T result; private volatile boolean finished = false; public MyRunnable(Task<T> task) { this.task = task; } @Override public void run() { synchronized (this) { result = task.call(); finished = true; notifyAll(); } } public T get() { synchronized (this) { while (!finished) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return result; } } } 复制代码
、 获取任务执行状态
等常用方法的接口 下面看一个例子
public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> future = new FutureTask<>(() -> { Thread.sleep(3000); System.out.println(System.currentTimeMillis()); return "hehehh"; }); new Thread(future).start(); System.out.println("Start Get Result : " + System.currentTimeMillis()); System.out.println("Get Result : " + future.get() + System.currentTimeMillis()); } 复制代码
boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 复制代码
同时实现了 Runnable
和 Future
在 FutureTask
中,任务的不同状态通过 state
/* * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; 复制代码
因为 FutureTask
本身也实现了 Runnable 接口,所以核心关注它的run方法,执行逻辑其实比较简单:
或者通过cas更新 runner
失败,则直接返回 Callable#call
如果执行成功:先将state设置成 COMPLETING
,然后保存返回的结果保存到属性 outcome
,再将state设置成 NORMAL
,最后通过 LockSupport.unpark(t)
如果执行失败:先将state设置成 COMPLETING
,然后异常信息保存到属性 outcome
,再将state设置成 EXCEPTIONAL
,最后通过 LockSupport.unpark(t)
当我们通过 FutureTask#get
方法获取返回值的时候,会阻塞当前线程,那是通过什么方式阻塞当前线程的?是通过 LockSupport
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // state > COMPLETING ,说明任务要么正常执行,要么异常结束,所以这里可以直接返回 if (s > COMPLETING) { if (q != null) q.thread = null; // 这应该是help GC吧? return s; } // 如果正在收尾阶段,交出CPU, 等下次循环 else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); // 通过UNSAFE 设置 waiters else if (!queued) // 将新的`WaitNode`添加到单向链表的头部,waiters即对应头节点 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); // 阻塞当前线程 } } 复制代码
上面我们看到了有一个 waiters
,这是用来干嘛的呢?它是一个单向链表结构,主要是为了处理多次调用 FutureTask#get
的情况,每调用一次 FutureTask#get
就会生成一个 WaitNode
那什么时候用到这个链表呢?在任务执行完成的时候,会执行 finishCompletion
方法,主要就是从头节点依次往下遍历,获取节点的 thread
属性,然后执行 LockSupport.unpark(thread)
相对之前的那种方式来说, FutureTask
已经很好用了,直接通过 FutureTask#get
不过方便是方便,但假如我想在获取返回值之后执行一些其他的逻辑该怎么处理呢?其实我最直接的想法就是回调了。比如,我们可以对上面的 MyRunnable
public MyRunnable addListener(Consumer c) { // 这里是一个例子,肯定不会每次都new一个线程,一般是使用线程池 while (!finished) { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } c.accept(result); }).start(); return this; } 复制代码
我们给 MyRunnable
添加了一个 addListener
方法,接收一个 Consumer
public static void main(String[] args) throws InterruptedException { MyRunnable<String> myRunnable = new MyRunnable(() -> { // 模拟耗时的业务操作 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "我是结果"; }); System.out.println("开始时间 " + LocalDateTime.now()); new Thread(myRunnable).start(); myRunnable.addListener(result -> { System.out.println("当xxx执行完成之后,线程:" + Thread.currentThread().getName() + " 执行一些其他的任务"); result = result + " ggggg"; System.out.println(result); }); } 复制代码
是 guava
包里面的,对 Future
进行了增强, ListenableFuture
继承了 Future
/** * @param listener the listener to run when the computation is complete 回调逻辑 * @param executor the executor to run the listener in 回调在哪个线程池执行 */ void addListener(Runnable listener, Executor executor); 复制代码
继承了 FutureTask
并且是实现了 ListenableFuture
public static void main(String[] args) throws InterruptedException { ListenableFutureTask futureTask = ListenableFutureTask.create(() -> { System.out.println("执行任务开始 " + LocalDateTime.now()); Thread.sleep(3000); System.out.println("执行任务完成 " + LocalDateTime.now()); return "结果"; }); futureTask.addListener(() -> System.out.println("获取结果之后,输出一条日志"), MoreExecutors.directExecutor()); new Thread(futureTask).start(); } 复制代码
原理就是将所有回调维护在一个单向链表中,也就是 ExecutionList
// 每个回调就相当于是一个RunnableExecutorPair节点,所有RunnableExecutorPair节点构成一条链表,头插链表 private final ExecutionList executionList = new ExecutionList(); // ListenableFutureTask#addListener public void addListener(Runnable listener, Executor exec) { executionList.add(listener, exec); } // ExecutionList#add public void add(Runnable runnable, Executor executor) { // 上锁,因为它的内部属性 executed 可能会被任务逻辑线程更新,即 ListenableFutureTask 实现了 FutureTask 的done方法,然后会在里面更新 executed 的值为true // 还有一点,如果不加锁,当多个线程同时添加回调的时候,可能会造成节点丢失 synchronized (this) { // 如果任务还没有执行完成,就将当前节点添加到头节点 if (!executed) { runnables = new RunnableExecutorPair(runnable, executor, runnables); return; } } // 如果任务执行完成,就开始执行回调 executeListener(runnable, executor); } // ExecutionList#executeListener private static void executeListener(Runnable runnable, Executor executor) { try { // 直接将任务交给线程池 executor.execute(runnable); } catch (RuntimeException e) { log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e); } } // ExecutionList.RunnableExecutorPair private static final class RunnableExecutorPair { final Runnable runnable; final Executor executor; @Nullable RunnableExecutorPair next; RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) { this.runnable = runnable; this.executor = executor; this.next = next; } } 复制代码
是怎么知道任务是否执行完成了呢? 在 FutureTask#finishCompletion
方法中,解除阻塞的线程之后,还会执行一个 done
方法,不过该方法在 FutureTask
没有任何逻辑,可以把它当作是一个模板方法,而 ListenableFutureTask
// ListenableFutureTask#done protected void done() { executionList.execute(); } // ExecutionList#execute public void execute() { RunnableExecutorPair list; synchronized (this) { if (executed) { return; } // 首先将executed置为true executed = true; // runnables代表链表的头节点 list = runnables; runnables = null; // allow GC to free listeners even if this stays around for a while. } RunnableExecutorPair reversedList = null; // 这其实是一个倒置的过程,因为我们添加节点的时候,是插入到头部的,为了保证回调按照我们添加时的顺序执行,即 先添加先执行,所以做了一个倒置 while (list != null) { RunnableExecutorPair tmp = list; list = list.next; tmp.next = reversedList; reversedList = tmp; } // 遍历链表,依次执行回调逻辑 while (reversedList != null) { executeListener(reversedList.runnable, reversedList.executor); reversedList = reversedList.next; } } 复制代码
通过 ListenableFutureTask
,我们可以在任务执行完成之后执行一些回调逻辑。可是细心的同学会发现, 回调方法无法使用任务的返回值
,那假如我就是想先获取值然后再用这个返回值做下一步操作怎么办?还是只能先通过get方法阻塞当前线程吗?其实 guava
public static void main(String[] args) throws InterruptedException { ListenableFutureTask futureTask = ListenableFutureTask.create(() -> { System.out.println("执行任务开始 " + LocalDateTime.now()); Thread.sleep(3000); System.out.println("执行任务完成 " + LocalDateTime.now()); return "结果"; }); Futures.addCallback(futureTask, new FutureCallback<String>() { @Override public void onSuccess(String result) { System.out.println("执行成功: " + result); } @Override public void onFailure(Throwable t) { System.out.println("执行失败"); } }); new Thread(futureTask).start(); } 复制代码
void onSuccess(@Nullable V result); void onFailure(Throwable t); 复制代码
// Futures#addCallback public static <V> void addCallback( ListenableFuture<V> future, FutureCallback<? super V> callback) { // 这里使用了DirectExecutor线程池,即直接在当前线程执行 addCallback(future, callback, directExecutor()); } // Futures#addCallback public static <V> void addCallback(final ListenableFuture<V> future, final FutureCallback<? super V> callback, Executor executor) { Runnable callbackListener = new Runnable() { @Override public void run() { final V value; try { value = getDone(future); } catch (ExecutionException e) { callback.onFailure(e.getCause()); return; } catch (RuntimeException e) { callback.onFailure(e); return; } catch (Error e) { callback.onFailure(e); return; } callback.onSuccess(value); } }; // 最终还是将这部分逻辑封装成一个回调,然后在这个回调中获取返回值,根据返回值的结果执行相应的FutureCallback方法 future.addListener(callbackListener, executor); } // Futures#getDone public static <V> V getDone(Future<V> future) throws ExecutionException { checkState(future.isDone(), "Future was expected to be done: %s", future); return getUninterruptibly(future); } public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException { boolean interrupted = false; try { while (true) { try { return future.get(); } catch (InterruptedException e) { interrupted = true; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } } 复制代码
本质上其实就是将获取返回值的逻辑封装成一个回调,在这个回调中获取返回值,根据返回值的结果执行相应的 FutureCallback
与我们直接通过get方法获取返回值然后再执行其他逻辑还是有区别的,因为我们直接调用 Future#get
方法会阻塞当前线程,而 guava
其实Spring里面也有一个 ListenableFutureTask
,实现上和 guava
大同小异,也是继承了 FutureTask
并且实现了自己的 ListenableFuture
接口,通过重写 FutureTask#done
public static void main(String[] args) { ListenableFutureTask future = new ListenableFutureTask(() -> "结果"); future.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(Object result) { System.out.println("callback " + result); } @Override public void onFailure(Throwable ex) { System.out.println("执行失败 "); } }); new Thread(future).start(); } 复制代码
它的Callback是保存在两个Queue里面的: successCallbacks
, failureCallbacks
,数据结构是 LinkedList
private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>(); private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>(); 复制代码
protected void done() { Throwable cause; try { T result = get(); this.callbacks.success(result); return; }catch (InterruptedException ex) { Thread.currentThread().interrupt(); return; }catch (ExecutionException ex) { cause = ex.getCause(); if (cause == null) { cause = ex; } } catch (Throwable ex) { cause = ex; } this.callbacks.failure(cause); } 复制代码