熟悉Java的人都能很容易地写出如下代码:
public static class MyThread extends Thread { @Override public void run() { System.out.println("MyThread is running..."); } } public static void main(String[] args) { Thread t = new MyThread(); t.start(); } 复制代码
这是一个面试常问的基础问题,你应该肯定的回答线程只有五种状态,分别是:新建状态、就绪状态、执行状态、阻塞状态、终止状态。
由于Scheduler(调度器)的时间片分配算法,每个Running的线程会执行多长时间是未知的,因此线程能够在Runnable和Running之间来回转换。 阻塞状态的线程必须先进入就绪状态才能进入执行状态 。
Running线程在主动调用 Thread.sleep()
、 obj.wait()
、 thread.join()
时会进入 TIMED-WAITING
或 WAITING
状态并主动让出CPU执行权。如果是 TIMED-WAITING
,那么在经过一定的时间之后会主动返回并进入Runnable状态等待时间片的分配。
thread.join()
的底层就是当前线程不断轮询 thread
是否存活,如果存活就不断地 wait(0)
。
Running线程在执行过程中如果遇到了临界区( synchronized
修饰的方法或代码块)并且需要获取的锁正在被其他线程占用,那么他会主动将自己挂起并进入 BLOCKED
状态。
如果持有锁的线程退出临界区,那么在该锁上等待的线程都会被唤醒并进入就绪状态,但只有抢到锁的线程会进入执行状态,其他没有抢到锁的线程仍将进入阻塞状态。
如果某个线程调用了 obj
的 notify/notifyAll
方法,那么在该线程退出临界区时(调用 wait/notify
必须先通过 synchronized
获取对象的锁),被唤醒的等待在 obj.wait
上的线程才会从阻塞状态进入就绪状态获取 obj
的 monitor
,并且只有抢到 monitor
的线程才会从 obj.wait
返回,而没有抢到的线程仍旧会阻塞在 obj.wait
上
在执行状态下的线程执行完 run
方法或阻塞状态下的线程被 interrupt
时会进入终止状态,随后会被销毁。
public synchronized void start() { if (threadStatus != 0) throw new IllegalThreadStateException(); group.add(this); boolean started = false; try { start0(); started = true; } finally { try { if (!started) { group.threadStartFailed(this); } } catch (Throwable ignore) {} } } private native void start0(); 复制代码
start
方法主要做了三件事:
start0
,这是一个 native
方法,在往期文章《Java线程是如何实现的?》一文中谈到线程的调度将交给LWP,这里的启动新建线程同样属于此范畴。因此我们能够猜到此JNI(Java Native Interface)调用将会新建一个线程(LWP)并执行该线程对象的 run
方法 started
状态置为 true
表示已被启动过。正如初学线程时老师所讲的,线程的 start
只能被调用一次,重复调用会报错就是通过这个变量实现的。 我们将通过 Thread
来模拟这样一个场景:银行多窗口叫号。从而思考已经有 Thread
了为什么还要引入 Runnable
首先我们需要一个窗口线程模拟叫号(窗口叫号,相应号码的顾客到对应窗口办理业务)的过程:
public class TicketWindow extends Thread { public static final Random RANDOM = new Random(System.currentTimeMillis()); private static final int MAX = 20; private int counter; private String windowName; public TicketWindow(String windowName) { super(windowName); counter = 0; this.windowName = windowName; } @Override public void run() { System.out.println(windowName + " start working..."); while (counter < MAX){ System.out.println(windowName + ": It's the turn to number " + counter++); //simulate handle the business try { Thread.sleep(RANDOM.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } } } 复制代码
然后编写一个叫号客户端模拟四个窗口同时叫号:
public class WindowThreadClient { public static void main(String[] args) { Stream.of("Window-1","Window-2","Window-3","Window-4").forEach( windowName -> new TicketWindow(windowName).start() ); } } 复制代码
你会发现同一个号码被叫了四次,显然这不是我们想要的。正常情况下应该是四个窗口共享一个叫号系统,窗口只负责办理业务而叫号则应该交给叫号系统,这是典型的OOP中的单一职责原则。
我们将线程和要执行的任务耦合在了一起,因此出现了如上所述的尴尬情况。线程的职责就是执行任务,它有它自己的运行时状态,我们不应该将要执行的任务的相关状态(如本例中的 counter
、 windowName
)将线程耦合在一起,而应该将业务逻辑单独抽取出来作为一个逻辑执行单元,当需要执行时提交给线程即可。于是就有了 Runnable
接口:
public interface Runnable { public abstract void run(); } 复制代码
因此我们可以将之前的多窗口叫号改造一下:
public class TicketWindowRunnable implements Runnable { public static final Random RANDOM = new Random(System.currentTimeMillis()); private static final int MAX = 20; private int counter = 0; @Override public void run() { System.out.println(Thread.currentThread().getName() + " start working..."); while (counter < MAX){ System.out.println(Thread.currentThread().getName()+ ": It's the turn to number " + counter++); //simulate handle the business try { Thread.sleep(RANDOM.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } } } 复制代码
测试类:
public class WindowThreadClient { public static void main(String[] args) { TicketWindowRunnable ticketWindow = new TicketWindowRunnable(); Stream.of("Window-1", "Window-2", "Window-3", "Window-4").forEach( windowName -> new Thread(ticketWindow, windowName).start() ); } } 复制代码
如此你会发现没有重复的叫号了。但是这个程序并不是线程安全的,因为有多个线程同时更改 windowRunnable
中的 counter
变量,由于本节主要阐述 Runnable
的作用,因此暂时不对此展开讨论。
将 Thread
中的 run
通过接口的方式暴露出来还有一个好处就是对策略模式和函数式编程友好。
首先简单介绍一下策略模式,假设我们现在需要计算一个员工的个人所得税,于是我们写了如下工具类,传入基本工资和奖金即可调用 calculate
得出应纳税额:
public class TaxCalculator { private double salary; private double bonus; public TaxCalculator(double base, double bonus) { this.salary = base; this.bonus = bonus; } public double calculate() { return salary * 0.03 + bonus * 0.1; } } 复制代码
这样写有什么问题?我们将应纳税额的计算写死了: salary * 0.03 + bonus * 0.1
,而税率并非一层不变的,客户提出需求变动也是常有的事!难道每次需求变更我们都要手动更改这部分代码吗?
这时策略模式来帮忙:当我们的需求的输入是不变的,但输出需要根据不同的策略做出相应的调整时,我们可以将这部分的逻辑抽取成一个接口:
public interface TaxCalculateStrategy { public double calculate(double salary, double bonus); } 复制代码
具体策略实现:
public class SimpleTaxCalculateStrategy implements TaxCalculateStrategy { @Override public double calculate(double salary, double bonus) { return salary * 0.03 + bonus * 0.1; } } 复制代码
而业务代码仅调用接口:
public class TaxCalculator { private double salary; private double bonus; private TaxCalculateStrategy taxCalculateStrategy; public TaxCalculator(double base, double bonus, TaxCalculateStrategy taxCalculateStrategy) { this.salary = base; this.bonus = bonus; this.taxCalculateStrategy = taxCalculateStrategy; } public double calculate() { return taxCalculateStrategy.calculate(salary, bonus); } } 复制代码
将 Thread
中的逻辑执行单元 run
抽取成一个接口 Runnable
有着异曲同工之妙。因为实际业务中,需要提交给线程执行的任务我们是无法预料的,抽取成一个接口之后就给我们的应用程序带来了很大的灵活性。
另外在JDK1.8中引入了函数式编程和lambda表达式,使用策略模式对这个特性也是很友好的。还是借助上面这个例子,如果计算规则变成了 (salary + bonus) * 1.5
,可能我们需要新增一个策略类:
public class AnotherTaxCalculatorStrategy implements TaxCalculateStrategy { @Override public double calculate(double salary, double bonus) { return (salary + bonus) * 1.5; } } 复制代码
在JDK增加内部类语法糖之后,可以使用匿名内部类省去创建新类的开销:
public class TaxCalculateTest { public static void main(String[] args) { TaxCalculator taxCalaculator = new TaxCalculator(5000,1500, new TaxCalculateStrategy(){ @Override public double calculate(double salary, double bonus) { return (salary + bonus) * 1.5; } }); } } 复制代码
但是在JDK新增函数式编程后,可以更加简洁明了:
public class TaxCalculateTest { public static void main(String[] args) { TaxCalculator taxCalaculator = new TaxCalculator(5000, 1500, (salary, bonus) -> (salary + bonus) * 1.5); } } 复制代码
这对只有一个抽象方法 run
的 Runnable
接口来说是同样适用的。
查看 Thread
的构造方法,追溯到 init
方法(略有删减):
Thread parent = currentThread(); if (g == null) { if (g == null) { g = parent.getThreadGroup(); } } this.group = g; this.daemon = parent.isDaemon(); this.priority = parent.getPriority(); this.target = target; setPriority(priority); this.stackSize = stackSize; tid = nextThreadID(); 复制代码
g
是当前对象的 ThreadGroup
, 2~8
就是在设置当前对象所属的线程组,如果在 new Thread
时没有显式指定,那么默认将父线程(当前执行 new Thread
的线程)线程组设置为自己的线程组。
9~10
行,从父线程中继承两个状态:是否是守护线程、优先级是多少。当然了,在 new Thread
之后可以通过 thread.setDeamon
或 thread.setPriority
进行自定义
12
行,如果是通过 new Thread(Runnable target)
方式创建的线程,那么取得传入的 Runnable target
,线程启动时调用的 run
中会执行不空的 target
的 run
方法。理论上来讲创建线程有三种方式:
Runnable
接口 MyRunnable
,通过 new Thread(myRunnable)
执行 MyRunnable
中的 run
Thread
并重写 run
,通过 new MyThread()
执行重写的 run
Thread
并重写 run
,仍可向构造方法传入 Runnable
实现类实例: new MyThread(myRunnable)
,但是只会执行 MyThread
中重写的 run
,不会受 myRunnable
的任何影响。这种创建线程的方式有很大的歧义,除了面试官可能会拿来为难你一下,不建议这样使用 设置线程优先级,一共有10个优先级别对应取值 [0,9]
,取值越大优先级越大。但这一参数具有平台依赖性,这意味着可能在有的操作系统上可能有效,而在有的操作系统上可能无效,因为Java线程是直接映射到内核线程的,因此具体的调度仍要看操作系统。
设置栈大小。这个大小指的是栈的内存大小而非栈所能容纳的最大栈帧数目,每一个方法的调用和返回对应一个栈帧从线程的虚拟机栈中入栈到出栈的过程,在下一节中会介绍这个参数。虚拟机栈知识详见《深入理解Java虚拟机(第二版)》第二章。
设置线程的 ID
,是线程的唯一标识,比如偏向锁偏向线程时会在对象头的 Mark Word
中存入该线程的ID(偏向锁可见《并发编程的艺术》和《深入理解Java虚拟机》第五章)。
通过 nextThreadID
会发现是一个 static synchronized
方法,原子地取得线程序列号 threadSeqNumber
自增后的值:
public static void main(String[] args) { new Thread(() -> { System.out.println(Thread.currentThread().getId()); //11 }).start(); } 复制代码
为什么 main
中创建的第一个线程的ID是11(意味着他是JVM启动后创建的第11个线程)呢?这因为在JVM在执行 main
时会启动JVM进程的第一个线程(叫做 main
线程),并且会启动一些守护线程,比如GC线程。
这里要注意的是每个线程都有一个私有的虚拟机栈。所有线程的栈都存放在JVM运行时数据区域的虚拟机栈区域中。
Thread
提供了一个可以设置 stackSize
的重载构造方法:
public Thread(ThreadGroup group, Runnable target, String name, long stackSize) 复制代码
官方文档对该参数的描述如下:
The stack size is the approximate number of bytes of address space that the virtual machine is to allocate for this thread's stack. The effect of the stackSize parameter, if any, is highly platform dependent.
你能通过指定 stackSize
参数近似地指定虚拟机栈的内存大小( 注意 :是内存大小即字节数而不是栈中所能容纳的最大栈帧数目,而且这个大小指的是该线程的栈大小而并非是整个虚拟机栈区的大小)。且该参数具有高度的平台依赖性,也就是说在各个操作系统上,同样的参数表现出来的效果有所不同。
On some platforms, specifying a higher value for the stackSize
parameter may allow a thread to achieve greater recursion depth before throwing a StackOverflowError
. Similarly, specifying a lower value may allow a greater number of threads to exist concurrently without throwing an OutOfMemoryError
(or other internal error). The details of the relationship between the value of the stackSize
parameter and the maximum recursion depth and concurrency level are platform-dependent. On some platforms, the value of the stackSize parameter may have no effect whatsoever.
在一些平台上,为 stackSize
指定一个较大的值,能够允许线程在抛出栈溢出异常前达到较大的递归深度(因为方法栈帧的大小在编译期可知,以局部变量表为例,基本类型变量中只有 long
和 double
占8个字节,其余的作4个字节处理,引用类型根据虚拟机是32位还是64位而占4个字节或8个字节。如此的话栈越大,栈所能容纳的最大栈帧数目也即递归深度也就越大)。类似的,指定一个较小的 stackSize
能够让更多的线程共存而避免OOM异常(有的读者可能会异或,栈较小怎么还不容易抛出OOM异常了呢?不是应该栈较小,内存更不够用,更容易OOM吗?其实单线程环境下,只可能发生栈溢出而不会发生OOM,因为每个方法对应的栈帧大小在编译器就可知了,线程启动时会从虚拟机栈区划分一块内存作为栈的大小,因此无论是压入的栈帧太多还是将要压入的栈帧太大都只会导致栈无法继续容纳栈帧而抛出栈溢出。那么什么时候回抛出OOM呢。对于虚拟机栈区来说,如果没有足够的内存划分出来作为新建线程的栈内存时,就会抛出OOM了。这就不难理解了,有限的进程内存除去堆内存、方法区、JVM自身所需内存之后剩下的虚拟机栈是有限的,分配给每个栈的越少,能够并存的线程自然就越多了)。最后,在一些平台上,无论将 stackSize
设置为多大都可能不会起到任何作用。
The virtual machine is free to treat the stackSize
parameter as a suggestion. If the specified value is unreasonably low for the platform, the virtual machine may instead use some platform-specific minimum value; if the specified value is unreasonably high, the virtual machine may instead use some platform-specific maximum. Likewise, the virtual machine is free to round the specified value up or down as it sees fit (or to ignore it completely).
虚拟机会将 stackSize
视为一种建议,在栈大小的设置上仍有一定的话语权。如果给定的值太小,虚拟机会将栈大小设置为平台对应的最小栈大小;相应的如果给定的值太大,则会设置成平台对应的最大栈大小。又或者,虚拟机能够按照给定的值向上或向下取舍以设置一个合适的栈大小(甚至虚拟机会忽略它)。
Due to the platform-dependent nature of the behavior of this constructor, extreme care should be exercised in its use. The thread stack size necessary to perform a given computation will likely vary from one JRE implementation to another. In light of this variation, careful tuning of the stack size parameter may be required, and the tuning may need to be repeated for each JRE implementation on which an application is to run.
由于此构造函数的平台依赖特性,在使用时需要格外小心。线程栈的实际大小的计算规则会因为JVM的不同实现而有不同的表现。鉴于这种变化,可能需要仔细调整堆栈大小参数,并且对于应用程序使用的不同的JVM实现需要有不同的调整。
Implementation note: Java platform implementers are encouraged to document their implementation's behavior with respect to the stackSize
parameter.
public class StackSizeTest { public static int counter = 0; public static void main(String[] args) { new Thread(() -> { try { count(); } catch (StackOverflowError e) { System.out.println(counter); // result -> 35473 } }).start(); } public static void count() { counter++; count(); } } 复制代码
显式指定 stackSize
之后显著地影响了线程栈的大小,调用深度由原来的 35473
变成了 296
:
public class StackSizeTest { public static int counter = 0; public static void main(String[] args) { new Thread(null,() -> { try { count(); } catch (StackOverflowError e) { System.out.println(counter); } },"test-stack-size",10 * 1024).start(); //stackSize -> 10KB result -> 296 } public static void count() { counter++; count(); } } 复制代码
要想改变栈帧的大小,通过增加局部变量即可实现。以下通过增加多个 long
变量(一个占8个字节),较上一次的测试,方法调用深度又有明显的减小:
public class StackSizeTest { public static int counter = 0; public static void main(String[] args) { new Thread(null,() -> { try { count(); } catch (StackOverflowError e) { System.out.println(counter); } },"test-stack-size",10 * 1024).start(); //stackSize -> 10KB result -> 65 } public static void count() { long a,b,c,d,e,f,g,h,j,k,l,m,n,o,p,q; counter++; count(); } } 复制代码
通过 thread.setDaemon(true)
可将新建后的线程设置为守护线程,必须在线程启动前( thread.start
)设置才有效。
集群架构中,通常需要心跳检测机制。如果应用程序开一条非守护线程来做心跳检测,那么可能会出现应用主程序都终止运行了但心跳检测线程仍在工作的情况,这时JVM会因为仍有非守护线程在工作而继续占用系统的CPU、内存资源,这显然是不应该的。
下列代码简单模仿了这一场景:
public class HeartCheck { public static void main(String[] args) { // worker thread new Thread(()->{ // start the heart-check thread first Thread heartCheck = new Thread(()->{ // do interval-automatic heart check and notify the parent thread when heart check has error while (true) { System.out.println("do heart check"); try { Thread.sleep(100); //interval } catch (InterruptedException e) { e.printStackTrace(); } } }); heartCheck.setDaemon(true); heartCheck.start(); // simulate work try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } } 复制代码
直接上源码:
public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } if (millis == 0) { while (isAlive()) { wait(0); } } else { while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } } 复制代码
如果调用某个线程 thread
的 join()
,会分发到 join(0)
,执行上述的第 10~12
行,只要当前线程获取到了CPU执行权就会轮询 thread
的执行状态( isAlive
是个 native
方法,但我们能够猜到它的作用就是检测 thread
是否存活,即不是 Terminated
状态),一旦发现 thread
仍然存活就会释放CPU执行权(通过 wait(0)
的方式),等下一轮的轮询,直到 thread
进入终止状态,那么当前线程将从 thread.join()
返回。
一定要区分清楚,调用 thread.join()
阻塞的是当前线程,不会对 thread
线程造成任何影响。
join
提供了一个重载的限时等待方法(这是一个经典的超时等待模型:只有当条件满足或者已超过等待时限时才返回),这也是为了避免当前线程陷入永久等待的困境,能够在等待一段时间发现目标线程仍未执行完后自动返回。
join
有一个比较好玩的地方就是如果线程调用它自己的 join
方法,那么该线程将无限 wait
下去,因为: Thread.currentThread().join()
会等待当前线程执行完,而当前线程正在调用当前线程的 join
即等当前线程执行完……就让他自个儿去慢慢玩儿吧~
比如电商网站中的用户行为日志,可能需要经过聚合、筛选、分析、归类等步骤加工,最后再存入数据库。并且这些步骤的执行必须是按部就班的层层加工,那么一个步骤就必须等到上一个步骤结束后拿到结果在开始,这时就可以利用 join
做到这点。
下列代码简单模仿了此场景:
public class StepByStep { public static void main(String[] args) throws InterruptedException { Thread step1 = new Thread(() -> { System.out.println("start capture data..."); //simulate capture data try { Thread.sleep(1000); System.out.println("capture done."); } catch (InterruptedException e) { e.printStackTrace(); } }); step1.start(); Thread step2 = new Thread(() -> { try { step1.join(); System.out.println("start screen out the data..."); Thread.sleep(1000); System.out.println("screen out done."); } catch (InterruptedException e) { e.printStackTrace(); } }); step2.start(); Thread step3 = new Thread(() -> { try { step2.join(); System.out.println("start analyze the data..."); Thread.sleep(1000); System.out.println("analyze done."); } catch (InterruptedException e) { e.printStackTrace(); } }); step3.start(); Thread step4 = new Thread(() -> { try { step3.join(); System.out.println("start classify the data"); Thread.sleep(1000); System.out.println("classify done."); } catch (InterruptedException e) { e.printStackTrace(); } }); step4.start(); step4.join(); System.out.println("write into database"); } } 复制代码
值得注意的是,如果调用未启动线程的 join
,将会立即返回:
public class StepByStep { public static void main(String[] args) throws InterruptedException { Thread t = new Thread(() -> { }); t.join(); } } 复制代码
有时任务量太大且任务是可分的(子任务之间没有上例的依赖关系),那么我们不妨将任务拆分成互不相干的子任务(这一步叫做 Fork
),分别为各个子任务分配一个单独线程从而实现子任务并行执行,提高执行效率,最后将个子任务的结果整合起来做最后的加工(主线程就可以使用 join
来等待各个子任务线程的执行结果,从而最后做一个汇总)。JDK8提供的 Stream
和 ForkJoin
框架都有此模型的身影。
我们可以通过 join
的重载方法提供的限时等待,在目标任务执行时间过长时自动返回,从而采取其他弥补策略,而不至于老是傻傻地等着。
public void interrupt() { if (this != Thread.currentThread()) checkAccess(); synchronized (blockerLock) { Interruptible b = blocker; if (b != null) { interrupt0(); // Just to set the interrupt flag b.interrupt(this); return; } } interrupt0(); } 复制代码
这里有一个细节, interrupt
首先会设置线程的中断标志位,然后再打断它。
查看官方文档:
If this thread is blocked in an invocation of the wait()
, wait(long)
, or wait(long, int)
methods of the Object
class, or of the join()
, join(long)
, join(long, int)
, sleep(long)
, or sleep(long, int)
, methods of this class, then its interrupt status will be cleared and it will receive an InterruptedException
.
If none of the previous conditions hold then this thread's interrupt status will be set.
Interrupting a thread that is not alive need not have any effect.
由此我们可以提取三点信息:
Timed-Waiting/Waiting
中的线程被打断后首先会清除它的中断标志位,然后再抛出 InterruptedException
。因此被中断的线程进入 Runnable/Running
)下的线程不会被打断,但是其中断标志位会被设置,即调用它的 isInterrupted
将返回 true
interrupt
不会产生任何效果。 Tests whether this thread has been interrupted. The interrupted status of the thread is unaffected by this method.
A thread interruption ignored because a thread was not alive at the time of the interrupt will be reflected by this method returning false.
测试线程是否被中断过,该方法的调用不会改变线程的中断标志位。对一个终止状态下的线程调用过 interrupt
并不会导致该方法返回 true
。
于是我们可以使用 isInterrupted
来测试一下上面提取的3个结论:
public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { } try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }); t1.start(); t1.interrupt(); System.out.println(t1.isInterrupted()); //true Thread.sleep(1000); System.out.println(t1.isInterrupted()); //false } 复制代码
上述代码在 t1.interrupt
后马上检查 t1
的中断标志位,由于 interrupt
是先设置中断标志位,再中断,因此 17
行的输出检测到了中断标志位返回 true
;接着 18~19
行先等 t1
在抛出 InterruptedException
时清除标志位,再检测其中断标志位发现返回 false
证明了结论1:抛出 InterruptedException
之前会先清除其中断标志位。
static volatile boolean flag = true; public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { while (flag) { } }); t1.start(); t1.interrupt(); System.out.println(t1.isInterrupted()); //true flag = false; t1.join(); System.out.println(t1.isInterrupted()); //false } 复制代码
interrupted
不会中断正在运行的线程,但会设置其中断标志位,因此第 10
行返回 true
。由第 13
行的输出我们还可以的处一个新的结论:对 终止状态的线程 调用 isInterrupted
始终会返回 false
。
这是一个静态方法,用来检测 当前线程 是否被中断过,但与 isInterrupted
不同,它的调用会导致当前线程的中断标志位被清除且 isInterrupted
是实例方法。也就是说如果连续两次调用 Thread.interrupted
,第二次一定会返回 false
。
static volatile boolean flag = true; public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { while (flag) { } System.out.println(Thread.currentThread().isInterrupted()); //true System.out.println(Thread.interrupted()); //true System.out.println(Thread.interrupted()); //false }); t1.start(); t1.interrupt(); flag = false; } 复制代码
Thread
有一个弃用的方法 stop
,弃用的原因是这个方法是类似于 linux
中 kill -9
的方式强制立即终止线程,不给线程任何喘息的机会,这意味着执行了一半的程序突然没后文了,如果线程打开了I/O、数据库连接等资源时将无法及时释放他们。
守护线程在其父线程终结时也会随之终结,因此我们可以通过将线程设置为守护线程,通过控制其父线程的终结时间来间接终结他:
public class ThreadService { private Thread executeThread; private volatile boolean finished; public void execute(Runnable task) { executeThread =new Thread(() -> { Thread t = new Thread(() -> { task.run(); }); t.setDaemon(true); t.start(); try { t.join(); finished = true; } catch (InterruptedException e) { System.out.println("task execution was interrupted"); } }); executeThread.start(); } public void shutdown(long millis) { long base = System.currentTimeMillis(); long now = 0; while (!finished) { now = System.currentTimeMillis() - base; if (now >= millis) { System.out.println("task execution time out, kill it now"); executeThread.interrupt(); break; } try { Thread.sleep(10); } catch (InterruptedException e) { System.out.println("was interrupted when shutdown"); } } finished = true; } } 复制代码
在上述代码中,可以通过给 shutdown
传入一个 task
执行时限,要求它在 millis
时间内执行完,如果超出这个时间则视为任务执行异常,通过终止其父线程来终止它。如果它执行正常,在 millis
时间内返回了,那也会导致父线程的结束, shutdown
也能通过轮询 finished
状态来感知任务执行结束。
public class ThreadCloseGraceful implements Runnable{ private volatile boolean stop = false; @Override public void run() { while (true) { if (stop) { break; } // to do here } } public void shutdown() { stop = true; } } 复制代码
这种方式的要点是,共享状态变量必须声明为 volatile
,这样执行线程才能及时感知到 shutdown
命令。
通过轮询线程的中断标志位来感知外界的中断命令。
public class ThreadCloseGraceful extends Thread{ @Override public void run() { while (true) { if (Thread.interrupted()) { break; } // to do here } } public void shutdown() { this.interrupt(); } } 复制代码
resume/suspend
被弃用的主要原因是因为 suspend
将线程挂起时并不会释放其所持有的共享资源,如果一个线程持有一个甚至多个锁后执行 suspend
,那么将会导致所有等待该锁或这些锁释放的线程陷入长久的阻塞状态。如果碰巧将要 resume
这个被挂起线程的线程事先也有获取这些锁的需求,那么 resume
线程也会被阻塞,这可能导致 suspend
线程将无人唤醒,这些线程都将陷入永久阻塞。
因此在并发场景下,对于临界区来说, suspend
和 resume
是线程对立的,无论是谁先进入临界区,都将导致这两个线程甚至是多个线程陷入死锁。
synchronized
能够保证被同步的代码在多线程环境下的执行是串行化的。
this
对象的 monitor
(也就是我们通常所说的锁,术语是管程),一个 monitor
同一个时刻只能被一个线程持有,获取失败将陷入阻塞状态(BLOCKED),直到该锁被释放(持有锁的线程退出该方法/临界区)后该线程将加入到新一轮的锁争取之中 Class
对象的 monitor
,锁获取-释放逻辑和实例方法的相同。 synchronized
括号后显式指定一个同步对象,锁获取-释放逻辑依然相同 获取锁失败时陷入阻塞、锁释放时相应阻塞在该锁上的线程会被唤醒,这会引起线程由用户态到内核态的切换,时间开销较大,甚至大于临界区代码的实际执行开销。因此原则上要减少 synchronized
的使用,但是随着JDK的升级,自旋锁、适应性自旋、锁消除、锁粗化、偏向锁、轻量级锁等优化的引入(详见《深入理解Java虚拟机(第二版)》高并发章节), synchronized
的开销实际上也没那么大了。
可重入,如果当前线程已持有某个对象的 monitor
,在再次进入需要该 monitor
的临界区时,可直接进入而无需经过锁获取这一步。
一个线程可同时持有多个 monitor
。 注意 ,这一操作容易导致死锁的发生,以下代码就模仿了这一情景:
public class DeadLock { public static Object lock1 = new Object(); public static Object lock2 = new Object(); public static void main(String[] args) { IntStream.rangeClosed(0,19).forEach(i->{ if (i % 2 == 0) { new Thread(() -> m1()).start(); } else { new Thread(() -> m2()).start(); } }); } public static void m1() { synchronized (lock1) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lock2) { System.out.println(Thread.currentThread().getName()); } } } public static void m2() { synchronized (lock2) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lock1) { System.out.println(Thread.currentThread().getName()); } } } } 复制代码
上述代码有很大的几率陷入死锁,但是并不会有任何提示信息。我们可以通过 jps/jstack
查看一下线程状态:
C:/Users/zaw>jps 2864 5664 Jps 4072 Launcher 2172 DeadLock C:/Users/zaw>jstack 2172 "Thread-1" #12 prio=5 os_prio=0 tid=0x0000000018c71800 nid=0x8f0 waiting for monitor entry [0x00000000196cf000] java.lang.Thread.State: BLOCKED (on object monitor) at deadlock.DeadLock.m2(DeadLock.java:47) - waiting to lock <0x00000000d6081098> (a java.lang.Object) - locked <0x00000000d60810a8> (a java.lang.Object) at deadlock.DeadLock.lambda$null$1(DeadLock.java:21) at deadlock.DeadLock$$Lambda$3/1989780873.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) "Thread-0" #11 prio=5 os_prio=0 tid=0x0000000018c70800 nid=0x944 waiting for monitor entry [0x00000000195cf000] java.lang.Thread.State: BLOCKED (on object monitor) at deadlock.DeadLock.m1(DeadLock.java:34) - waiting to lock <0x00000000d60810a8> (a java.lang.Object) - locked <0x00000000d6081098> (a java.lang.Object) at deadlock.DeadLock.lambda$null$0(DeadLock.java:19) at deadlock.DeadLock$$Lambda$2/999966131.run(Unknown Source) at java.lang.Thread.run(Thread.java:748) 复制代码
笔者省去了其他线程的状态,分析清楚这一对线程死锁的原因之后,剩下的18个线程是类似的。首先第 9
和 18
两行表明两个线程表明线程因为获取不到对象的锁而陷入 BLOCKED
状态。 11~12
行详细的指出 Thread-1
正在等待获取内存地址为 0x00000000d6081098
的一个对象的锁,且已持有了内存地址为 0x00000000d60810a8
的对象的锁。 20~21
行同样的指出 Thread-0
等在 0x00000000d60810a8
对象上,而已获取了 0x00000000d6081098
对象的锁。可见他们都在无脑阻塞地等待对方释放锁,于是就陷入了死锁。
在 jstack
罗列JVM各个线程状态之后还为我们分析了死锁:
Found one Java-level deadlock: ============================= "Thread-19": waiting to lock monitor 0x0000000018c5a398 (object 0x00000000d60810a8, a java.lang.Object), which is held by "Thread-1" "Thread-1": waiting to lock monitor 0x0000000018c58d98 (object 0x00000000d6081098, a java.lang.Object), which is held by "Thread-0" "Thread-0": waiting to lock monitor 0x0000000018c5a398 (object 0x00000000d60810a8, a java.lang.Object), which is held by "Thread-1" 复制代码
我们还可以使用JDK内置的JVM性能监控工具JConsole更直观地分析线程状态:
C:/Users/zaw>jps 2864 6148 Jps 4072 Launcher 2172 DeadLock C:/Users/zaw>jconsole 2172 复制代码
打开的工具窗口会询问一下是否信任不安全的连接,点击是方可进入。进入后通过线程面板能够查看各线程状态,点击死锁分析,它会为我们分析出当前JVM进程中哪些线程陷入了死锁以及原因是什么:
要想了解为什么线程在执行临界区(包括同步方法和同步代码块)时会有锁获取-释放这一机制,那我们就要知道这个关键字在编译后生成了怎样的JVM指令。
首先我们分别编写一个同步方法和同步块,分别测试 synchronized
在字节码层面会产生什么样的效果:
public class SynchronizedTest{ public synchronized void m1(){ System.out.println("sync method"); } Object lock = new Object(); public void m2(){ synchronized(lock){ System.out.println("sync block"); } } } 复制代码
然后使用 javac
编译,由于编译后的字节码文件是二进制字节流,我们查看不方便(JVM查看方便),因此还需要使用 javap
将其转换成我们能看懂的友好内容(字节码格式详见《深入理解Java虚拟机(第二版)》中的Class文件格式),为了照顾对这部分不熟悉的读者,笔者做了删减,仅关注 synchronized
产生的效果:
C:/Users/zaw>cd Desktop C:/Users/zaw/Desktop>javac SynchronizedTest.java C:/Users/zaw/Desktop>javap -verbose SynchronizedTest.class public class SynchronizedTest minor version: 0 major version: 52 flags: ACC_PUBLIC, ACC_SUPER Constant pool: # 这里省去了常量池部分 { java.lang.Object lock; descriptor: Ljava/lang/Object; flags: public synchronized void m1(); descriptor: ()V flags: ACC_PUBLIC, ACC_SYNCHRONIZED Code: stack=2, locals=1, args_size=1 0: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream; 3: ldc #5 // String sync method 5: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 8: return LineNumberTable: line 4: 0 line 5: 8 public void m2(); descriptor: ()V flags: ACC_PUBLIC Code: stack=2, locals=3, args_size=1 0: aload_0 1: getfield #3 // Field lock:Ljava/lang/Object; 4: dup 5: astore_1 6: monitorenter 7: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream; 10: ldc #7 // String sync block 12: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 15: aload_1 16: monitorexit 17: goto 25 20: astore_2 21: aload_1 22: monitorexit 23: aload_2 24: athrow 25: return Exception table: from to target type 7 17 20 any 20 23 20 any } SourceFile: "SynchronizedTest.java" 复制代码
尽管上述代码看起来很长,但是我们只需要关注两个点:
20
行和 33
行,会发现同步方法 m1
比非同步方法 m2
的 flags
多了一个 ACC_SYNCHRONIZED
,因此线程在进入同步方法时,若检测到该方法的 flags
包含 ACC_SYNCHRONIZED
,那么该线程将尝试获取 this
或该方法所在类的 Class
实例(这取决于方法是实例方法还是静态方法),即同步方法的 synchronized
语义是通过方法标志位 ACC_SYNCHRONIZED
来实现的,同步过程是隐式的(同步对象由JVM来指定,锁释放由JVM来完成) 40~49
行,发现它给我们的同步块内的内容 System.out.println("sync block")
的前后分别加上了一个 monitorenter
和一个 monitorexit
,这就对应锁获取-释放,这种同步语义是显式的,同步对象和临界区由我们来控制,相对同步方法灵活一些。 还有一点值得注意的是上述的第 49
行代码为什么又出现了一个 monitorexit
?这是为了保证在同步代码块执行过程中如果抛出了异常,线程持有的锁也能够在异常抛出前被释放掉(不至于影响到其他正在等待锁获取的线程)。
经过上述的分析,对于锁的理解应该有了更深刻的认识。那么如何避免死锁呢?陷入死锁的线程既不会工作还要持续占用系统资源,我们的应用程序应当避免发生这种情况。
Lock
的 tryLock(millis)
超时等待机制,一旦发现等待时间过长,那么就没必要一直等下去,可以先去完成其他任务之后再来尝试获取锁。后面我们将针对这种情况手写一个等待超时就能自动返回的锁。 弃用 suspend/resume
之后,官方建议使用 wait/notify
代替。与 suspend/resume
的定位不同, wait/notify
实现于 Object
,是所有对象都能够调用的方法。且调用对象的 wait/notify
前必须先获取该对象的 monitor
。
以下是官方对 wait(millis)
给出的说明:
* This method causes the current thread (call it <var>T</var>) to * place itself in the wait set for this object and then to relinquish * any and all synchronization claims on this object. Thread <var>T</var> * becomes disabled for thread scheduling purposes and lies dormant * until one of four things happens: notify, notifyAll, interrupt, time out 复制代码
调用一个对象 obj
的 wait
方法将会导致当前执行线程被放入 obj
的等待队列中( wait set
,线程休息室),并且释放该线程通过 synchronized
已持有的所有锁,然后释放CPU的执行权陷入等待,直到被 notify/notifyAll
通知到、被其他线程调用 interrupt
中断或者等待时间已超过所设置的时限时才会进入就绪状态重新争取CPU执行权。
这里需要注意的是并非线程被 notify/notifyAll
唤醒了就能立即从 wait
返回,被唤醒后只会使线程进入就绪状态争取CPU执行权,只有获取到CPU执行权并且获取到所有 wait
前释放的锁后才能从 wait
返回,否则线程仍将阻塞在 wait
上。
使用 wait/notify
,我们能够实现线程间的通信。
官方给出了 wait/notify
使用的经典范式:
synchronized (obj) { while (<condition does not hold>) obj.wait(); ... // Perform action appropriate to condition } 复制代码
使用 while
而不使用 if
的原因就是被唤醒并从 wait
返回的线程应该不断检查它所关注的条件,因为被唤醒可能并不是由于另一个线程为了通知该线程而有针对性的唤醒该线程,这一点从 notify
的随机唤醒、 notifyAll
唤醒全部、被唤醒的线程在同一时刻只有一个能够抢到锁,可以看出真正能够从 wait
返回的线程具有很大的不确定性。由于每个线程的关注的条件不同,所以需要轮询判断条件是否成立,方可从 while
中退出来。
由此我们可以利用 wait/notify
实现生产者-消费者通信模型:
public class ClassicForm { private static String message; private static Object lock = new Object(); public static void main(String[] args) { Thread consumer = new Thread(() -> { while(true){ synchronized (lock) { while (message == null) { // wait for producing try { lock.wait(); } catch (InterruptedException e) { System.out.println("consumer was broken"); return; } } System.out.println("CONSUMER receive message : " + message); message = null; lock.notify(); } } }); Thread producer = new Thread(() -> { synchronized (lock) { for(int i = 0 ; i < 100 ; i++){ while (message != null) { // wait for consuming try { lock.wait(); } catch (InterruptedException e) { System.out.println("producer was broken"); return; } } message = "please the order, order-id is " + i; lock.notify(); System.out.println("PRODUCER send the message : " + message); } } }); consumer.start(); producer.start(); } } 复制代码
你会发现这里的 message
即使没有加 volatile
,生产者每次所做的更改消费者都能准确获取到。这是由 synchronized
的 unlock
指令和JMM(Java内存模型)共同决定的,JMM将在后文中详细展开。
上述代码有一个明显的缺陷,那就是如果生产者生产消息很慢,那么消费者就会一直 wait
直到有新的消息到来。这样就没有充分利用消费者线程所占用的资源。能否为消费者的等待设置一个限时?在等待时长超过限时之后就不 wait
了,先去处理其他任务,稍后再来监听生产者生产的消息。下段代码简单模拟了这一场景:
public class WaitTimeoutModel { private static String message; private static Object lock = new Object(); private static final long MAX_WAIT_LIMIT = 1000; public static void main(String[] args) { Thread consumer = new Thread(() -> { synchronized (lock) { while (true) { long base = System.currentTimeMillis(); long now = 0; while (message == null) { now = System.currentTimeMillis() - base; if (now >= MAX_WAIT_LIMIT) { break; // exit wait } try { lock.wait(MAX_WAIT_LIMIT); } catch (InterruptedException e) { System.out.println("consumer was broken"); } } if (message == null) { System.out.println("CONSUMER exit wait, and do other things"); try { // simulate do other thing Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println("CONSUMER receive the message : " + message); message = null; } } } }); Thread producer = new Thread(() -> { // prepare message is very slowly try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } // notify consumer synchronized (lock) { message = "please handle the order, order-id is 5454656465"; lock.notify(); System.out.println("PRODUCER send the message : " + message); } }); consumer.start(); producer.start(); } } 复制代码
要点就是在经典范式的基础之上,在轮询状态变量的过程中增加了一个等待时长判断(第 14~17
行),如果发现超过了给定的时限(这里是 MAX_WAIT_LIMIT
),那么就不再等待,去做其他事情(第 25~30
行),相反如果在 wait(MAX_WAIT_LIMIT)
期间由于生产者的提醒被唤醒,那么同样会跳出轮询(生产者通常生产出消息后才唤醒消费者)进入到第 32~33
行去消费消息。但无论是哪一种情况,都算是消费者一个逻辑执行单元的结束。由于消费者通常是24小时运行监听的( while(true)
),因此在每一个执行单元结束后将重置用来计算等待时长的 base
和 now
(第 11~12
行)。
运行效果如下:
CONSUMER exit wait, and do other things PRODUCER send the message : please handle the order, order-id is 5454656465 CONSUMER receive the message : please handle the order, order-id is 5454656465 CONSUMER exit wait, and do other things CONSUMER exit wait, and do other things CONSUMER exit wait, and do other things ... 复制代码
超时等待模型被广泛用于并发设计模式以及JUC包,需要好好理解。
wait
是 Object
中的实例方法且调用前需要获取实例对象的锁, sleep
是 Thread
中的静态方法可直接调用 sleep
不会释放当前线程所持有的锁,而 wait
则会释放当前线程持有的所有锁 sleep
和 wait
都会使线程进入 TIMED-WAITING
状态释放CPU执行权,但调用 sleep
的线程在设定的时限后能够自动返回,而 wait(millis)
在超时后需要先获取对象的锁才能返回、 wait(0)
更是需要等待被唤醒并获取到锁后才能返回。 下段代码模拟了生产者-消费者模型下两个生产者和两个消费者同时工作导致程序假死的一个案例
public class ProducerConsumer { private String message; public synchronized void produce() { while (message != null) { try { this.wait(); Thread.sleep(1000); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " was broken"); return; } } message = "time is " + new Date(System.currentTimeMillis()); this.notify(); System.out.println(Thread.currentThread().getName() + " send the message : " + message); } public synchronized void consume() { while (message == null) { try { this.wait(); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " was broken"); return; } } System.out.println(Thread.currentThread().getName() + " recv the message : " + message); message = null; this.notify(); } public static void main(String[] args) { ProducerConsumer pc = new ProducerConsumer(); Stream.of("p1", "p2").forEach(name -> { new Thread(() -> { while (true) { pc.produce(); } }, name).start(); }); Stream.of("c1", "c2").forEach(name -> { new Thread(() -> { while (true) { pc.consume(); } }, name).start(); }); } } 复制代码
输出如下:
p1 send the message : time is Fri Feb 01 14:06:26 CST 2019 c2 recv the message : time is Fri Feb 01 14:06:26 CST 2019 p2 send the message : time is Fri Feb 01 14:06:26 CST 2019 c2 recv the message : time is Fri Feb 01 14:06:26 CST 2019 p1 send the message : time is Fri Feb 01 14:06:27 CST 2019 # 至此,四个线程陷入永久wait 复制代码
笔者也曾异或良久,一个Producer生产了消息会通知一个Consumer消费,且后者消费完后又会通知一个等待生产的Producer,没问题啊!怎么会都陷入 wait
呢?
这是因为我们陷入了一个惯性思维,学生产者-消费者模式学着学着就总以为生产者生产了消息会通知消费者、消费者消费完了会通知生产者。我们忘记了 notify
的本质: notify
会从对象的 wait set
中 随机 选取一个线程唤醒。我们再来理性地分析一下上述代码:第 17
行的 notify
一定会唤醒对象的 wait set
上的一个消费者线程吗?不一定吧!假设某一时刻 p1
抢到了锁,而 p2,c1,c2
均阻塞在 wait
上,那么 p1
生产消息后调用的 notify
有没有可能唤醒的是 p2
呢(如此的话,被唤醒的 p2
发现 p1
生产的消息没有被消费仍然会陷入 wait
,这样的话四个线程就都陷入 wait
了,没有其他线程来唤醒他们。类似的,消费者消费完消息后唤醒的也可能是另一个在 wait
的消费者,这样的唤醒做的是无用功)。就是因为 notify
的 不确定性 ,从而导致上述代码并没有按照生产者-消费者的套路来,最后四个线程都陷入了 wait
且没有线程去唤醒他们。
但是如果将第 17,34
行的 notify
改成 notifyAll
就不会死锁了。这是因为 notifyAll
会唤醒所有阻塞在该对象的 wait
上的线程。因此 p1
生产消息后如果调用的是 notifyAll
,那么 p2,c1,c2
都会被唤醒并争取该对象的 monitor
,这时即使 p2
先抢到了,它也会由于消息未被消费而进入 wait
进而释放锁并唤醒等待该锁的 c1,c2
,所以 p1
的 notifyAll
最终一定会导致其中一个消费者从 wait
返回,这样即使是多Producer多Consumer,程序也能跑通了。
p2 send the message : time is Fri Feb 01 14:30:39 CST 2019 c1 recv the message : time is Fri Feb 01 14:30:39 CST 2019 p1 send the message : time is Fri Feb 01 14:30:39 CST 2019 c2 recv the message : time is Fri Feb 01 14:30:39 CST 2019 p2 send the message : time is Fri Feb 01 14:30:40 CST 2019 c1 recv the message : time is Fri Feb 01 14:30:40 CST 2019 p1 send the message : time is Fri Feb 01 14:30:41 CST 2019 c2 recv the message : time is Fri Feb 01 14:30:41 CST 2019 p2 send the message : time is Fri Feb 01 14:30:42 CST 2019 c1 recv the message : time is Fri Feb 01 14:30:42 CST 2019 ... 复制代码
多线程下的生产者-消费者模型,要使用 notifyAll
上文说到 synchronized
有一个严重的缺陷就是,如果持有锁的线程迟迟不释放锁(临界区的执行时间过长),那么等待该锁的其他线程就会一直阻塞住,直到该锁被释放。那么能否实现这样一种机制呢:给等待锁释放的线程设置一个时限,如果超过了该时限,那么就认为锁一时半会儿不会被释放,于是可以让线程利用这段空闲执行其他的任务而非一直阻塞着什么事都不做。
现在我们可以使用 wait/notify
的经典范式实现 synchronized
语义,使用其超时等待模型实现限时等待语义。首先定义一个同步对象接口,即 Lock
:
public interface Lock { void lock() throws InterruptedException; void unlock(); void lock(long millis) throws InterruptedException, TimeoutException; Collection<Thread> getBlockedThread(); int getBlockedCount(); } 复制代码
接着实现一个简单的用一个布尔变量表示同步状态的 BooleanLock
:
public class BooleanLock implements Lock { private volatile boolean isSync = false; //represent whether the lock is held or not. true is held, false is not held private Thread currentThread; //current thread which hold the lock private Collection<Thread> waitQueue; public BooleanLock() { this.isSync = false; this.currentThread = null; this.waitQueue = new ArrayList<>(); } @Override public synchronized void lock() throws InterruptedException { waitQueue.add(Thread.currentThread()); while (isSync) { // lock is held by other thread this.wait(); } // get the lock successfully waitQueue.remove(Thread.currentThread()); currentThread = Thread.currentThread(); isSync = true; //indicate the lock is held System.out.println(Thread.currentThread().getName() + " get the lock"); } @Override public void unlock() { // check the operator is the thread which is holding the lock if (Thread.currentThread() != currentThread) { return; } synchronized (this) { currentThread = null; isSync = false; this.notifyAll(); System.out.println(Thread.currentThread().getName() + " release the lock"); } } @Override public synchronized void lock(long millis) throws InterruptedException, TimeoutException { long base = System.currentTimeMillis(); long now = 0; waitQueue.add(Thread.currentThread()); while (isSync) { now = System.currentTimeMillis() - base; if (now >= millis) { throw new TimeoutException(); } this.wait(millis); } waitQueue.remove(Thread.currentThread()); currentThread = Thread.currentThread(); isSync = true; System.out.println(Thread.currentThread().getName() + " get the lock"); } @Override public Collection<Thread> getBlockedThread() { return Collections.unmodifiableCollection(waitQueue); } @Override public int getBlockedCount() { return waitQueue.size(); } } 复制代码
synchronized
语义: public static void main(String[] args) { BooleanLock lock = new BooleanLock(); Stream.of("t1", "t2", "t3", "t4", "t5").forEach(name -> { new Thread(() -> { try { lock.lock(); Thread.sleep(50); // to do thing } catch (InterruptedException e) { e.printStackTrace(); } finally { printBlockedThread(lock); lock.unlock(); } }, name).start(); }); } private static void printBlockedThread(BooleanLock lock) { System.out.print("There are " + lock.getBlockedCount() + " threads waiting on the lock: "); lock.getBlockedThread().forEach(thread -> System.out.print(thread.getName() + " ")); System.out.println(); } 复制代码
运行结果:
t1 get the lock There are 4 threads waiting on the lock: t4 t3 t2 t5 t1 release the lock t5 get the lock There are 3 threads waiting on the lock: t4 t3 t2 t5 release the lock t4 get the lock There are 2 threads waiting on the lock: t3 t2 t4 release the lock t2 get the lock There are 1 threads waiting on the lock: t3 t2 release the lock t3 get the lock There are 0 threads waiting on the lock: t3 release the lock 复制代码
需要注意的是 unlock
必须写在 finally
中确保锁一定会被释放,而 synchronized
同步块执行时抛出异常JVM会通过异常表(详见《深入理解Java虚拟机(第二版)》Class文件结构一章中的方法表的描述)在异常抛出时释放当前线程所持有的全部的锁。
上例只是实现了与 synchronized
同样的功能,接着我们测试一下限时获取锁的功能,这是 synchronized
无法做到的。
public static void main(String[] args) { BooleanLock lock = new BooleanLock(); Stream.of("t1", "t2", "t3", "t4", "t5").forEach(name -> { new Thread(() -> { try { lock.lock(1000); Thread.sleep(2000); // the task is very time-consuming } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " was interrupted"); } catch (TimeoutException e) { System.out.println(Thread.currentThread().getName() + " get lock time out, so do other thing first and then get the lock again"); } finally { lock.unlock(); } }, name).start(); }); } 复制代码
输出如下:
t1 get the lock t2 get lock time out, so do other thing first and then get the lock again t3 get lock time out, so do other thing first and then get the lock again t4 get lock time out, so do other thing first and then get the lock again t5 get lock time out, so do other thing first and then get the lock again t1 release the lock 复制代码
在使用一些开源框架时,比如 Tomcat
,在关闭时仍会有些日志打印出来,这些日志通常是释放应用程序资源的信息。也就是说我们点击 terminate
的事件被应用程序捕获到后,应用程序并非直接终止而是先释放一些珍贵资源。这就是通过设置钩子函数做到的,它会在应用程序主线程终止前被调用。对应 API
是 Runtime.getRuntime().addShutdownHook(thread)
。
下面我将在 linux
上演示钩子函数的用处。 MyApp.java
表示我的应用程序:
public class MyApp{ public static void main(String[] args){ Runtime.getRuntime().addShutdownHook( new Thread(() -> { //release resource here, like socket,connection etc System.out.println("releasing resources..."); }) ); while(true){ // start a service } } } 复制代码
通过 addShutdownHook
设置的线程将在 main
线程被外界中断时调用,比如我在运行 java MyApp
时按下了 CTRL C
[root@izm5ecexclrsy1gmkl4bgdz ~]# javac MyApp.java [root@izm5ecexclrsy1gmkl4bgdz ~]# java MyApp ^Creleasing resources... 复制代码
又比如后台运行 MyApp
,通过 kill pid
终止它:
[root@izm5ecexclrsy1gmkl4bgdz ~]# java MyApp & [1] 14230 [root@izm5ecexclrsy1gmkl4bgdz ~]# jps 14240 Jps 14230 MyApp [root@izm5ecexclrsy1gmkl4bgdz ~]# kill 14230 [root@izm5ecexclrsy1gmkl4bgdz ~]# releasing resources... 复制代码
但是 kill -9
则不会触发钩子程序:
[root@izm5ecexclrsy1gmkl4bgdz ~]# java MyApp & [1] 14264 [root@izm5ecexclrsy1gmkl4bgdz ~]# ps aux|grep java root 14264 96.3 1.4 2460724 27344 pts/0 Sl 16:03 0:09 java MyApp root 14275 0.0 0.0 112660 964 pts/0 R+ 16:03 0:00 grep --color=auto java [root@izm5ecexclrsy1gmkl4bgdz ~]# kill -9 14264 [root@izm5ecexclrsy1gmkl4bgdz ~]# ps aux|grep java root 14277 0.0 0.0 112660 964 pts/0 R+ 16:03 0:00 grep --color=auto java [1]+ Killed java MyApp 复制代码
Thread.currentThread().getStackTracke()
获取当前线程执行到当前方法时栈中的所有栈帧信息,返回 StackTraceElement[]
,一个元素就代表一个方法栈帧,可以通过它得知方法所属的类、方法名、方法执行到了第几行
public static void main(String[] args) { m1(); } public static void m1() { m2(); } private static void m2() { m3(); } private static void m3() { Arrays.asList(Thread.currentThread().getStackTrace()).stream() .filter( //过滤掉native方法 stackTraceElement -> !stackTraceElement.isNativeMethod() ).forEach( stackTraceElement -> { System.out.println(stackTraceElement.getClassName() + ":" + stackTraceElement.getMethodName() + "():" + stackTraceElement.getLineNumber()); } ); } 复制代码
由于 Runnable
接口的 run
方法并未声明抛出任何异常,因此在重写 run
时,所有 checked exception
都需要我们手动解决。但是如果抛出 unchecked exception
呢, 1/0
就是典型的例子,我们如何捕获他?
通过 thread.setUncheckedExceptionHandler()
能够做到这一点:
public static final int A = 1; public static final int B = 0; public static void main(String[] args) { Thread thread = new Thread(() -> { int i = A / B; }); thread.setUncaughtExceptionHandler((t, e) -> { // t -> the ref of the thread, e -> exception System.out.println(e.getMessage()); /// by zero }); thread.start(); } 复制代码
线程组代表一个线程的集合,一个线程组也可以包含其他线程组,线程组可以以树形结构展开。
在JVM启动时,会创建一个名为 main
的线程运行 main
函数和一个名为 main
的线程组, main
线程的线程组是 main
线程组:
public static void main(String[] args) { System.out.println(Thread.currentThread().getName()); //main System.out.println(Thread.currentThread().getThreadGroup().getName()); //main } 复制代码
创建线程时,如果没有为该线程显式指定线程组,那么该线程将会拿他的父线程的线程组作为自己的线程组。
如果创建线程组时没有显式指定其父线程组,将会拿当前线程的线程组作为其父线程组
public static void main(String[] args) { Thread t1 = new Thread(() -> { // }); System.out.println(t1.getThreadGroup().getName()); //main ThreadGroup threadGroup = new ThreadGroup("MyThreadGroup"); Thread t2 = new Thread(threadGroup, () -> { // }); System.out.println(t2.getThreadGroup().getName()); //MyThreadGroup System.out.println(t2.getThreadGroup().getParent().getName()); //main } 复制代码
threadGroup.list()
方法能够打印线程组中存活线程的信息,可用于 debug
ThreadGroup threadGroup = new ThreadGroup("MyThreadGroup"); new Thread(threadGroup, () -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(threadGroup, () -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); threadGroup.list(); java.lang.ThreadGroup[name=MyThreadGroup,maxpri=10] Thread[Thread-0,5,MyThreadGroup] Thread[Thread-1,5,MyThreadGroup] 复制代码
更多 API
大家可查看官方文档。
工作线程应该不断轮询任务队列是否有任务可做,有则拿来执行,无则等待外界提交。然后还要为外界提供终止当前线程的 stop
,其采用的是利用共享状态变量的方式并使用 volatile
修饰使得外界的终止操作立即对当前工作线程可见。
public class Worker implements Runnable { private volatile boolean stop; private LinkedList<Runnable> taskQueue; private Thread currentThread; public Worker(LinkedList<Runnable> taskQueue) { this.taskQueue = taskQueue; } @Override public void run() { currentThread = Thread.currentThread(); Runnable task = null; OUTER: while (!stop) { synchronized (taskQueue) { while (taskQueue.isEmpty()) { try { taskQueue.wait(); } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName()+" has been interrupted"); break OUTER; } } task = taskQueue.removeFirst(); taskQueue.notifyAll(); } if (task != null) { task.run(); } } } public void interrupt() { if (currentThread != null) { currentThread.interrupt(); } } public void stop() { stop = true; } } 复制代码
public class ThreadPool { private static final int DEFAULT_THREAD_COUNT = 10; private int threadCount; private LinkedList<Worker> workQueue; private LinkedList<Runnable> taskQueue; public ThreadPool() { this(DEFAULT_THREAD_COUNT); } public ThreadPool(int size) { this.threadCount = size; this.workQueue = new LinkedList<>(); this.taskQueue = new LinkedList<>(); init(size); } //创建并启动count个线程 private void init(int count) { if (count <= 0) { throw new IllegalArgumentException("thread pool size must greater than zero"); } for (int i = 0; i < count; i++) { Worker worker = new Worker(taskQueue); Thread thread = new Thread(worker, "ThreadPool-" + i); thread.start(); workQueue.add(worker); } } public void execute(Runnable task) { synchronized (taskQueue) { taskQueue.add(task); taskQueue.notifyAll(); } } public int getThreadCount() { return threadCount; } public int getTaskCount() { return taskQueue.size(); } //对wait中的线程调用stop,他也无法轮询该变量而退出循环 //因此对于wait中的工作线程直接中断它,而正在执行的线程则等他自己轮询到stop而退出 public void shutdown() { synchronized (taskQueue) { for (Worker worker : workQueue) { worker.stop(); worker.interrupt(); } } System.out.println("thread pool destroyed"); } } 复制代码
public class ThreadPoolTest { public static void main(String[] args) throws InterruptedException { ThreadPool threadPool = new ThreadPool(); for (int i = 0; i < 40; i++) { int number = i; threadPool.execute(()->{ System.out.println(Thread.currentThread().getName() + "start execute task-" + number); try { Thread.sleep(new Random(System.currentTimeMillis()).nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } }); } Thread.sleep(5000); threadPool.shutdown(); } } 复制代码
线程池的工作队列不应该无限大,如果不注意的或可能会导致OOM,因此在任务队列中的任务数到达一定数目时应对提交的任务采取拒绝策略。
这里应该用策略模式,策略接口:
public interface RefusePolicy { void refuse() throws Exception; } 复制代码
简单任务数过大就抛异常的策略:
public class DiscardRefusePolicy implements RefusePolicy { public class TaskExceededException extends Exception { public TaskExceededException(String message) { super(message); } } @Override public void refuse() throws TaskExceededException { throw new TaskExceededException("task has exceeded the taskSize of thread poll"); } } 复制代码
改造 execute
方法:
private static final int DEFAULT_THREAD_COUNT = 10; private static final RefusePolicy DEFAULT_REFUSE_POLICY = new DiscardRefusePolicy(); private static final int DEFAULT_TASK_SIZE = 200; private int threadCount; private LinkedList<Worker> workQueue; private LinkedList<Runnable> taskQueue; private int maxTaskSize; private RefusePolicy refusePolicy; public ThreadPool() { this(DEFAULT_THREAD_COUNT, DEFAULT_TASK_SIZE, DEFAULT_REFUSE_POLICY); } public ThreadPool(int size, int maxTaskSize, RefusePolicy refusePolicy) { this.threadCount = size; this.maxTaskSize = maxTaskSize; this.workQueue = new LinkedList<>(); this.taskQueue = new LinkedList<>(); this.refusePolicy = refusePolicy; init(size); } public void execute(Runnable task) throws Exception { synchronized (taskQueue) { if (taskQueue.size() >= maxTaskSize) { refusePolicy.refuse(); return; } taskQueue.add(task); taskQueue.notifyAll(); } } 复制代码
public static void main(String[] args) throws InterruptedException { ThreadPool threadPool = new ThreadPool(); for (int i = 0; i < 300; i++) { int number = i; try { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() + "start execute task-" + number); try { Thread.sleep(new Random(System.currentTimeMillis()).nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } }); } catch (Exception e) { System.out.println("task-" + i + " execution error : " + e.getMessage()); } } Thread.sleep(5000); threadPool.shutdown(); } 复制代码