AbstractQueuedSynchronizer是基于一个FIFO双向链队列 ==CLH队列==,用于构建锁或者同步装置的类,也称为Java同步器,ReentrantLock的公平锁与非公平锁就是由该同步器构成,链队列结构图如下。
Node prev | 前驱节点 |
Node next | 后继节点 |
Node nextWaiter | 存储condition队列中的后继节点 |
Thread thread | 入队列时的当前线程 |
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
根据指定状态获取,能获取到 执行compareAndSetState方法设置新状态
public final void acquire(int arg) { //tryAcquire成功的话 acquire结束; if (!tryAcquire(arg) && //AcquireQueued方法进行阻塞等待,直到获取锁为止 //addWaiter把当前线程添加到队列尾部 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //tryAcquire失败并且acquiredQueued成功的话把当前线程中断 selfInterrupt(); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); //获取状态 int c = getState(); //如果该锁未被任何线程占有,该锁能被当前线程获取 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //若被获取到 查看是否被当前线程 else if (current == getExclusiveOwnerThread()) { //是当前线程的话再次获取,计数+1 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
//node为null,排他方式阻塞等待 final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //p为当前节点的前一个节点 final Node p = node.predecessor(); //如果p为头结点并且获取成功就把当前线节点设置为头结点 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } //如果线程需要被阻塞 则interrputr为true if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //线程需要运行 if (ws == Node.SIGNAL) return true; //ws>0 处于CANCEL状态的线程 if (ws > 0) { //把这些线程从队列中清除 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //把等待的设置为运行状态 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
// park方法让其他线程处于等待状态 private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
public void unlock() { //释放一个锁 sync.release(1); }
public final boolean release(int arg) { //调用tryRelease尝试释放一个锁 if (tryRelease(arg)) { Node h = head; //释放成功后 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
private void unparkSuccessor(Node node) { //获取当前节点状态 int ws = node.waitStatus; //把状态设置为等待获取锁状态 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //如果没有下一个节点或者下一个节点状态为CANCEL,则把它们清除 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //否则调用LockSupport中unpark方法,唤醒后一个节点 if (s != null) LockSupport.unpark(s.thread); }
public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); }
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
public class MyAQSLock implements Lock { private final Sync sync; public MyAQSLock() { sync = new Sync(); } @Override public void lock() { sync.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1,unit.toNanos(time)); } @Override public void unlock() { sync.release(1); } @Override public Condition newCondition() { return sync.newCondition(); } /** * 把lock、unlock实现使用AQS构建为内部类 */ private class Sync extends AbstractQueuedSynchronizer{ Condition newCondition(){ return new ConditionObject(); } @Override protected boolean tryAcquire(int arg) { //第一个线程进来拿到锁 int state = getState(); //用于重入锁判断 Thread current = Thread.currentThread(); if(state==0){ if(compareAndSetState(0,arg)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } } //重入锁判断 当前线程和独占锁线程相同,则再次获取 else if(current==getExclusiveOwnerThread()){ int next = state+arg; if(next<0){ throw new RuntimeException(); } setState(next); return true; } return false; } /** * 可重入释放锁 * @param arg * @return */ @Override protected boolean tryRelease(int arg) { if(Thread.currentThread()!=getExclusiveOwnerThread()){ throw new RuntimeException(); } int state = getState()-arg; if(state==0){ setExclusiveOwnerThread(null); setState(0); return true; } setState(0); return false; } } }
public class TestAQSLock2 { MyAQSLock myLock = new MyAQSLock(); private int value; private int value2; public int a(){ myLock.lock(); try { b(); return value++; }finally { myLock.unlock(); } } public void b(){ myLock.lock(); try { System.out.println(++value2); }finally { myLock.unlock(); } } public static void main(String[] args) { TestAQSLock2 myLock = new TestAQSLock2(); for(int i=0;i<50;i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + ":" + myLock.a()); }).start(); } } }