首先,这篇文章写的都是一些比较基础的内容,也就是从API层面解释一下我们平时用的比较多的东西,其实我倒是也想写点底层的东西,可是我也不懂啊。虽然比较基础,但可能却比较容易忽略吧,如果有不对的地方,希望大佬嘴下留情。
在Java中使用多线程,本质上还是对Thread对象的操作。线程池只是为了方便对线程的管理,避免频繁的创建和销毁线程带来不必要的系统开销,内部通过指定的线程数和阻塞队列实现。
创建一个Thread对象的时候一般会传递一个Runnable对象,任务逻辑就写在Runnable的run方法中。感觉这个Runnable的名字取得不太好,如果叫Task是不是会更好一些呢?
new Thread(()-> doXX() ).start(); 复制代码
上面的那种方式使用起来是挺简单,但会遇到一些问题,比如:能获取返回值不?
像上面这样是没办法获取返回值的,所以我们需要做一些处理,比如,将结果赋值给一个全局变量
private static int result; public static void main(String[] args) throws InterruptedException { new Thread(() -> { System.out.println("处理业务逻辑"); result = 1000; }).start(); Thread.sleep(1000); System.out.println(result); } 复制代码
result
就是一个全局变量,当任务执行完成之后,更新这个值。这其实都不能算是返回值,但有时候也能用:不需要立即知道任务的执行结果,在访问全部变量的时候,只需要获取它的值就好了。比如通过定时任务去更新缓存,不需要关注任务什么时候执行完成,我需要的只是缓存的值,任务执行了就获取最新的值,没有执行就获取旧值。
那假如我就是想现在获取返回值咋办?因为我要用这个返回值作为下面逻辑的输入。那或许可以通过轮询的方式检测全局变量来达到目的?
while(result == null){ } 复制代码
除了白白浪费CPU,好像也行啊?但我现在考虑的只是两个线程,如果有多个线程该对全局变量修改该怎么办呢?那用ThreadLocal?算了,就此打住吧
或许可以封装一下?再封装之前,先考虑几个问题
Task
public static void main(String[] args) throws InterruptedException { CallableThread callableThread = new CallableThread(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "ccccc"; }); callableThread.start(); System.out.println("开始时间 " + LocalDateTime.now()); System.out.println(callableThread.get()); System.out.println("结束时间 " + LocalDateTime.now()); } class CallableThread<T> extends Thread { private Task<T> task; private T result; private volatile boolean finished = false; public CallableThread(Task<T> task) { this.task = task; } @Override public void run() { synchronized (this) { result = task.call(); finished = true; notifyAll(); } } public T get() { synchronized (this) { while (!finished) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return result; } } } @FunctionalInterface interface Task<T> { T call(); } 复制代码
这样貌似也可以,但是不太好。Thread本来只是用于处理和线程相关的事情,现在将它和逻辑(Task)绑定在一起,如果有多个任务想共用一个Thread,那返回值怎么处理?
是否可以将这部分逻辑抽出来,放到一个新类当中?
public static void main(String[] args) throws InterruptedException { MyRunnable<String> myRunnable = new MyRunnable(() -> { // 模拟耗时的业务操作 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "我是结果"; }); System.out.println("开始时间 " + LocalDateTime.now()); new Thread(myRunnable).start(); System.out.println("result: " + myRunnable.get()); System.out.println("结束时间 " + LocalDateTime.now()); } class MyRunnable<T> implements Runnable { private Task<T> task; private T result; private volatile boolean finished = false; public MyRunnable(Task<T> task) { this.task = task; } @Override public void run() { synchronized (this) { result = task.call(); finished = true; notifyAll(); } } public T get() { synchronized (this) { while (!finished) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } return result; } } } 复制代码
这不是和java里面的Future有点像吗?确实有点像
Future
里面有几个比较核心的概念
获取任务返回值
、 获取任务执行状态
等常用方法的接口 下面看一个例子
public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> future = new FutureTask<>(() -> { Thread.sleep(3000); System.out.println(System.currentTimeMillis()); return "hehehh"; }); new Thread(future).start(); System.out.println("Start Get Result : " + System.currentTimeMillis()); System.out.println("Get Result : " + future.get() + System.currentTimeMillis()); } 复制代码
Future
接口除了提供获取返回值的接口,还提供了一些其他的接口,根据名字大概也可以猜到什么意思,不过多解释了。实在不行看看源码吧,这样子就很愉快了。
boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; 复制代码
FutureTask
同时实现了 Runnable
和 Future
接口,
在 FutureTask
中,任务的不同状态通过 state
变量来表示,状态有以下几种:
/* * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; 复制代码
因为 FutureTask
本身也实现了 Runnable 接口,所以核心关注它的run方法,执行逻辑其实比较简单:
NEW
或者通过cas更新 runner
失败,则直接返回 Callable#call
方法,根据执行结果,设置状态,
如果执行成功:先将state设置成 COMPLETING
,然后保存返回的结果保存到属性 outcome
,再将state设置成 NORMAL
,最后通过 LockSupport.unpark(t)
解除阻塞的线程;
如果执行失败:先将state设置成 COMPLETING
,然后异常信息保存到属性 outcome
,再将state设置成 EXCEPTIONAL
,最后通过 LockSupport.unpark(t)
解除阻塞的线程;
当我们通过 FutureTask#get
方法获取返回值的时候,会阻塞当前线程,那是通过什么方式阻塞当前线程的?是通过 LockSupport
阻塞的,这个推荐看看博客吧。我也是看博客的,自己也解释的没人家好,嗯,就是这样的
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // state > COMPLETING ,说明任务要么正常执行,要么异常结束,所以这里可以直接返回 if (s > COMPLETING) { if (q != null) q.thread = null; // 这应该是help GC吧? return s; } // 如果正在收尾阶段,交出CPU, 等下次循环 else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); // 通过UNSAFE 设置 waiters else if (!queued) // 将新的`WaitNode`添加到单向链表的头部,waiters即对应头节点 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); // 阻塞当前线程 } } 复制代码
上面我们看到了有一个 waiters
,这是用来干嘛的呢?它是一个单向链表结构,主要是为了处理多次调用 FutureTask#get
的情况,每调用一次 FutureTask#get
就会生成一个 WaitNode
节点,然后将它添加到单向链表的头部
那什么时候用到这个链表呢?在任务执行完成的时候,会执行 finishCompletion
方法,主要就是从头节点依次往下遍历,获取节点的 thread
属性,然后执行 LockSupport.unpark(thread)
解除阻塞
相对之前的那种方式来说, FutureTask
已经很好用了,直接通过 FutureTask#get
方法就可以获取返回值了,确实蛮方便的。
不过方便是方便,但假如我想在获取返回值之后执行一些其他的逻辑该怎么处理呢?其实我最直接的想法就是回调了。比如,我们可以对上面的 MyRunnable
代码再扩展一下,例如
public MyRunnable addListener(Consumer c) { // 这里是一个例子,肯定不会每次都new一个线程,一般是使用线程池 while (!finished) { try { Thread.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } c.accept(result); }).start(); return this; } 复制代码
我们给 MyRunnable
添加了一个 addListener
方法,接收一个 Consumer
作为入参,当任务执行完成之后就执行这段逻辑,如下:
public static void main(String[] args) throws InterruptedException { MyRunnable<String> myRunnable = new MyRunnable(() -> { // 模拟耗时的业务操作 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "我是结果"; }); System.out.println("开始时间 " + LocalDateTime.now()); new Thread(myRunnable).start(); myRunnable.addListener(result -> { System.out.println("当xxx执行完成之后,线程:" + Thread.currentThread().getName() + " 执行一些其他的任务"); result = result + " ggggg"; System.out.println(result); }); } 复制代码
ListenableFuture
是 guava
包里面的,对 Future
进行了增强, ListenableFuture
继承了 Future
,新增了一个添加回调的方法
/** * @param listener the listener to run when the computation is complete 回调逻辑 * @param executor the executor to run the listener in 回调在哪个线程池执行 */ void addListener(Runnable listener, Executor executor); 复制代码
ListenableFutureTask
继承了 FutureTask
并且是实现了 ListenableFuture
接口,看一个简单例子
public static void main(String[] args) throws InterruptedException { ListenableFutureTask futureTask = ListenableFutureTask.create(() -> { System.out.println("执行任务开始 " + LocalDateTime.now()); Thread.sleep(3000); System.out.println("执行任务完成 " + LocalDateTime.now()); return "结果"; }); futureTask.addListener(() -> System.out.println("获取结果之后,输出一条日志"), MoreExecutors.directExecutor()); new Thread(futureTask).start(); } 复制代码
原理就是将所有回调维护在一个单向链表中,也就是 ExecutionList
,然后通过重写``FutureTask#done`方法,在任务完成之后执行回调逻辑
// 每个回调就相当于是一个RunnableExecutorPair节点,所有RunnableExecutorPair节点构成一条链表,头插链表 private final ExecutionList executionList = new ExecutionList(); // ListenableFutureTask#addListener public void addListener(Runnable listener, Executor exec) { executionList.add(listener, exec); } // ExecutionList#add public void add(Runnable runnable, Executor executor) { // 上锁,因为它的内部属性 executed 可能会被任务逻辑线程更新,即 ListenableFutureTask 实现了 FutureTask 的done方法,然后会在里面更新 executed 的值为true // 还有一点,如果不加锁,当多个线程同时添加回调的时候,可能会造成节点丢失 synchronized (this) { // 如果任务还没有执行完成,就将当前节点添加到头节点 if (!executed) { runnables = new RunnableExecutorPair(runnable, executor, runnables); return; } } // 如果任务执行完成,就开始执行回调 executeListener(runnable, executor); } // ExecutionList#executeListener private static void executeListener(Runnable runnable, Executor executor) { try { // 直接将任务交给线程池 executor.execute(runnable); } catch (RuntimeException e) { log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e); } } // ExecutionList.RunnableExecutorPair private static final class RunnableExecutorPair { final Runnable runnable; final Executor executor; @Nullable RunnableExecutorPair next; RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) { this.runnable = runnable; this.executor = executor; this.next = next; } } 复制代码
ListenableFutureTask
是怎么知道任务是否执行完成了呢? 在 FutureTask#finishCompletion
方法中,解除阻塞的线程之后,还会执行一个 done
方法,不过该方法在 FutureTask
没有任何逻辑,可以把它当作是一个模板方法,而 ListenableFutureTask
实现了该方法,如下:
// ListenableFutureTask#done protected void done() { executionList.execute(); } // ExecutionList#execute public void execute() { RunnableExecutorPair list; synchronized (this) { if (executed) { return; } // 首先将executed置为true executed = true; // runnables代表链表的头节点 list = runnables; runnables = null; // allow GC to free listeners even if this stays around for a while. } RunnableExecutorPair reversedList = null; // 这其实是一个倒置的过程,因为我们添加节点的时候,是插入到头部的,为了保证回调按照我们添加时的顺序执行,即 先添加先执行,所以做了一个倒置 while (list != null) { RunnableExecutorPair tmp = list; list = list.next; tmp.next = reversedList; reversedList = tmp; } // 遍历链表,依次执行回调逻辑 while (reversedList != null) { executeListener(reversedList.runnable, reversedList.executor); reversedList = reversedList.next; } } 复制代码
通过 ListenableFutureTask
,我们可以在任务执行完成之后执行一些回调逻辑。可是细心的同学会发现, 回调方法无法使用任务的返回值
,那假如我就是想先获取值然后再用这个返回值做下一步操作怎么办?还是只能先通过get方法阻塞当前线程吗?其实 guava
包中也给了我们相关的接口。先看一个例子:
public static void main(String[] args) throws InterruptedException { ListenableFutureTask futureTask = ListenableFutureTask.create(() -> { System.out.println("执行任务开始 " + LocalDateTime.now()); Thread.sleep(3000); System.out.println("执行任务完成 " + LocalDateTime.now()); return "结果"; }); Futures.addCallback(futureTask, new FutureCallback<String>() { @Override public void onSuccess(String result) { System.out.println("执行成功: " + result); } @Override public void onFailure(Throwable t) { System.out.println("执行失败"); } }); new Thread(futureTask).start(); } 复制代码
FutureCallback
接口里面有两个方法,分别对应任务执行成功逻辑和任务失败逻辑
void onSuccess(@Nullable V result); void onFailure(Throwable t); 复制代码
Futures
可以堪称是一个门面类,里面封装了一些操作
// Futures#addCallback public static <V> void addCallback( ListenableFuture<V> future, FutureCallback<? super V> callback) { // 这里使用了DirectExecutor线程池,即直接在当前线程执行 addCallback(future, callback, directExecutor()); } // Futures#addCallback public static <V> void addCallback(final ListenableFuture<V> future, final FutureCallback<? super V> callback, Executor executor) { Runnable callbackListener = new Runnable() { @Override public void run() { final V value; try { value = getDone(future); } catch (ExecutionException e) { callback.onFailure(e.getCause()); return; } catch (RuntimeException e) { callback.onFailure(e); return; } catch (Error e) { callback.onFailure(e); return; } callback.onSuccess(value); } }; // 最终还是将这部分逻辑封装成一个回调,然后在这个回调中获取返回值,根据返回值的结果执行相应的FutureCallback方法 future.addListener(callbackListener, executor); } // Futures#getDone public static <V> V getDone(Future<V> future) throws ExecutionException { checkState(future.isDone(), "Future was expected to be done: %s", future); return getUninterruptibly(future); } public static <V> V getUninterruptibly(Future<V> future) throws ExecutionException { boolean interrupted = false; try { while (true) { try { return future.get(); } catch (InterruptedException e) { interrupted = true; } } } finally { if (interrupted) { Thread.currentThread().interrupt(); } } } 复制代码
本质上其实就是将获取返回值的逻辑封装成一个回调,在这个回调中获取返回值,根据返回值的结果执行相应的 FutureCallback
方法,不过在使用上却方便了好多。
与我们直接通过get方法获取返回值然后再执行其他逻辑还是有区别的,因为我们直接调用 Future#get
方法会阻塞当前线程,而 guava
是在回调中执行这部逻辑,类似于一种通知机制,所以不会阻塞当前线程。
其实Spring里面也有一个 ListenableFutureTask
,实现上和 guava
大同小异,也是继承了 FutureTask
并且实现了自己的 ListenableFuture
接口,通过重写 FutureTask#done
方法,在该方法中获取返回值然后执行回调逻辑
public static void main(String[] args) { ListenableFutureTask future = new ListenableFutureTask(() -> "结果"); future.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(Object result) { System.out.println("callback " + result); } @Override public void onFailure(Throwable ex) { System.out.println("执行失败 "); } }); new Thread(future).start(); } 复制代码
它的Callback是保存在两个Queue里面的: successCallbacks
, failureCallbacks
,数据结构是 LinkedList
private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>(); private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>(); 复制代码
重写的done方法如下,逻辑很简单,就不解释了
protected void done() { Throwable cause; try { T result = get(); this.callbacks.success(result); return; }catch (InterruptedException ex) { Thread.currentThread().interrupt(); return; }catch (ExecutionException ex) { cause = ex.getCause(); if (cause == null) { cause = ex; } } catch (Throwable ex) { cause = ex; } this.callbacks.failure(cause); } 复制代码