要使任务和线程能安全、快速、可靠的停下来,并不是一件容易的事。java没有提供任何机制来安全地终止线程(Thread.stop和suspend等方法提供了这样的功能,但是存在严重缺陷,应该避免使用)。
但是java提供了 中断(Interruption)
,这是一种协作机制,能够使一个线程终止另一个线程的当前工作。
我们很少希望某个任务、线程或服务立即停止,因为这种立即停止会使共享的数据结构处于不一致的状态。一般使用的协作的方式:当需要停止时,它们首先会清除当前正在执行的工作,然后再结束。因为任务本身的代码比发出取消请求的代码更清除如何执行清除工作。
如果外部代码可以在某个操作正常完成之前,将其置入 完成
状态,那么这个操作就可以称为 可取消的
。取消某个操作的原因有很多:
协作机制能设置某个 已请求取消(Cancellation Request)
标志,而任务将定期查看这个标志。如果设置了这个标志,任务就提前结束。
private volatile boolean cancelled; public void run(){ while(!cancelled){ doSomething(); } } public void cancel(){ cancelled = true; }
一个可取消的任务必须拥有 取消策略(Cancellation Policy)
,这个策略中必须定义:其他代码如何 How
请求该任务,任务在何时 When
检查是否已经请求了取消,以及在响应取消时应该执行哪些 What
操作.
如果在使用中断某一个任务调用了一个阻塞方法,例如BlockingQueue.put,那么可能会产生一个更严重的问题:任务可能永远不会检查取消标志,因此永远也不会结束。
private final BlockingQueue<T> queue; private volatile boolean cancelled; public void run(){ while(!cancelled){ queue.put(something); } } public void cancel(){ cancelled = true; }
当前线程为生产者,当生产者在 queue.put()
方法上阻塞了,而这时候消费者希望取消生产者任务,执行了 cancel()
方法,但是生产阻塞在put上,也许永远也没有机会检查 cancelled标志
(如果消费者停止从队列中取数,put方法就会一直阻塞)。
在java的api或语言规范中,并没有将中断与任何取消语义关联起来,但实际上,如果在取消之外的其他操作中使用中断,那么都是不合适的,并且很难支撑起更大的应用。
// Thread中的中断方法 public class Thread{ public void interrupt(){} public void isInterrupted(){} public static boolean interrupted(){} }
阻塞库方法(例如Thread.sleep()和Object.wait())等,都会检查线程何时中断,并且在发现中断时提前返回。
它们在响应中断时执行的操作包括:
JVM并不能保证阻塞方法检测到中断的速度(实际上速度是很快的)。
调用interrupt并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。
对中断操作的正确理解是:它并不会真正的中断一个正在运行的线程,只是发出中断请求,然后等待线程在下一个合适的时刻中断自己(这些时刻也被称为取消点)。
一些自定义的取消机制无法与可阻塞的库函数实现良好交互。如果代码能够响应中断,那么可以用中断作为取消机制,并且利用许多类库提供的中断支持。
通常中断是实现取消的最合理方式。
private final BlockingQueue<T> queue; public void run(){ try{ while(!Thread.currentThread().isInterrupted()){ queue.put(something); } }catch(InterruptedException consumed){ // 个人理解:中断线程也许是靠抛出异常来退出的 } public void cancel(){ interrupted(); } }
中断策略规定线程如何解释中断请求:当发现中断时,应该做哪些工作,哪些工作单元对中断来说是原子操作,以及以多快的速度来响应中断。
最合理的中断策略是某种形式的 线程级(Thread-Level)
取消操作或是 服务级(Service-Level)
取消操作:
一个中断请求可能有一个或者多个接受者,中断线程池中的某个工作者线程,同时意味着“取消当前任务”和“关闭工作者线程”。
阻塞的库函数都只是抛出InterruptedException作为中断响应,它们的取消策略:尽快退出执行流程,并把中断信息传递给调用者,从而使使用栈中的上层代码可以采取进一步操作。
由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断该线程。
当调用可中断的阻塞函数时(例如Thread.sleep或BlockingQueue.put等),有两种实用策略可用于处理InterruptedException:
/** * 传递异常的方法,可以直接通过throws抛出 **/ BlockingQueue<Task> queue; …… public Task getNextTask() throws InterruptedException { return queue.take(); }
如果不想或者不能传递InterruptedException,那么要寻找另一种方式来保存中断请求。一种标准的做法就是通过再次调用interrupt来恢复中断状态。除非在代码中实现了中断策略,否则不要无视InterruptedException。
只有实现线程中断策略的代码才可以屏蔽中断请求。在常规的任务和库代码中都不应该屏蔽中断请求。
boolean interrupted = false; try{ while(true){ try{ }catch(InterruptedException e){ interrupted = true; // 重新尝试 } } }finally{ if(interrupted){ Thread.currentThread().interrupt(); } }
如果代码不会调用可中断的阻塞方法,那么可以通过在任务代码中轮询当前线程的中断状态来响应中断。
中断可以用来获取线程的注意,并且由中断线程保存的信息,可以为中断的线程提供进一步的提示(访问这些信息的时候需要确保使用同步)。
private static final ScheduledExcutorService cancelExec = ... private static void timedRun(Runnable r, long timeout, TimeUnit unit){ final Thread taskThread = Thread.currentThread(); cancelExec.schedule(new Runnable(){ public void run(){ taskThread.interrupt(); } },timeout unit); r.run(); }
这是一个在指定时间内运行一个任意的Runnable的示例。它在调用线程中的运行任务,并安排了一个取消任务,在运行了指定的时间间隔后中断它。这解决了从任务中抛出未检查异常的问题,因为这个异常会被timeRun()的调用者所捕获。
但是问题是:在中断之前,应该了解它的中断策略。
因为timeRun()可以从任意一个线程中调用,因此它无法得知这个调用线程的中断策略。如果任务在超时前完成,那么中断timeRun所在线程的取消任务将爱timedRun返回到调用者之后启动。而且如果任务不响应中断,那么timeRun将会在任务结束时才返回,有可能超过的调用者所指定的时限。
public static void timedRun(final Runnable r, long timeout, TimeUnit unit) throws InerruptedException { class RethrowableTask implements Runnable { private volatile Throwable t; public void run() { try{ r.run(); }catch(Throwable t){ this.t = t; } } void rethrow(){ if(t != null){ throw launderThrowable(t); } } } RethrowableTask task = new RethrowableTask; final Thread taskThread = new Thread(task); taskThread.start(); cancelExec.schedule(new Runnable(){ public void run(){ taskThread.interrupt(); } }, timeout, unit); taskThread.join(unit.toMillis(timeout)); task.rethrow(); }
执行任务的线程拥有自己的执行策略,即使任务不响应中断,即时运行的方法仍能返回到它的调用者。
在启动任务线程之后,timeRun将执行一个限时的join方法。在join返回后,它将检查任务中是否有异常抛出,如果有的话,则会在调用timedRun的线程中再次抛出该异常。由于Throwable将在两个线程之间共享,因此设置为volatile类型,来保证安全发布。
但是该代码依赖一个限时的join,因此有着join的不足:无法知道执行控制是因为线程正常退出而返回,还是因为join超时而返回。
ExecutorService.submit()
将返回一个Future来描述任务。Future拥有一个cancel方法,该方法带有一个boolean类型的参数 mayInterruptIfRunning
,表示取消操作是否成功(只是表示能否接受中断,而不是表示任务是否能检测并处理中断)。
mayInterruptIfRunning
为true且任务当前正在某个线程运行,那么这个线程可以被中断。 mayInterruptIfRunning
为false,那么意味着 若任务还没启动,则不要运行它
。
执行任务的线程是由标准的Executor创建的,它实现了一种中断策略使得任务可以通过中断被取消,如果任务在标准Executor中运行,并通过它们的Future来取消任务,那么可以设置 mayInterruptIfRunning
。当尝试取消某个任务时,不宜直接中断线程池,因为你不知道当中断请求到达时线程正在运行什么任务,只能通过任务的Future来实现。
public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { Future<?> task = taskExec.submit(r); try{ task.get(timeout,unit); }catch(TimeoutException e){ // 接下来任务将被取消 }catch(ExecutionException e){ // 如果任务中抛出了异常,则重新抛出该异常 throw launderThrowable(e.getCause()); }finally { // 如果任务已经结束,取消操作也不会带来任何影响 // 如果任务正在运行,那么将被中断 task.cancel(); } }
当 Future.get
抛出 InterruptedException
或 TimeoutException
时,如果你知道不再需要结果,那么就可以调用 Future.cancel
来取消任务。
在java库,许多可阻塞的方法都是通过提前返回或者抛出InterruptedException来响应中断请求的。然而并非所有的可阻塞方法或者阻塞机制都能响应中断。
如果一个线程由于执行同步的Socket I/O或者等待获取内置锁而阻塞,那么中断请求只能设置线程的中断状态,除此之外没有其他作用。
java.io
包中的 同步Socket I/O
:
InputStream
和 OutputStream
的 read和write方法
不会响应中断,但通过关闭底层的套接字,可以使由于执行read和write等方法被阻塞的线程抛出一个 SocketException
.
java.io
包中的 同步I/O
:
InterruptibaleChannel
上等待的线程时,将抛出 ClosedByInterruptException
并关闭链路(会使得其他在这条链路上阻塞的线程通用抛出 ClosedByInterruptException
)。 InterruptibaleChannel
,将导致所有在链路操作上阻塞的线程都抛出 AsynchronousCloseException
。大多数标准的Channel都实现了 InterruptibaleChannel
。
Selector的异步I/O
:
java.nio.channels中的Select.select
方法阻塞了,那么调用close或者wakeup方法会使线程抛出 ClosedSelectorException
并提前返回。 获得某个锁:
lockInterruptibly
方法,该方法允许在等待一个锁的同时能响应中断)。 public class ReaderThread extends Thread{ private final Socket socket; private final InputStream inputStream; public ReaderThread(Socket socket) throws IOException{ this.socket = socket; this.inputStream = socket.getInputStream(); } public void interrupt(){ try { socket.close(); }catch (IOException ignored){ }finally { super.interrupt(); } } public void run(){ try { byte[] buf = new byte[1000]; while (true){ int count = inputStream.read(buf); if(count<0){ break; }else if (count>0){ doSomething(buf,count); } } }catch (IOException e){ } } }
通过改写interrupt方法,既能处理标准的中断,也能关闭底层套接字。无论线程是在read或write方法中阻塞还是在某个可中断方法中阻塞,都可以被中断停止执行当前工作。
我们可以通过 newTaskFor
方法是ThreadPoolExecutor中的新增功能。当把一个Callable提交给 ExecutorService
时, submit()
会返回一个 Future
,我们可以通过这个Future来取消任务。newTaskFor是一个工厂方法,它将创建Future来代表任务。newTaskFor还能返回一个RunnableFuture接口,该接口拓展了 Future
和 Runnable
(由FutureTask实现)。
通过定制表示任务的Future可以改变Future.cancel的行为。例如,定制的取消代码可以实现日志记录或者收集取消操作的统计信息,以及取消一些不响应中断的操作。
public interface CancellableTask<T> extends Callable<T> { void cancel(); RunnableFuture<T> newTask(); } public class CancellingExecutor extends ThreadPoolExecutor{ ... protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { if (callable instanceof CancellableTask){ return ((CancellableTask<T>) callable).newTask(); }else { return super.newTaskFor(callable); } } public abstract class SocketUsingTask<T> implements CancellableTask<T> { private Socket socket; protected synchronized void setSocket(Socket s){ socket = s; } public synchronized void cancel(){ try{ if(socket != null){ socket.close(); } }catch(IOException ignored){} } public RunnableFuture<T> newTask(){ return new FutureTask<T>(this){ public boolean cancel(boolean mayInterruptIfRunning){ try{ SocketUsingTask.this.cancel(); }finally{ return super.cancel(mayInterruptIfRunning); } } } } } }
CancellableTask中定义了一个CancellableTask接口,该接口拓展了Callable,并增加了一个cancel方法和一个newTask工厂方法来构造RunnableFuture。CancellingExecutor拓展了ThreadPoolExecutor,并改写了newTaskFor使Cancellable可以创建自己的Future。
正确的封装原则:除非拥有某个线程,否则不能对该线程进行操控。
线程由Thread对象表示,并且像其他对象一样可以被自由的共享。但是线程有一个相应的所有者,即创建该线程的类。因此线程池是其工作线程的所有者,如果要中断这些线程,那么应该使用线程池。
与其他封装对象一样,线程的所有权是不可传递的:应用程序可以拥有服务,服务也可以拥有工作者线程,但是应用程序并不能拥有工作者线程,因此应用程序不能直接停止工作者线程。应用程序应该提供 生命周期方法(Lifecycle Method)
来关闭它自己以及它所拥有的线程。在ExecutorService中提供了shutdown和shutdownNow等方法,在其他拥有线程的服务中也应该提供类似的关闭机制。
对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那就应该提供生命周期方法。
ExecutorService提供了两种关闭方法:
shutdown shutdownNow
一种关闭生产者-消费者服务的方式就是使用 毒丸(Poison Pill)对象
:毒丸是指一种放在队列上的对象,其含义是: 当得到这个对象,立即停止。
在FIFO队列中,毒丸对象将会确保消费者在关闭之前首先完成队列中的所有工作,再提交毒丸。毒丸对象之前的所有工作都会得到处理,而生产者在提交毒丸对象以后,将不会提交任何的工作。
当生产者和消费者的数量较大时,这种方法将变得难以使用。只有在无界队列中,毒丸对象才能可靠的工作。
当通过shutdownNow来强行关闭ExecutorService时,它会尝试取消正在执行的任务并且返回 已提交但是尚未执 行的任务
(以便调用者线程把这些任务写入日志或者做其他后续处理);shutdownNow返回的 List<Runnable>
可能与提交给ExecutorService的Runnable不同,它们可能被封装或者修改过。
但是我们无法通过普通方法找出哪些任务 已经开始但是尚未结束
,我们无法得知状态,除非执行线程中有某些检查。
导致线程提前死亡的最主要原因就是RuntimeException,由于某些异常表示了某种编程错误或其他类似的不可修复的错误,因此它们不会被捕获。它们不会在调用栈中逐层传递,而是默认的在控制台输出栈追踪信息,并终止线程。
任何代码都可能抛出一个RuntimeException。每当调用另一个方法时,都要对它的行为保持怀疑,不要盲目的认为它一定会正常返回,或者一定会抛出在方法原型中声明的异常。对调用的代码越不熟悉,越应该对其行为保持怀疑。
在任务处理线程的生命周期中,将通过某种抽象机制(如Runnable)来调用许多未知的代码,我们应该对这些线程能否表现出正确的行为表示怀疑。因此,这些线程应该在try-catch代码块中调用这些任务,就能捕获未检测的异常了,或者也可以使用try-finally代码块来确保框架能够知道线程非正常退出的情况。
public void run(){ Throwable thrown = null; try{ while(!isInterrupted){ runTask(getTaskFromWorkQueue()); } catch (Throwable e){ thrown = e; } finally { threadExited(this,thrown); } } }
在线程池内部构建一个工作者线程,如果任务抛出了一个未检查异常,那么它将使线程终结,但是会先同时框架它已经终结。然后框架可能会调用一个新的线程来代替这个工作线程,也可能不会,因为线程池正在关闭,或者已经有足够多的线程能满足需要。当使用这种方法,可以避免某个编写的糟糕的任务或插件时不会影响调用它的整个线程。
在Thread API中同样提供了 UncaughtExceptionHandler
,它能检测某个线程由于捕获异常而终结的情况。这个与前面的工作者线程是互补的,通过将二者结合在一起,可以有效的防止线程泄露问题。
public interface UncaughtExceptionHandler { void uncaughtExceptionHandle(Thread t, Throwable e); }
异常处理器如何捕获异常,取决于对服务质量的需求。最常见的响应方式将一个错误信息以及相应的栈追踪信息写入应用程序日志中。
public class UEHLogger implements Thread.UncaughtExceptionHandler { public void uncaughtExceptionHandler(Thread t, Throwable e){ Logger logger = Logger.getAnonymousLogger(); logger.log(Level.SEVERE, "Thread terminated with Exception:" + t.getName(), e); } }
可以通过 Thread.setUncaughtExceptionHandler
为每个线程设置一个UncaughtExceptionHandler,还可以使用 setDefaultUncaughtExceptionHandler
来设置默认的UncaughtExceptionHandler。
在运行时间较长的应用程序中,通常会为所有线程的未捕获异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。
如果要为线程池中的所有线程设置一个UncaughtExceptionHandler,需要为ThreadPoolExecutor的构造函数提供一个ThreadFactory。标准线程池允许当发送未捕获异常时结束线程,但由于使用了一个try-finally代码块来接受其他通知,因此当线程结束时,将有新的线程来代替它。如果没有提供捕获异常处理器或其他的故障通知机制,那么任务会悄悄的失败,从而造成极大的混乱。如果你希望任务由于发生异常而失败的时获得通知,并且执行一些特定于任务的恢复操作,那么可以将 任务封装在能捕获异常的Runnable或Callable中
,或者
改写ThreadPoolExecutor的 afterExecute
方法
。
但是只有通过execute提交的任务,才能将异常交给捕获异常处理器,而通过submit提交的任务,无论是抛出的未检查异常还是已检查异常,都将被认为是任务返回状态的一部分。如果一个由submit提交的任务抛出异常,那么将被Future.get封装在ExecutionException中重新抛出。
JVM既可以正常关闭,也可以强行关闭。
正常关闭的触发方式:
也可以通过调用Runtime.halt或者在操作系统中发送SIGKILL等
在正常关闭中,JVM首先调用所有已注册的 关闭钩子(Shutdown Hook)
。关闭钩子是指通过Runtime.addShutdownHook注册的但尚未开始的线程。
runFinalizersOnExit
关闭钩子应该是线程安全的:
关闭钩子可以用于实现服务或应用程序的清理工作,例如删除临时文件,或者清除无法由系统自动清除的资源。
由于关闭钩子将并发执行,因此在关闭日志文件时可能导致其他需要日志服务的关闭钩子产生问题:因此,关闭钩子不应该依赖于那些可能被应用程序或其他的关闭钩子关闭的服务。实现这种功能的一种方式是对所有服务使用同一个关闭钩子,并且在该关闭钩子中执行一些列的关闭操作。这确保了关闭操作在单个线程中串行执行,从而避免了操作之间出现竞态条件或死锁等问题。
public void start(){ Runtime.getRuntime().addShutdownHook(new Thread(){ public void run(){ try{ LogService.this.stop(); }catch(InterruptedException ignored){} } }); }
有时候你希望创建一个线程来执行一些辅助工作,但是又不希望这个线程阻碍JVM的关闭,那么你可以使用 守护线程(Daemon Thread)
。
线程分为守护线程和普通线程。在JVM启动时创建的所有线程,除了主线程,其他都是守护线程(例如GC或其他辅助工作的线程)。
当创建一个新线程时,新线程将继承创建它的线程的守护状态。因此主线程创建的都是普通线程。
仅在于线程退出时发生的操作:当一个线程退出时,JVM会检查其他正在运行的线程,如果这些线程都是守护线程,那么JVM会正常的退出操作。当JVM停止时,所有仍然存在的守护线程将被直接抛弃:不会执行finally代码块,也不会执行回卷栈,JVM会直接退出。
我们应该尽量不使用守护线程 : 因为很少有操作能够在不进行清理的情况下被安全地抛弃。例如如果在守护线程中执行包含I/O操作的任务,那么将是一种危险的行为。守护线程最好用于执行“内部”任务,例如周期性的从缓存中移除无效数据。
此外,守护线程通常不能用来替代应用程序管理程序中的各个服务的生命周期。
当不再需要内存资源的时候,可以通过GC自动回收它们。对于一些其它资源,如文件句柄或者套接字句柄,当不再需要它们的时候,需要显式的还给操作系统。为了实现这个功能,垃圾回收期对定义了finalize方法的对象会进行特殊处理:在回收期释放它们以后,调用它们的finalize方法,从而保证一些持久化的资源被释放。
由于终结器可以在某个由jvm管理的线程中运行,因此终结器访问的任何状态都可能被多个线程访问,这样就必须对其访问操作进行同步。
终结器也无法保证它们在何时运行甚至是否运行,并且复杂的终结器将在对象上产生巨大的性能开销。
在大多数时候,使用finally代码块和显式的close方法,能够比使用终结器更好地管理资源。
唯一的例外情况是:当需要管理对象,并且该对象持有的资源是通过本地方法获得的。
避免使用终结器。
在任务,线程,服务以及应用程序等模块中的生命周期结束问题,可能会增加它们在设计和实现时的复杂性。java并没有提供某种抢占式的机制来取消或者终结线程。相反,它提供了一种协作式的中断机制来实现取消操作,但这要依赖于如何构建取消操作的协议,以及能否始终遵循这些协议。通过FutureTask和Executor框架,可以帮助我们构建可取消任务和服务。