java.util.concurrent是在并发编程中比较常用的工具类,里面包含很多用来在并发场景中使用的组件。比如 线程池、阻塞队列、计时器、同步器、并发集合 等等。
Lock最为重要的特性就是解决并发程序的安全性问题。在JUC大部分组件都使用了Lock,所以了解和使用Lock显得尤为重要。Lock在JUC中本质上是以一个接口的形势表现的。
我们可以从上面的图中可以看出关于锁有很多不同的实现类。下面来简单介绍一翻吧。
ReentrantLock实现了Lock接口,表示重入锁。是线程在获得锁之后,再次获取锁不需要阻塞,而是直接关联一次计数器增加重入次数。 后面我们重点分析ReentrantLock的原理。
ReentrantReadWriteLock实现了ReadWriteLock接口,其中有两把锁,一个 ReadLock ,一个 WriteLock ,它们分别实现了 Lock 接口。 适合读多写少的场景。 基本原则:
StampedLock是JDK1.8引进的新的锁机制,它是读写锁的一个改进版。一种乐观的读策略,使得乐观锁完全不阻塞写线程。
说到重入锁ReentrantLock,就是再次获取锁的同时,只是对重入次数进行计数,而不需要阻塞来获取锁。先来看一个案例代码,这样容易理解重入锁的概念。
我们在测试代码中调用test()方法获得了当前对象的锁,然后在这个方法中去调用test1()方法,test2()中也存在一个实例锁,这个时候当前线程无法获取test1()中的对象锁而阻塞, 这样就会产生死锁。R eentrantLock重入锁的目的就是为了避免线程产生死锁。
public class ReentrantLockDemo { public synchronized void test() { System.out.println("Begin test..."); test1(); } public void test1() { System.out.println("Begin test1..."); synchronized (this) { } } public static void main(String[] args) { ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo(); new Thread(reentrantLockDemo::test).start(); } } 复制代码
public class ReentrantLockDemo { private static int count = 0; static Lock lock = new ReentrantLock(true); public static void incr() { //线程A获取锁,计数state = 1 lock.lock(); try { //退出线程 中断的过程往下传递. true // sleep/ join/ wait //while() // ... Thread.sleep(1); count++; // decr(); }catch (InterruptedException e) { e.printStackTrace(); } finally { //线程A释放锁,state=1-1=0 lock.unlock(); } } public static void decr() { //线程A再次获取锁,计数加1,state = 2 lock.lock(); try{ count--; }finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(()->{ ReentrantLockDemo.incr(); }); t1.start(); t1.interrupt();//线程中断 for(int i = 0 ; i < 1000; i++) { new Thread(()->{ ReentrantLockDemo.incr(); }).start(); } Thread.sleep(3000); System.out.println("result = " + count); } } 复制代码
读写锁维护了一个读锁,一个写锁。一般情况下读写锁比排它锁的性能要好一些,因为大多数的场景是读多写少的。
public class ReentrantReadWriteLockDemo { static Map<String, Object> map = new HashMap<>(); static ReentrantReadWriteLock rrwl = new ReentrantReadWriteLock(); static Lock read = rrwl.readLock(); static Lock write = rrwl.writeLock(); public static Object get(String key) { System.out.println("Begin reading data..."); read.lock(); try{ return map.get(key); }finally { read.unlock(); } } public static Object put(String key, Object obj) { System.out.println("Begin writing data..."); write.lock(); try{ return map.put(key, obj); }finally { write.unlock(); } } } 复制代码
我们在Synchronized中分析了 偏向锁、轻量级锁、重量级锁 。它们是基于 乐观锁 以及 自旋锁 来优化synchronized加锁的开销,在 重量级锁阶段 是通过线程的阻塞以及唤醒来达到线程竞争和同步的目的。
那么在ReentrantLock也一定存在这样的问题,那么它是怎么去解决的呢?这里我们需要引入AQS(AbstractQueueSynchronizer)。
在Lock中,AQS是一个同步队列,它是一个同步工具,也是Lock用来实现线程同步的核心组件。
AQS内部维护的是一个FIFO的双向链表,这种数据结构的特点就是有 两个指针 ,分别指向直接的后继节点next和直接的前驱节点prev。当线程抢占锁失败后,会封装成一个 Node 直接放入到AQS阻塞队列中。
先上AQS中的源码:
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev;//前驱节点 /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next;//后继节点 /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. */ volatile Thread thread;//当前线程 /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter;//存储在condition队列中的后继节点 /** * Returns true if node is waiting in shared mode. */ //是否为共享锁 final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } //将线程组装成一个Node,添加到队列中 Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } //在condition队列中进行使用 Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } 复制代码
源码如下:
public void lock() { sync.lock(); } 复制代码
根据源码可以看到具体的实现,分别是FairSync(公平)和NonFairSync(非公平)两个类。
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { //对于非公平锁,一开始就CAS抢占一下 //如果CAS成功了,就表示获得了锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else//如果CAS失败了,调用acquire()方法走竞争逻辑 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } 复制代码
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } 复制代码
CAS 就是 Unsafe 类中提供的一个原子操作。
state是AQS中的一个属性,对于重入锁(ReentrantLock)而言,它表示一个同步状态。有两层含义:
接下来我们来看unsafe.cpp文件中最终执行的源码方法吧。
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) UnsafeWrapper("Unsafe_CompareAndSwapInt"); //将Java对象解析成JVM的oop oop p = JNIHandles::resolve(obj); //根据对象p和地址偏移量找到地址 jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); //基于 cas 比较并替换, x 表示需要更新的值,addr 表示 state 在内存中的地址,e 表示预期值 return (jint)(Atomic::cmpxchg(x, addr, e)) == e; UNSAFE_END 复制代码
属于sun.misc包,不属于Java标准。但是很多 Java 的基础类库,包 括一些被广泛使用的高性能开发库都是基于 Unsafe 类开发的,比如 Netty、 Hadoop、Kafka 等;
Unsafe 可认为是 Java 中留下的后门,提供了一些低层次操作,如直接内存访问、 线程的挂起和恢复、CAS、线程同步、内存屏障等。
从下面源码分析来看,如果CAS未能操作成功,说明state已经不等于0了,此时需要执行acquire(1)方法。
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } 复制代码
接着看acquire(1)方法的源码吧。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 复制代码
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//自旋方式获得锁 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } 复制代码
这个方法的作用是尝试获取锁,如果成功返回 true,不成功返回 false。
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } 复制代码
下面来看此方法的具体实现:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } 复制代码
当tryAcquire()获取锁失败时,则会调用此方法来将当前线程封装成Node对象加入到AQS队列尾部。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure //tail表示AQS队列的尾部,默认为null Node pred = tail; if (pred != null) { //当前线程的prev执行tail node.prev = pred; //通过CAS把node加入到队列中,并设置为tail if (compareAndSetTail(pred, node)) { //设置成功后,把tail节点的next指向当前node pred.next = node; return node; } } //tail为null时,把node加入到同步队列 enq(node); return node; } 复制代码
enq(node)方法通过自旋的方式,把当前节点node加入到同步队列中去,下面看一下enq源码吧:
private Node enq(final Node node) { for (;;) { //将新的节点prev指向tail Node t = tail; if (t == null) { // Must initialize //通过CAS将tail设置为新的节点 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { //将原来的tail的next节点指向新的节点 t.next = node; return t; } } } } 复制代码
通过 addWaiter 方法把线程添加到链表后,会接着把 Node 作为参数传递给 acquireQueued 方法,去竞争锁。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //线程中断标记 boolean interrupted = false; for (;;) { //获得当前节点的prev节点 final Node p = node.predecessor(); //如果是head节点,说明有资格去抢占锁 if (p == head && tryAcquire(arg)) { //获取锁成功,线程A已经释放了锁,然后设置head为线程B获得执行权限 setHead(node); //把原来的head节点从链表中移除,弱引用 p.next = null; // help GC failed = false; return interrupted; } //线程A可能还没释放锁,使得线程B在执行tryAcquire时返回false if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //当前线程在等待过程中有没有中断 interrupted = true; } } finally { //取消锁的操作 if (failed) cancelAcquire(node); } } 复制代码
线程A的锁可能还没释放,那么此时线程B来抢占锁肯定失败,就会调用此方法。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //前置节点 int ws = pred.waitStatus; //如果前置节点为 SIGNAL,意味着只需要等待其他前置节点的线程被释放 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ //返回true,可以放心挂起了 return true; //ws 大于 0,意味着 prev 节点取消了排队,直接移除这个节点就行 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { //相当于: pred=pred.prev;node.prev=pred; node.prev = pred = pred.prev; } while (pred.waitStatus > 0);//这里采用循环,从双向列表中移除 CANCELLED 的节点 pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //利用 cas 设置 prev 节点的状态为 SIGNAL(-1) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 复制代码
Node的状态有5种,默认状态是0,以下是其它四种状态:
//在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该 Node 的结点, 其结点的 waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化 static final int CANCELLED = 1; //只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程 static final int SIGNAL = -1; //表示该线程在condition队列中阻塞 static final int CONDITION = -2; //共享模式下,PROPAGATE 状态的线程处于可运行状态 static final int PROPAGATE = -3; 复制代码
使用LockSupport.park(this)挂起当前线程为WAITING状态
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } 复制代码
Thread.interrupted,返回当前线程是否被其他线程触发过中断请求,也就是 thread.interrupt(); 如果有触发过中断请求,那么这个方法会返回当前的中断标识 true,并且对中断标识进行复位标识已经响应过了中断请求。如果返回 true,意味 着在 acquire 方法中会执行 selfInterrupt()。
当前线程在acquireQueued中被中断过,则需要产生一个中断请求,原因是线程在调用acquireQueued方法的时候不会响应中断请求。
static void selfInterrupt() { Thread.currentThread().interrupt(); } 复制代码
从Java6开始引用的一个提供了基本的线程同步原语的类,LockSupport本质还是调用了Unsafe中的方法:
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread)) UnsafeWrapper("Unsafe_Unpark"); Parker* p = NULL; if (jthread != NULL) { oop java_thread = JNIHandles::resolve_non_null(jthread); if (java_thread != NULL) { jlong lp = java_lang_Thread::park_event(java_thread); if (lp != 0) { // This cast is OK even though the jlong might have been read // non-atomically on 32bit systems, since there, one word will // always be zero anyway and the value set is always the same p = (Parker*)addr_from_java(lp); } else { // Grab lock if apparently null or using older version of library MutexLocker mu(Threads_lock); java_thread = JNIHandles::resolve_non_null(jthread); if (java_thread != NULL) { JavaThread* thr = java_lang_Thread::thread(java_thread); if (thr != NULL) { p = thr->parker(); if (p != NULL) { // Bind to Java thread for next time. java_lang_Thread::set_park_event(java_thread, addr_to_java(p)); } } } } } } if (p != NULL) { #ifndef USDT2 HS_DTRACE_PROBE1(hotspot, thread__unpark, p); #else /* USDT2 */ HOTSPOT_THREAD_UNPARK( (uintptr_t) p); #endif /* USDT2 */ p->unpark(); } UNSAFE_END 复制代码
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time)) UnsafeWrapper("Unsafe_Park"); EventThreadPark event; #ifndef USDT2 HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time); #else /* USDT2 */ HOTSPOT_THREAD_PARK_BEGIN( (uintptr_t) thread->parker(), (int) isAbsolute, time); #endif /* USDT2 */ JavaThreadParkedState jtps(thread, time != 0); thread->parker()->park(isAbsolute != 0, time); #ifndef USDT2 HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker()); #else /* USDT2 */ HOTSPOT_THREAD_PARK_END( (uintptr_t) thread->parker()); #endif /* USDT2 */ if (event.should_commit()) { oop obj = thread->current_park_blocker(); event.set_klass((obj != NULL) ? obj->klass() : NULL); event.set_timeout(time); event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0); event.commit(); } UNSAFE_END 复制代码
在unlock()方法中,会调用release()方法来释放锁:
public void unlock() { sync.release(1); } 复制代码
public final boolean release(int arg) { //释放锁成功 if (tryRelease(arg)) { //得到AQS队列中的head节点 Node h = head; //如果head不为空并且状态不等于0,调用unpark唤醒后续节点 if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 复制代码
protected final boolean tryRelease(int releases) { //state状态减掉传入的参数1 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //如果结果为0,将排它锁的Owner设置为null //解锁的时候减掉 1,同一个锁,在可以重入后,可能会被叠加为 2、3、4 这些值,只有 unlock()的次数与 lock()的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下才会返回 true if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } 复制代码
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ //获得head节点的状态 int ws = node.waitStatus; if (ws < 0) //设置head节点的状态为0 compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //得到head节点的下一个节点 Node s = node.next; //如果下一个节点为 null 或者 status>0 表示 cancelled 状态 if (s == null || s.waitStatus > 0) { s = null; //通过从尾部节点开始扫描,找到距离 head 最近的一个waitStatus<=0 的节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //next 节点不为空,直接唤醒这个线程即可 if (s != null) LockSupport.unpark(s.thread); } 复制代码
我们在加锁的enq()方法中,在 cas 操作之后,t.next=node 操作之前。 存在其他线程调用 unlock 方法从 head开始往后遍历,由于 t.next=node 还没执行意味着链表的关系还没有建立完整。就会导致遍历到 t 节点的时候被中断。所以从后往前遍历,一定不会存在这个问题。
通过ReentrantLock.unlock()将原本挂起的线程换唤醒后继续执行,原来被挂起的线程是在 acquireQueued () 方法中,所以被唤醒以后继续从这个方法开始执行.
锁的公平性是相对于获取锁的顺序而言的,如果是一个公平锁,那么锁的获取顺序 就应该符合请求的绝对时间顺序,也就是 FIFO 。 只要CAS设置同步状态成功,则表示当前线程获取了锁,而公平锁则不一样,差异点 有两个:
1、FairSync.lock()方法
final void lock() { acquire(1); } 复制代码
2、NonfairSync.lock()方法
非公平锁在获取锁的时候,会先通过 CAS 进行抢占,而公平锁则不会。
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } 复制代码