锁是用来控制多个线程访问共享资源的方式, java
中可以使用 synchronized
和 Lock
实现锁的功能
synchronized
是java中的关键字,隐藏获取和释放锁的过程, Lock
是java中的接口,需要主动的获取锁和释放锁, synchronized
是排他锁,而 Lock
支持可中断获取锁,超时获取锁
Lock
提供的接口
public interface Lock { /** * 获取锁,调用该方法后当前线程获取锁,获取到锁之后从该方法返回 */ void lock(); /** * 可中断的获取锁,在获取锁的过程中可以中断当前线程 */ void lockInterruptibly() throws InterruptedException; /** * 尝试非阻塞的获取锁,调用方法后立即返回,获取到锁则返回true,否则返回false */ boolean tryLock(); /** * 超时获取锁,在超时时间内获取到锁,在超时时间被中断,超时时间内为获取到锁,三种情况下会从该方法返回 */ boolean tryLock(long time, TimeUnit unit) throws InterruptedException; /** * 释放锁 */ void unlock(); /** * 获取等待通知组件,只有当前线程获取到锁之后才可以调用该组件的wait()方法,释放锁 */ Condition newCondition(); } 复制代码
队列同步器 AbstractQueuedSynchronizer
( AQS
简称同步器)是用来构建锁或者其他同步组件的基础框架
java
中锁的实现基本都是通过聚合了一个同步器的子类完成线程访问控制的,同步器是实现锁的关键,可以这么理解,锁面向编程者,隐藏了实现细节,同步器面向锁的实现,简化了锁的实现方式,屏蔽了同步状态管理,线程排队,等待与唤醒等底层操作,通过 AbstractQueuedSynchronizer
我们可以很方便的实现一个锁
同步器的设计基于模板方法模式,提供的模板方法主要包括:独占锁获取锁与释放同步状态,共享式获取与释放同步状态,获取同步队列中等待线程情况
想要实现一个独占式锁需要重写以下方法
方法名 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态,同一时刻只能有一个线程可以获取到同步状态,获取失败进入同步队列等待 |
void acquireInterruptibly(int arg) | 独占式获取同步状态,响应中断操作,被中断时会抛异常并返回 |
boolean tryAcquireNanos(int arg, long nanosTimeout) | 独占式获取同步状态,响应中断操作,并且增加了超时限制,如果规定时间没有获得同步状态就返回false,否则返回true |
boolean release(int arg) | 独占式释放同步状态,在释放同步状态之后,将同步队列中的第一个节点包含的线程唤醒 |
想要实现一个共享锁需要重写以下方法
方法名 | 描述 |
---|---|
void acquireShared(int arg) | 共享式获取同步状态,同一时刻可以有多个线程获取到同步状态 |
void acquireSharedInterruptibly(int arg) | 共享式获取同步状态,响应中断操作 |
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) | 共享式获取同步状态,响应中断操作,并且增加了超时限制,如果规定时间没有获得同步状态就返回false,否则返回true |
boolean releaseShared(int arg) | 共享式释放同步状态 |
方法名 | 描述 |
---|---|
Collection getQueuedThreads() | 获取同步队列上的线程集合 |
在这些模板方法中,多次提到了同步队列,我们看一下 AQS
是如何实现同步队列的
首先看下 AbstractQueuedSynchronizer
的类图
Node
类是 AbstractQueuedSynchronizer
类的内部类,同步器依靠内部的一个同步队列来完成同步状态的管理,当前线程获取同步状态失败的时候,同步器会将当前线程及等待信息构造成一个 Node
节点加入到同步队列中
属性 | 描述 |
---|---|
waitStatus | 该线程等待状态,包含如下: CANCELLED 值为1,表示需要从同步队列中取消等待 SIGNAL值为-1,表示后继节点处于等待状态,如果当前节点释放同步状态会通知后继节点,使得后继节点的线程能够运行 CONDITION值为-2,表示节点在等待队列中 PROPAGATE值为-3,表示下一次共享式同步状态获取将会无条件传播下去 INITIAL值为0,表示初始状态 |
prev:Node | 前驱节点 |
next:Node | 后继节点 |
thread:Thread | 当前线程 |
nextWaiter:Node | 下一个等待节点 |
可以看到 AQS中
的节点信息包含前驱和后继节点,所以我们知道了AQS的同步队列是双向链表结构的
AQS
中的几个重要属性
属性 | 描述 |
---|---|
state:int | 同步状态:如果等于0,锁属于空闲状态,如果等于1,标识锁被占用,如果大于1,则表示锁被当前持有的线程多次加锁,即重入状态 |
head:Node | 队列的头节点 |
tail:Node | 队列的尾节点 |
unsafe:Unsafe | AQS中的cas算法实现 |
AQS
中提供了三个方法对同步状态进行操作
getState()
获取到同步状态 setState(int newState)
设置同步状态 compareAndSetState(int expect, int update)
使用 CAS
设置当前状态,该方法能够保证设置的原子性 AQS
的基本结构如下图所示
在同步器中 head
和 tail
的节点的引用指向同步队列的头,尾节点,这样在后面操作节点入列和出列的时候只需要操作同步器中的 head
和 tail
节点就可以
ReentrantLock
重入锁,内部AQS的实现是基于独占式获取/释放同步状态的。我们学习一下 ReentrantLock
的实现原理来进一步加深对 AQS
的理解
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁阻塞,它表示一个线程可以对资源重复加锁,同时支持获取锁时使用公平锁还是非公平锁
例:
/** * @author: chenmingyu * @date: 2019/4/12 15:09 * @description: ReentrantLock */ public class ReentrantLockTest { private static Lock LOCK = new ReentrantLock(); public static void main(String[] args) { Runnable r1 = new TestThread(); new Thread(r1,"r1").start(); Runnable r2 = new TestThread(); new Thread(r2,"r2").start(); } public static class TestThread implements Runnable{ @Override public void run() { LOCK.lock(); try { System.out.println(Thread.currentThread().getName()+":获取到锁 "+LocalTime.now()); TimeUnit.SECONDS.sleep(3L); }catch (Exception e){ e.printStackTrace(); }finally { LOCK.unlock(); } } } } 复制代码
输出
只有在 r1
线程释放锁之后 r2
线程才获取到锁去执行代码打印数据
创建的实例,默认使用非公平锁,如果需要公平锁,需要调用有参的构造函数
/** * 非公平锁 * 创建ReentrantLock实例,默认使用非公平锁 */ public ReentrantLock() { sync = new NonfairSync(); } /** * 公平锁 * 创建ReentrantLock实例,fair为true使用公平锁 */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } 复制代码
NonfairSync
与 FairSync
都是 ReentrantLock
类的内部类,继承自 ReentrantLock
类的内部类 Sync
, Sync
类继承了 AbstractQueuedSynchronizer
类图如下
非公平锁的实现
/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } 复制代码
非公平锁会在调用 lock()
方法的时候首先调用 compareAndSetState(0, 1)
方法尝试获取锁,如果没有获取到锁则调用 acquire(1)
方法
compareAndSetState(0, 1)
方法是一个 CAS
操作,如过设置成功,则为获取到同步状态,并调用 setExclusiveOwnerThread(Thread.currentThread());
方法将当前线程设置为独占模式同步状态的所有者
我们所说的获取同步状态其实指的就是获取锁的状态,获取同步状态成功则加锁成功
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } 复制代码
acquire(1)
方法是提供的模板方法,调用 tryAcquire(arg)
和 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
tryAcquire(arg)
方法调用的是子类的实现, NonfairSync
的 tryAcquire
方法
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } 复制代码
nonfairTryAcquire(acquires)
方法
/** * 非公平尝试获取同步状态 */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { /** * 首先根据`getState()`方法获取同步状态,如果等于0尝试调用`compareAndSetState(0, * acquires)`方法获取同步状态,如果设置成功则获取同步状态成功,设置当前线程为独占模式同步状态的 * 所有者 */ int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 复制代码
getState()
方法获取同步状态,如果等于0尝试调用 compareAndSetState(0, acquires)
方法获取同步状态,如果设置成功则获取同步状态成功,设置当前线程为独占模式同步状态的所有者 state
+1,表示当前线程多次加锁 如果 tryAcquire(arg)
返回false,表示没有获取到同步状态,即没有拿到锁,所以需要调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法将当前线程加入到同步队列中,并且以死循环的方式获取同步状态,如果获取不到则阻塞节点中的线程,而被阻塞的线程只能通过前驱节点的出队,或者阻塞线程被中断来实现唤醒
addWaiter(Node.EXCLUSIVE)
方法的作用就是构造同步队列的节点信息,然后加入到同步队列尾部
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } 复制代码
首先调用 Node
类的构造方法创建一个实例, tail
是 AQS
中队列的尾节点
如果 tail
节点不为空,将实例的前驱节点置为 tail
指向的节点,然后调用 compareAndSetTail(pred, node)
方法, compareAndSetTail(pred, node)
方法调用 unsafe.compareAndSwapObject(this, tailOffset, expect, update)
,此方法是一个 CAS
操作,不可中断,用来保证节点能够被线程安全的添加,设置成功后,将节点 tail
的后继节点指向当前实例,以此来实现将当前实例加入到同步队列尾部
如果 tail
节点等于空或者 compareAndSetTail(pred, node)
设置失败,则会调用 enq(node)
方法
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } 复制代码
在这个方法中利用 for
循环构造了一个死循环,如果当前 AQS
的 tail
节点为空,则证明当前同步队列中没有等待的线程,也就是没有节点,调用 compareAndSetHead(new Node())
方法构造了一个头节点,然后循环调用 compareAndSetTail(t, node)
将当前实例加入到队列的尾部,如果失败就一直调用,直到成功为止
在调用 addWaiter(Node mode)
方法后会调用 acquireQueued(final Node node, int arg)
方法,作用是在每个节点进入到同步队列中后就进入了一个自旋的状态,通过校验自己的前驱节点是否是头节点,并且是否获取到同步状态为条件进行判断,如果满足条件则从自旋中退出,负责一直自旋
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 复制代码
方法内也是一个 for
的死循环,通过 node.predecessor()
方法获取传入的 Node
实例的前驱节点并与 AQS
的 head
节点进行比较,如果相等,则尝试获取同步状态获取锁,如果获取成功就调用 setHead(node);
方法将当前 Node
实例节点设置为 head
节点,将原来 head
节点的后继节点置为null,有助于GC回收
setHead(node);
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } 复制代码
如果传入的 Node
实例的前驱节点与 AQS
的 head
节点不相等或者获取同步状态失败,则调用 shouldParkAfterFailedAcquire(p, node)
和 parkAndCheckInterrupt()
方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 复制代码
通过 CAS
操作,设置节点的前驱节点等待状态为 Node.SIGNAL
,如果设置失败,返回false,因为外层是死循环,会重复当前方法直到设置成功
parkAndCheckInterrupt()
方法调用 LookSupport.park()
阻塞线程,然后清除掉中断标识
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } 复制代码
从 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法返回后,调用 selfInterrupt()
,将线程中断
在了解 acquire(1);
方法的作用之后,在理解公平锁的实现就容易了
final void lock() { acquire(1); } 复制代码
对比非公平锁的实现少了一步上来就获取同步状态的操作,其余操作跟非公平锁的实现一样
ReentrantLock
的 unlock()
方法实际调用的 AQS
的 release(int arg)
方法
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 复制代码
首先调用 tryRelease(arg)
释放同步状态
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } 复制代码
获取同步状态,并减1,如果此时c==0则释放锁,将当前独占式锁的拥有线程置为null,然后设置 state
为0
然后调用 unparkSuccessor(Node node)
方法唤醒后继节点的线程
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) /* * 唤醒后继节点的线程 */ LockSupport.unpark(s.thread); } 复制代码
总结一下独占式获取锁和释放锁的过程:
AQS
维护的同步队列的尾部,并且开始自旋,跳出自旋的条件就是前驱节点为 AQS
的头节点并且获取到了同步状态,此时将节点移除同步队列 在了解了 ReentrantLock
的实现原理之后,我们就可以仿照着自己去实现一个自定义独占式锁了
LockTest
类,实现 Lock
接口,重写必要的接口 LockTest
类里创建一个内部类 Sync
,继承 AQS
,因为要实现独占式锁,所以重写 tryAcquire(int arg)
和 tryRelease(int arg)
方法就可以了 LockTest
代码
/** * @author: chenmingyu * @date: 2019/4/11 15:11 * @description: 自定义独占式锁 */ public class LockTest implements Lock{ private final Sync SYNC = new Sync(); public static class Sync extends AbstractQueuedSynchronizer{ @Override protected boolean tryAcquire(int arg) { if(compareAndSetState(0,1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { if(getState()<1){ throw new IllegalMonitorStateException("释放同步状态不可小于1"); } int c = getState() - arg; if (c == 0) { setExclusiveOwnerThread(null); } setState(c); return true; } } @Override public void lock() { SYNC.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { SYNC.acquireInterruptibly(1); } @Override public boolean tryLock() { return SYNC.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { SYNC.release(1); } @Override public Condition newCondition() { return null; } } 复制代码
/** * @author: chenmingyu * @date: 2019/4/12 15:09 * @description: LockTest */ public class ReentrantLockTest { private static Lock LOCKTEST = new LockTest(); public static void main(String[] args) { Runnable r1 = new TestThread(); new Thread(r1,"LockTest 1").start(); Runnable r2 = new TestThread(); new Thread(r2,"LockTest 2").start(); } public static class TestThread implements Runnable{ @Override public void run() { LOCKTEST.lock(); try { System.out.println(Thread.currentThread().getName()+":获取到锁 "+LocalTime.now()); TimeUnit.SECONDS.sleep(3L); }catch (Exception e){ e.printStackTrace(); }finally { LOCKTEST.unlock(); } } } } 复制代码
输出
ReentrantReadWriteLock
是读写锁的实现,实现 ReadWriteLock
接口
ReentrantReadWriteLock
内部同样维护这一个 Sync
内部类,实现了 AQS
,通过重写对应方法实现读锁和写锁
现在已经知道了同步状态是由 AQS
维护的一个整型变量 state
,独占式锁获取到锁时会对其进行加1,支持重入,而读写锁 ReentrantReadWriteLock
在设计的时候也是通过一个整型变量进行读锁的同步状态和写锁的同步状态维护,在一个变量上维护两种状态就需要对整型变量进行按位分割,一个int类型的变量包含4个字符,一个字符8个bit,就是32bit,在 ReentrantReadWriteLock
中,高16位表示读,低16位表示写
读写锁中的写锁,支持重进入的排它锁
重写 ReentrantReadWriteLock
的内部类 Sync
中的 tryAcquire(int acquires)
方法
protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); /* * 1,如果同步状态c不等于0,代表着有读锁或者写锁 */ if (c != 0) { // 2,如果c不等于0,w写锁的同步状态为0,切当前线程不是持有锁的线程,返回false if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; } 复制代码
如果存在读锁,写锁不能被获取,必须要等到其他读线程释放读锁,才可以获取到写锁,这么做的原因是要确保写锁做的操作对读锁可见,如果写锁被获取,则其他读写线程的后续访问均会被阻塞
读写锁中的读锁,支持重进入的共享锁
写锁的释放与独占式锁释放过程相似,每次都是减少写锁的同步状态,直到为0时,表示写锁已被释放
读锁是一个支持重入的共享锁,重写 ReentrantReadWriteLock
的内部类 Sync
中的 tryAcquireShared(int unused)
方法
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0; } } 复制代码
如果其他线程获取了写锁,则当前线程获取读锁状态失败进入等待状态,如果当前线程获取了写锁或者写锁未被获取,则当前线程获取同步状态成功,获取到读锁
释放读锁的时候就是每次释放都会对同步状态进行-1,直到为0时,表示读锁已被释放
锁降级是指将写锁降级为读锁,这个过程就是当前线程已经获取到写锁的时候,在获取到读锁,随后释放写锁的过程,这么做的目的为的就是保证数据的可见性
当前线程A获取到写锁后,对数据进行修改,之后在获取到读锁,然后释放写锁,完成锁降级,这时候线程A还没释放读锁,别的线程就无法获取到写锁,就无法对数进行修改,以此来保证数据的可见性
java并发编程 | 线程详解