Monitors are a structured way of combining synchronization and data. A class encapsulates both data and methods in the same way that a monitor combines data, methods, and synchronization in a single modular package.
mutex.lock(); try { queue.enq(x); } finally { mutex.unlock(); } 复制代码
In concurrent programming, a monitor is a synchronization construct that allows threads to have both mutual exclusion and the ability to wait (block) for a certain condition to become true. Monitors also have a mechanism for signaling other threads that their condition has been met . A monitor consists of a mutex (lock) object and condition variables. A condition variable is basically a container of threads that are waiting for a certain condition. Monitors provide a mechanism for threads to temporarily give up exclusive access in order to wait for some condition to be met, before regaining exclusive access and resuming their task.
当线程请求锁时,可以自旋也可以阻塞,所以管程需要一种线程间消息传递的机制来唤醒线程,即 Condition
。条件变量通常对应一个线程等待队列,条件变量改变后可以发送一个信号(比如 pthread_cond_signal
),唤醒在条件上等待的线程(一个或多个节点出队)。大家都知道,Java内置了管程,即 synchronized
机制。但这个内置的管程在实现上力求简单,因此只有一个隐式的条件变量。这就是尽管它也叫管程,我们却从来无法从 wait
得到 Condition
的原因。Java的 Lock
接口提供了对 Condition
的抽象。AQS在阻塞时会调用 LockSupport.park
,而它在Linux上的实现还是基于 mutex
和 condition
所以究竟如何理解管程这个术语呢?我觉得管程可以认为就是对数据结构封装了同步功能。比如,我们可以把 StringBuffer
的实现方法看作是一个管程,因为它的所有方法都被 synchronized
Condition condition = mutex.newCondition(); ... mutex.lock() try { while (!property) { condition.await(); } catch (InterruptedException e) { ... // application-dependent response } ... } 复制代码
使用条件变量可以实现一个典型的生产者消费者队列,具体就不说了,可以参考 LinkedBlockingQueue
的源代码。条件变量可能会出现 唤醒丢失(Lost-Wakeup)问题 ,这个我在前面的队列篇已经详细的描述过了。
可以将每个对象作为一个管程。本质上, synchronized
修饰的方式可以分为两种,实例对象和静态对象。比如,修饰静态方法其实属于后者,因为它锁定了类。我们都知道, synchronized
这里可以用一个比较方便的工具JOL(Java Object Layout),可以打印对象的内存布局。我们写一个非常简单的demo:
public class Demo { public static void main(String[] args) throws InterruptedException { Object b = new Object(); System.out.println(b); System.out.println(ClassLayout.parseInstance(b).toPrintable()); } } 复制代码
java.lang.Object@74a14482 java.lang.Object object internals: OFFSET SIZE TYPE DESCRIPTION VALUE 0 4 (object header) 01 82 44 a1 (00000001 10000010 01000100 10100001) (-1589345791) 4 4 (object header) 74 00 00 00 (01110100 00000000 00000000 00000000) (116) 8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243) 12 4 (loss due to the next object alignment) Instance size: 16 bytes Space losses: 0 bytes internal + 4 bytes external = 4 bytes total 复制代码
加入 synchronized
public class Demo { public static void main(String[] args) throws InterruptedException { Object b = new Object(); System.out.println(b); ExecutorService service = Executors.newCachedThreadPool(); service.invokeAll(Collections.nCopies(2, () -> { synchronized (b) { System.out.println(ClassLayout.parseInstance(b).toPrintable()); return 0; } })); Thread.sleep(1000); System.out.println(ClassLayout.parseInstance(b).toPrintable()); service.shutdown(); } } java.lang.Object@74a14482 java.lang.Object object internals: OFFSET SIZE TYPE DESCRIPTION VALUE 0 4 (object header) da 4f 23 26 (11011010 01001111 00100011 00100110) (639848410) 4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0) 8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243) 12 4 (loss due to the next object alignment) Instance size: 16 bytes ... java.lang.Object object internals: OFFSET SIZE TYPE DESCRIPTION VALUE 0 4 (object header) 01 82 44 a1 (00000001 10000010 01000100 10100001) (-1589345791) 4 4 (object header) 74 00 00 00 (01110100 00000000 00000000 00000000) (116) 8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243) 12 4 (loss due to the next object alignment) Instance size: 16 bytes Space losses: 0 bytes internal + 4 bytes external = 4 bytes total 复制代码
这里有一个小细节是我给主线程加了一个延时,否则主线程将会打印和工作线程相同的对象头。可以看到,这里 synchronized
|----------------------------------------------------------------------------------------|--------------------| | Object Header (64 bits) | State | |-------------------------------------------------------|--------------------------------|--------------------| | Mark Word (32 bits) | Klass Word (32 bits) | | |-------------------------------------------------------|--------------------------------|--------------------| | identity_hashcode:25 | age:4 | biased_lock:1 | lock:2 | OOP to metadata object | Normal | |-------------------------------------------------------|--------------------------------|--------------------| | thread:23 | epoch:2 | age:4 | biased_lock:1 | lock:2 | OOP to metadata object | Biased | |-------------------------------------------------------|--------------------------------|--------------------| | ptr_to_lock_record:30 | lock:2 | OOP to metadata object | Lightweight Locked | |-------------------------------------------------------|--------------------------------|--------------------| | ptr_to_heavyweight_monitor:30 | lock:2 | OOP to metadata object | Heavyweight Locked | |-------------------------------------------------------|--------------------------------|--------------------| | | lock:2 | OOP to metadata object | Marked for GC | |-------------------------------------------------------|--------------------------------|--------------------| 复制代码
重量级锁对应系统的管程,它的后30位是指向管程的指针。你可以在字节码中看到, synchronized
会产生 moniterenter
和 moniterexit
再看一下 synchronized
public class Demo { private static Integer v = 1; public synchronized static void main(String[] args) throws InterruptedException { System.out.println(Integer.toHexString(Demo.class.hashCode())); System.out.println(ClassLayout.parseInstance(Demo.class).toPrintable()); } } 4554617c java.lang.Class object internals: OFFSET SIZE TYPE DESCRIPTION VALUE 0 4 (object header) 58 f7 f1 02 (01011000 11110111 11110001 00000010) (49411928) 4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0) 8 4 (object header) df 03 00 f8 (11011111 00000011 00000000 11111000) (-134216737) ... 复制代码
这里注意不要调用 parseClass
阻塞(block)这个术语不论是在中文还是英文都被重载了太多次了。在JVM里经常看到状态显示的是 Monitor
而不是 Blocked
。Java中 Thread.State
阻塞和等待的区别是,阻塞状态可以自动获得监视器锁并转化状态,而等待必须手动的发送信号来打破。在Java中,后进入 synchronized
代码块会产生阻塞;而调用 notify
其实这两个状态的细分,还意味着 synchronized
的管程至少实现了等待集和准入集两个结构,分别对应 WAITING
即读阻塞写,写阻塞读,读不阻塞读。Java中关于读写锁的标准接口是 ReadWriteLock
,默认实现 ReentrantReadWriteLock
可以使用管程构造一个简单的读写锁。如果没有读者不互斥,那么读者写者就是一般互斥问题。读者写者之间必须要进行同步,因此读锁在一个是否存在写者的条件上阻塞,写锁在一个读者是否为0的条件上阻塞,写锁需要一个全局计数器来确认所有读者都已经退出。这两个条件都是在同一个 Condition
protected class ReadLock implements Lock { @Override public void lock() { lock.lock(); try { while (writer) { condition.await(); } readers++; } catch (InterruptedException e) { // empty } finally { lock.unlock(); } } @Override public void unlock() { lock.lock(); try { readers--; if (readers == 0) { condition.signalAll(); } } finally { lock.unlock(); } } } 复制代码
protected class WriteLock implements Lock { @Override public void lock() { lock.lock(); try { while (readers > 0 || writer) { condition.await(); } writer = true; } catch (InterruptedException e) { // empty } finally { lock.unlock(); } } @Override public void unlock() { writer = false; condition.signalAll(); } } 复制代码
protected class WriteLock implements Lock { @Override public void lock() { lock.lock(); try { while (writer) { condition.await(); } writer = true; while (readAcquires != readReleases) { condition.await(); } } catch (InterruptedException e) { // empty } finally { lock.unlock(); } } @Override public void unlock() { writer = false; condition.signalAll(); } } 复制代码
AQS实现 ReentrantReadWriteLock
/** * Nonfair version of Sync */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; // writers can always barge } final boolean readerShouldBlock() { /* As a heuristic to avoid indefinite writer starvation, * block if the thread that momentarily appears to be head * of queue, if one exists, is a waiting writer. This is * only a probabilistic effect since a new reader will not * block if there is a waiting writer behind other enabled * readers that have not yet drained from the queue. */ return apparentlyFirstQueuedIsExclusive(); } } /** * Fair version of Sync */ static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } } 复制代码
在AQS中,我们知道 hasQueuedPredecessors
可以判断是否为队列首节点,这是一个子类实现AQS时公平性的判断。 ReentrantReadWriteLock
认为写者优先反而是不公平的,但它设置了一个启发式的避免饥饿的方法,调用AQS的 apparentlyFirstQueuedIsExclusive
而核心代码都在这两种模式的基类 Sync
里。先看一下 tryAcquire
protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. If read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); // Reentrant acquire setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; } 复制代码
在 WriteLock
把写者和读者计数都放在了 state
里,通过位运算分离。写者占据低16位。但读者有多个,因此不光有 state
的状态变化,还有一个 readHolds
的是 ThreadLocal
变量 作为读者重入的计数。另外,读锁还使用 cachedHoldCounter
,这是对 readHolds
的一个缓存,因为取 ThreadLocal
是有开销的(虽然类似于从哈希表查值)。你可以看到每次得到锁的线程都会把这个值设置为自己的 readHolds
protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } 复制代码
为什么这里不需要读者为0的判断呢?实际上是有的,可以看到写锁一开始判断 c != 0
,因为读写者计数都在 state
第一种情况是可以的,另外如果再释放写锁,我们可以称之为锁降级(Lock downgrading,这不是我取的,源码注释里有)。从前面读锁的实现来看,它允许当前线程是写锁持有者的条件下持有读锁,但写锁的条件是 w == 0 || current != getExclusiveOwnerThread()
public class Semaphore { final int capacity; int state; Lock lock; Condition condition; public Semaphore(int c) { capacity = c; state = 0; lock = new ReentrantLock(); condition = lock.newCondition(); } public void acquire() { lock.lock(); try { while (state == capacity) { condition.await(); } state++; } finally { lock.unlock(); } } public void release() { lock.lock(); try { state--; condition.signalAll(); } finally { lock.unlock(); } } } 复制代码
AQS里的 Semaphore