下面将从以下几个方面浅析ReentrantLock:
重入锁 ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。代码示例如下:
public class ReenterLock implements Runnable{ private ReentrantLock lock = new ReentrantLock(); private int i = 0; @Override public void run() { for (int j = 0; j < 1000000 ; j++) { //获取锁 lock.lock(); try{ i++; } finally { //释放锁 lock.unlock(); } } } public static void main(String[] args) throws InterruptedException { ReenterLock reenterLock = new ReenterLock(); Thread t1 = new Thread(reenterLock); Thread t2 = new Thread(reenterLock); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(reenterLock.i); } } 复制代码
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞:
代码示例如下:
lock.lock(); lock.lock(); try{ i++; } finally { //释放锁 lock.unlock(); lock.unlock(); } 复制代码
ReentrantLock处理死锁的手段,说白了也是ReentrantLock的重要特性
首先介绍下死锁的大致概念:
两个或多个进程在执行过程中,因争夺资源而造成的一种相互等待的现象,如无外力作用,它们将无法继续进行下去
下面举一个 Synchronized下的死锁例子:
public class DeadLockExample implements Runnable{ private boolean flag; //锁1 private static Object lock1 = new Object(); //锁2 private static Object lock2 = new Object(); public DeadLockExample(boolean flag) { this.flag = flag; } @Override public void run() { if (flag) { synchronized (lock1) { System.out.println("线程 : " + Thread.currentThread().getName() + " get lock1"); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //尝试获取lock2 System.out.println("线程 : "+ Thread.currentThread().getName()+" waiting get lock2"); synchronized (lock2) { System.out.println("线程 : " + Thread.currentThread().getName() + " get lock1"); } } } else { synchronized (lock2) { System.out.println("线程 : " + Thread.currentThread().getName() + " get lock2"); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //尝试获取锁1 System.out.println("线程 : "+ Thread.currentThread().getName()+" waiting get lock1"); synchronized (lock1) { System.out.println("线程 : " + Thread.currentThread().getName() + " get lock1"); } } } } public static void main(String[] args) { Thread t1 = new Thread(new DeadLockExample(true)); t1.setName("A"); Thread t2 = new Thread(new DeadLockExample(false)); t2.setName("B"); t1.start(); t2.start(); } } 复制代码
输出结果:
线程 : A get lock1 线程 : B get lock2 线程 : A waiting get lock2 线程 : B waiting get lock1 复制代码
可以看出线程 A在等待获取锁2,而线程 B在等待获取锁1,两个线程相互等待这样就形成了死锁
而ReentranLock 与 Synchronized 一样是一种同步机制,但是 ReentranLock 提供了 比 synchronized 更强大、更灵活的锁机制, 可以减少死锁发生的概率 。
ReentranLock 提供了两种方式来处理死锁:
使用 lock的 lockInteruptibly()
方法获取锁,如果出现死锁的话,调用线程的 interrupt来消除死锁,以上面那个例子为基础,改成 ReentrantLock的形式,代码如下
public class DeadLockWithReentrantLock implements Runnable{ private boolean flag; //锁1 private static ReentrantLock lock1 = new ReentrantLock(); //锁2 private static ReentrantLock lock2 = new ReentrantLock(); public DeadLockWithReentrantLock(boolean flag) { this.flag = flag; } @Override public void run() { try { if (flag) { //获取锁 lock1.lockInterruptibly(); System.out.println("线程 : " + Thread.currentThread().getName() + " get lock1"); TimeUnit.SECONDS.sleep(2); System.out.println("线程 : " + Thread.currentThread().getName() + " try to get lock2"); lock2.lockInterruptibly(); } else { lock2.lockInterruptibly(); System.out.println("线程 : " + Thread.currentThread().getName() + " get lock2"); TimeUnit.SECONDS.sleep(2); System.out.println("线程 : " + Thread.currentThread().getName() + " try to get lock1"); lock1.lockInterruptibly(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { //如果当前线程持有锁1,释放锁1 if (lock1.isHeldByCurrentThread()) { lock1.unlock(); } //如果当前线程持有锁2,释放锁2 if (lock2.isHeldByCurrentThread()) { lock2.unlock(); } System.out.println("线程 : " + Thread.currentThread().getName() + " 退出"); } } public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(new DeadLockWithReentrantLock(true)); t1.setName("A"); Thread t2 = new Thread(new DeadLockWithReentrantLock(false)); t2.setName("B"); t1.start(); t2.start(); TimeUnit.SECONDS.sleep(5); System.out.println("线程B设置中断标记,线程B将退出死锁状态"); t2.interrupt(); } } 复制代码
输出结果
线程 : A get lock1 线程 : B get lock2 线程 : B try to get lock1 线程 : A try to get lock2 线程B设置中断标记,线程B将退出死锁状态 java.lang.InterruptedException 线程 : B 退出 线程 : A 退出 at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222) at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335) at com.pjmike.thread.reentrantlock.DeadLockWithReentrantLock.run(DeadLockWithReentrantLock.java:36) at java.lang.Thread.run(Thread.java:745) 复制代码
线程A获取锁1,线程B获取锁2,线程A尝试获取锁2,线程B尝试获取锁1,两个线程相互等待对方持有的锁,故形成了死锁。此时 main函数中,调用线程B的 interrupt
中断线程,线程B响应中断,最后两个线程都相继退出。真正完成任务只有线程A,线程B首先响应中断,放弃任务直接退出,释放资源。
下面来看下关键方法 lockInterruptibly
是如何实现的:
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } 复制代码
方法中调用队列同步器 AbstractQueuedSynchronizer
中的 acquireInterruptibly
方法
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } 复制代码
从上面的代码就可以看出如果当前线程被中断,就会抛出一个 InterruptedException
异常,我们之前的输出结果也是抛出一个中断异常,最终死锁被消除。关于队列同步器的部分,这里就不详细介绍了,可以参阅《Java并发编程的艺术》一书,书中对AQS的描述如下:
AQS 是用来构建锁或者其他同步组件的基础框架,它使用了一个 int成员变量表示同步状态,通过内置的FIFO 队列来完成资源获取线程的排队工作。
除了等待外部中断外,避免死锁还有一种方法就是限时等待。限时等待的方式是调用 tryLock
方法,还是先来看代码示例如下:
public class DeadLockWithReentrantLock2 implements Runnable{ private boolean flag; //锁1 private static ReentrantLock lock1 = new ReentrantLock(); //锁2 private static ReentrantLock lock2 = new ReentrantLock(); public DeadLockWithReentrantLock2(boolean flag) { this.flag = flag; } @Override public void run() { try { if (flag) { if (lock1.tryLock()) { System.out.println("线程 : " + Thread.currentThread().getName() + " get lock1"); TimeUnit.SECONDS.sleep(2); System.out.println("线程 : " + Thread.currentThread().getName() + " try to get lock2"); if (lock2.tryLock()) { System.out.println("线程 : " + Thread.currentThread().getName() + " already get lock2"); } } } else { if (lock2.tryLock()) { System.out.println("线程 : " + Thread.currentThread().getName() + " get lock2"); TimeUnit.SECONDS.sleep(2); System.out.println("线程 : " + Thread.currentThread().getName() + " try to get lock1"); if (lock1.tryLock()) { System.out.println("线程 : " + Thread.currentThread().getName() + " already get lock1"); } } } } catch (InterruptedException e) { e.printStackTrace(); } finally { //如果当前线程持有锁1,释放锁1 if (lock1.isHeldByCurrentThread()) { lock1.unlock(); } //如果当前线程持有锁2,释放锁2 if (lock2.isHeldByCurrentThread()) { lock2.unlock(); } System.out.println("线程 : " + Thread.currentThread().getName() + " 退出"); } } public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(new DeadLockWithReentrantLock2(true)); t1.setName("A"); Thread t2 = new Thread(new DeadLockWithReentrantLock2(false)); t2.setName("B"); t1.start(); t2.start(); TimeUnit.SECONDS.sleep(5); } } 复制代码
输出结果是:
线程 : B get lock2 线程 : A get lock1 线程 : B try to get lock1 线程 : A try to get lock2 线程 : B 退出 线程 : A already get lock2 线程 : A 退出 复制代码
ReentrantLock.tryLock()方法不带参数运行的情况下,当前线程会尝试获取锁,如果锁并未被其他线程占用,则申请锁会成功,并立即返回true。如果锁被其他线程占用,则当前线程不会进行等待,而是立即返回 false.这种模式不会引起线程等待,因此也不会产生死锁。
上面的例子中,线程A获得锁1,线程B获得锁2,线程B尝试获取锁1,发现锁1被占用,此时线程B不会等待,最终退出释放锁2,线程A就获得锁2继续执行任务而后退出。
其实,tryLock方法还可以接受两个参数,一个表示等待时长,另外一个表示计时单位。
public boolean tryLock(long timeout, TimeUnit unit) 复制代码
比如设置时长为5s,就表示线程在锁请求中,最多等待5s,如果超过5s没有获得锁,就会返回 false.如果成功获得锁,则返回true.
ReentrantLock中有两种锁:公平锁和非公平锁。
默认情况下,ReentrantLock获得的锁是非公平的。上面举的一些代码示例中获得锁都是非公平的。当然也可以设置公平锁,在ReentrantLock的构造方法里
public ReentrantLock(boolean fair) 复制代码
但是公平锁需要系统维护一个有序队列,因此公平锁的实现成本比较高,性能也比较低下。下面来举一个公平锁的代码示例:
public class FairLock implements Runnable{ private static ReentrantLock lock = new ReentrantLock(true); @Override public void run() { while (true) { try { lock.lock(); System.out.println(Thread.currentThread().getName() + " 获得锁 "); } finally { lock.unlock(); } } } public static void main(String[] args) { FairLock fairLock = new FairLock(); Thread A = new Thread(fairLock, "Thread-A"); Thread B = new Thread(fairLock, "Thread-B"); A.start(); B.start(); } } 复制代码
输出结果:
Thread-A 获得锁 Thread-B 获得锁 Thread-A 获得锁 Thread-B 获得锁 Thread-A 获得锁 Thread-B 获得锁 Thread-A 获得锁 Thread-B 获得锁 ...... 复制代码
从输出结果看,两个线程基本上是交替获得锁的,几乎不会发生同一线程连续多次获得锁的可能,从而保证了公平性。
再次总结下公平锁与非公平锁:
ReentrantLock的类层次结构如下图所示:
ReentrantLock实现了Lock接口,Lock接口定义了锁获取和释放的基本操作:
public interface Lock { //获取锁 void lock(); //可中断地获取锁,在锁的获取时可以中断当前线程 void lockInterruptibly() throws InterruptedException; //非阻塞的获取锁,调用该方法后立刻返回,如果能够获取返回true,否则返回false boolean tryLock(); //获取锁的超时设定 boolean tryLock(long time, TimeUnit unit) throws InterruptedException; //释放锁 void unlock(); //获取等待通知组件 Condition newCondition(); } 复制代码
从上图还可以看出,ReentrantLock内部有三个内部类:Sync、NonfairSync、FairSync。Sync是一个抽象类型,它继承了AbstractQueuedSynchronizer(简称AQS),而NonfairSync和FairSync是Sync的继承类,分别对应非公平锁和公平锁。AQS是队列同步器,是用来构建锁或者其他同步组件的基础框架,实现了很多与锁相关的功能。
AQS的主要使用方式是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态。而Sync也是继承AQS,实现了它的tryRelease方法。
在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用AQS提供的三个方法:
getState(): 获取当前同步状态
setState(int newState): 设置当前同步状态
compareAndSetState(int expect,int update): 使用CAS设置当前状态,该方法能够保证状态设置的原子性。( CAS是一种用于在多线程环境下实现同步功能的机制,CAS操作包含三个操作数--内存位置、预期数值和新值。CAS的实现逻辑是将内存位置处的数值与预期数值相比较、若相等,则将内存位置处的值替换为新值,若不相等,则不做任何操作 )
同步器依赖内部的同步队列(一个FIFO双向队列,也叫做CLH同步队列)来完成同步状态的管理,当前线程获取获取同步状态失败时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
最后再简单介绍AQS中的几个方法以方便后面分析使用,(AQS是一门大学问,可以说在Java并发是非常核心的内容,本文只做简单介绍,对于AQS更详细内容请参阅相关书籍):
boolean tryAcquire(int arg): 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
boolean tryRelease(int arg): 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
boolean release(int arg): 释放同步状态,并将CLH同步队列中第一个节点包含的线程唤醒
void acquire(int arg): 获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法。
下面通过源码的形式,以非公平锁为例,简要分析lock方法与unlock的内部实现。
以下面这个demo的核心代码来分析:
private ReentrantLock lock = new ReentrantLock(); private int i = 0; @Override public void run() { //获取锁 lock.lock(); try { i++; } finally { //释放锁 lock.unlock(); } } 复制代码
lock.lock()
实际调用的是NonfairSync的lock方法,lock内部首先执行compareAndSetState 方法进行CAS操作,尝试抢占锁,如果成功,就调用 setExclusiveOwnerThread
方法把当前线程设置在这个锁上,表示抢占成功。 static final class NonfairSync extends Sync { ... final void lock() { //调用AQS的compareAndSetState方法进行CAS操作 //当同步状态为0时,获取锁,并设置状态为1 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } ... } 复制代码
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
// 1 protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } // 2 final boolean nonfairTryAcquire(int acquires) { //当前线程 final Thread current = Thread.currentThread(); int c = getState(); //比较当前同步状态是否为0,如果是0,就去抢占锁 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果不为0,就比较当前线程与占用锁的线程是不是同一个线程,如果是,就去增加状态变量的值 //这就是可重入锁之所以能可重入,就是因为同一个线程可以反复使用它的锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 复制代码
下图是NonfairSync的lock方法的一个调用时序图,与上面的分析相呼应:
unlock调用过程源代码如下:
//1 ReentrantLock中的unlock public void unlock() { sync.release(1); //调用Sync的release方法,实则调用AQS中的release } //2 AQS中的release public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } //3 在release中调用 Sync实现的tryRelease方法 protected final boolean tryRelease(int releases) { //getState()=1,前面获取锁时已经更新为1,而releases为1,=> c =0 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //去除锁的独占线程 if (c == 0) { free = true; setExclusiveOwnerThread(null); } //重新设置state = 0 setState(c); //释放锁成功返回true return free; } 复制代码