原创技术文章,版权归作者所有,若转载请标明出处
公众号,待定,原公众号长期试灰已被冻结
上篇文章介绍了多线程的原理及其意义,在文末提出了线程安全的问题,这也是我们在高并发下常常听到的词。本文主要介绍在单实例服务中,我们如何快速上手使用 jdk
为我们提供的锁机制,并了解其原理,剩余 AQS
和 Condition
留到后面讲解,全文 纯手打
,代码 纯手写
,图片 纯手画
,希望能给大家带来一些收获,如有不对的地方,欢迎指正。
我们在开发高并发应用时,常常都在强调线程安全,那什么样的场景是不安全的呢?
上道(an)具(li),上个章节中提到了搬货的例子,不知道小伙伴们有没有试下最后的案例,
上篇文章提到的案例,
马上双十一了,加入仓库有一堆货物,目前只有一个人搬(甲),暂定有1000000件,如果甲只能一次搬一件,那么总共需要搬运1000000次,如果能加入一个乙与甲一起搬,假定两人搬运时间一样,那么两人各搬500000次,就能搞定。
上章节结果,
搬运完成,员工:乙,搬运:[272708]次
搬运完成,员工:甲,搬运:[727292]次
上章节思考
例子中用到了synchronized和AtomicInteger,是为了保证货物剩余总量在多人搬运下,数值也是对的,可以尝试将AtomicInteger换成Integer或者去除synchronized,两人最终总计搬运和将与原货物总量不一致。
下面我们去掉synchronized,并将AtomicInteger替换成int
# 搬货 static class Carry extends Thread{ /** 搬运人*/ private String peopleName; /** 搬运次数*/ private int carryNum; public Carry(String peopleName) { this.peopleName = peopleName; } @Override public void run() { while (!isInterrupted()) { if (cargoNum > 0) { cargoNum--; carryNum++; } else { System.out.println("搬运完成,员工:" + peopleName + ",搬运:[" + carryNum + "]次"); interrupt(); } } } } 复制代码
开搬
/** 货物个数*/ static int cargoNum = 1000000; public static void main(String[] args) { Carry carry1 = new Carry("甲"); Carry carry2 = new Carry("乙"); carry1.start(); carry2.start(); } 复制代码
结果
搬运完成,员工:乙,搬运:[983347]次 搬运完成,员工:甲,搬运:[995228]次 复制代码
我们发现,总共只有1000000的货物,讲道理,最终两人应该总共搬了1000000次,但是两个人都接近搬了1000000次,这样不是重复劳动了吗?没错,这就是在多线程下的线程不安全引发的。
我们想下这样的原因,
甲、乙两人同时搬运,甲搬的时候拿到了总货物剩余量,然后-1,自己的工作量+1,然后搬走货物,乙同样如此操作,看似好像没什么问题,但是试想一下,如果甲、乙同时搬时,同时拿到剩余量,比如1000,然后同时-1,那么两人都认为还剩余999,于是就产生了多余的劳动力。
如果我们能保证,剩余总量在甲更新的时候,乙不会更新,反之,乙更新的时候,甲也不会更新,那么最终就不会出现这样的问题了。
所以这里就会用到锁,加把锁,我用的时候锁上,你等着,我用完,打开锁,你用,如此既能保证数据是安全的。
《Java Concurrency In Practice》中对线程安全的定义如下,当多个线程访问一个对象时,如果不用考虑这些线程在运行时环境下的调度和交替执行,也不需要进行额外的同步,或者在调用方进行任何其他的协调操作,调用这个对象的行为都可以获得正确的结果,那这个对象是线程安全的。
synchronized
是jdk为我们提供的一个内置互斥锁,大致分为 类锁
和 对象锁
,见名知意,类锁就是锁类文件的对象(class),对象锁则是锁类实例对象。
下面简单理解其原理, 深入剖析将在后续字节码、JMM章节中 ,
synchronized的实现是依赖Monitor,而一个对象实例会关联一个Monitor, 参考JVM规范
Each object is associated with a monitor. A monitor is locked if and only if it has an owner. The thread that executes monitorenter attempts to gain ownership of the monitor associated with objectref
关系信息主要在对象头中,对象头分为 Mark Word(标记信息)
和 Klass Pointer(指向类元数据)
两个部分。 (具体关于对象分析,会在后续有关字节码的章节中探讨)
其中 Mark Word
中的 LockWord
会指向一个 Monitor
, Monitor
包含的信息如上图,
Owner EntryQ RCThis Nest HashCode Condidate
根据上图,简单总结,一个线程想锁住一个对象,那么它需要持有对象关联的Monitor,其他线程则(Thread2、Thread3)会阻塞。有人会问了,抢夺的过程如何保证线程安全,其实JVM是通过CAS实现的,CAS则会在后文中讲解。
类锁锁的是class对象,class文件在JVM中的对象
举例如下
首先我们创建一个打印类
public class Print { public void print() throws InterruptedException { System.out.println(Thread.currentThread().getName() + " print....."); # 睡眠1s Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + " print end....."); } } 复制代码
线程类
static class PrintThread extends Thread { @Override public void run() { Print print = new Print(); # 锁住类 synchronized (Print.class) { try { print.print(); } catch (InterruptedException e) { e.printStackTrace(); } } } } 复制代码
执行
public static void main(String[] args) { PrintThread printThread1 = new PrintThread(); PrintThread printThread2 = new PrintThread(); printThread1.start(); printThread2.start(); } 复制代码
结果
Thread-0 print..... Thread-0 print end..... Thread-1 print..... Thread-1 print end..... 复制代码
我们可以看到,虽然是两个线程调用各自的打印实例,但是,第二个线程明显被阻塞了,如果我们去掉 synchronized (Print.class)
,结果如下
Thread-0 print..... Thread-1 print..... Thread-0 print end..... Thread-1 print end..... 复制代码
这样两个线程就是并行执行了,同理,对于调用类static 修饰的方法,在方法上添加 synchronized
,也会锁住类,此处不做演示。
对象锁的是实例对象
我们改造下线程类,print实例通过构造传入
static class PrintThread extends Thread { private Print print; PrintThread(Print print) { this.print = print; } @Override public void run() { synchronized (print) { try { print.print(); } catch (InterruptedException e) { e.printStackTrace(); } } } } 复制代码
执行类,这里还是两个线程传入不同的类实例
public static void main(String[] args) { PrintThread printThread1 = new PrintThread(new Print()); PrintThread printThread2 = new PrintThread(new Print()); printThread1.start(); printThread2.start(); } 复制代码
结果
Thread-0 print..... Thread-1 print..... Thread-1 print end..... Thread-0 print end..... 复制代码
可以看到,不同的线程互不影响
那么,我们如果传入同一个实例呢?
执行类
public static void main(String[] args) { Print print = new Print(); PrintThread printThread1 = new PrintThread(print); PrintThread printThread2 = new PrintThread(print); printThread1.start(); printThread2.start(); } 复制代码
结果
Thread-0 print..... Thread-0 print end..... Thread-1 print..... Thread-1 print end..... 复制代码
可以看到,对于同一个打印实例,两个线程执行是有顺序的,第一个线程持有了锁,然后第二个线程被阻塞了,等第一个线程执行完成,第二个线程才获取到对象锁执行。
volatile相对于synchronized是jdk提供的更加轻量的锁,为了便于理解,我们先来两个开胃菜。
线程间数据共享
目前有个变量a为1,而线程1和线程2都需要更新a的值,那么同时操作完成后,a的值是多少?
显然,a最终有可能为2,也有可能为3,比如线程1拿到了a,此时a为1,然后赋值为2,但是还没有同步回主内存中,此时线程2拿到了a仍然为1,设置成了3,那这样数据不就不安全了吗?除了可以用前面将的 synchronized
保证线程安全,我们还可以使用 volatile
。
如果一个字段被声明成 volatile
,java线程内存模型确保所有线程看到这个变量的值是一致的。但是,仅支持原子操作可见,比如
# 两个线程同时对i进行++操作 public class VolatileTest { static volatile int i = 0; static class AddThread extends Thread { @Override public void run() { for (int j = 0; j < 100000; j++) { i++; } } } public static void main(String[] args) { AddThread add1 = new AddThread(); AddThread add2 = new AddThread(); add1.start(); add2.start(); while (Thread.activeCount() > 2) { Thread.yield(); } System.out.println("i 最终为 " + i); } } 复制代码
结果
i 最终为 177762 复制代码
而最终, i
应该为 200000
才是正确的,所以volatile无法保证非原子操作的线程安全。
下面加下volatile的几个特性:
jvm为了性能优化,运行编译器和处理器对指令进行重排序,看下面一段代码:
public class VolatileDemo { private int i = 0; private boolean flag = false; public void write() { # 操作1 i = 1; # 操作2 flag = true; System.out.println("更新完成..."); } public void read() { # 操作3 if (flag) { # 操作4 System.out.println("i 被更新为, i -> " + i); } } } 复制代码
正常的写操作是 1->2
,读操作是 3->4
,但是如果发生指令重排,写操作可能变成 2->1
,那么读操作时,可能存在还未更新完成,就读到了值,所以我们可以使用 volatile
修饰 flag
private volatile boolean flag = false; 复制代码
这样可以防止编译器或处理器对其进行指令重排,而实现方式时基于 内存屏障
内存屏障 | 说明 |
---|---|
StoreStore | 禁止上面的普通写和下面的 volatile 写重排序 |
StoreLoad | 防止上面的 volatile 写与下面可能有的 volatile 读/写重排序 |
LoadLoad | 禁止下面所有的普通读操作和上面的 volatile 读重排序 |
LoadStore | 禁止下面所有的普通写操作和上面的 volatile 读重排序 |
而对于被volatile修饰的变量
操作 | 说明 |
---|---|
volatile 写 | 前插入一个 StoreStore 屏障。 |
volatile 写 | 后插入一个 StoreLoad 屏障。 |
volatile 读 | 前插入一个 LoadLoad 屏障。 |
volatile 读 | 后插入一个 LoadStore 屏障。 |
综上,我们可以得出如下防重排规则,NO代表禁止重排
ThreadLocal能够提供线程局部变量,同一个线程中可共享,不同的线程互相隔离。
简单举例说明
# 创建线程副本,默认至1 static ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 1); public static void main(String[] args) { for (int i = 0; i < 5; i++) { new Thread(() -> { # 获取值 int n = threadLocal.get(); # ++ n++; System.out.println(Thread.currentThread().getId() + ": " + n); }).start(); } } 复制代码
结果
13: 2 15: 2 14: 2 12: 2 16: 2 复制代码
我们可以看出,从threadlocal获取的值,每个线程是互相独立的,那么接下来我们看下ThreadLocal是如何实现的
下面我们来 手撕源码
set()方法
/** * Sets the current thread's copy of this thread-local variable * to the specified value. Most subclasses will have no need to * override this method, relying solely on the {@link #initialValue} * method to set the values of thread-locals. * * @param value the value to be stored in the current thread's copy of * this thread-local. */ public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); } 复制代码
看注释,将此线程局部变量的当前线程副本设置为指定的值。有点拗口,我们可以看到,这里获取了当前线程,并且调用了 getMap(t)
,那我们看下这个方法
/** * Get the map associated with a ThreadLocal. Overridden in * InheritableThreadLocal. * * @param t the current thread * @return the map */ ThreadLocalMap getMap(Thread t) { return t.threadLocals; } 复制代码
获取了当前线程的 t.threadLocals
,返回的是 ThreadLocal.ThreadLocalMap
,这里我们先搞清楚第一层关系, Thread内会包含一个ThreadLocal.ThreadLocalMap
那我看下ThreadLocal.ThreadLocalMap
/** * ThreadLocalMap is a customized hash map suitable only for * maintaining thread local values. No operations are exported * outside of the ThreadLocal class. The class is package private to * allow declaration of fields in class Thread. To help deal with * very large and long-lived usages, the hash table entries use * WeakReferences for keys. However, since reference queues are not * used, stale entries are guaranteed to be removed only when * the table starts running out of space. */ static class ThreadLocalMap { /** * The entries in this hash map extend WeakReference, using * its main ref field as the key (which is always a * ThreadLocal object). Note that null keys (i.e. entry.get() * == null) mean that the key is no longer referenced, so the * entry can be expunged from table. Such entries are referred to * as "stale entries" in the code that follows. */ static class Entry extends WeakReference<ThreadLocal<?>> { /** The value associated with this ThreadLocal. */ Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } /** * The initial capacity -- MUST be a power of two. */ private static final int INITIAL_CAPACITY = 16; /** * The table, resized as necessary. * table.length MUST always be a power of two. */ private Entry[] table; // ...后面略,着重看上面这些 复制代码
ThreadLocalMap中定义了Entry,key是指向ThreadLocal的弱引用 WeakReference
,value则是我们保存的值,什么是弱引用呢?
我们平时使用的最多的就是强引用,如 Object o = new Object()
,o与Object之间是强引用,如果一直引用,GC时,不会回收o,而弱引用,是在下一次GC时,对象就会被回收。
好的,回到 set()
,我们得出大致的关系 Thread->ThreadLocal.ThreadLocalMap->ThreadLocal
,额,好像是有点绕,没关系,后面有 彩蛋
!
下面看下 get()
方法
/** * Returns the value in the current thread's copy of this * thread-local variable. If the variable has no value for the * current thread, it is first initialized to the value returned * by an invocation of the {@link #initialValue} method. * * @return the current thread's value of this thread-local */ public T get() { # 获取当前线程 Thread t = Thread.currentThread(); # 获取当前线程的ThreadLocalMap ThreadLocalMap map = getMap(t); if (map != null) { # 获取当前线程对应当前ThreadLocal的值 ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } 复制代码
理解了Thread、ThreadLocal、ThreadLocalMap的关系,get()应该就能很好理解了
然后看下 getEntry(this)
/** * Get the entry associated with key. This method * itself handles only the fast path: a direct hit of existing * key. It otherwise relays to getEntryAfterMiss. This is * designed to maximize performance for direct hits, in part * by making this method readily inlinable. * * @param key the thread local object * @return the entry associated with key, or null if no such */ private Entry getEntry(ThreadLocal<?> key) { # 通过ThreadLocal获取其对应的值,通过散列算法 int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; # 比较key if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e); } 复制代码
没找到的话,会进行循环寻址 getEntryAfterMiss(key, i, e)
/** * Version of getEntry method for use when key is not found in * its direct hash slot. * * @param key the thread local object * @param i the table index for key's hash code * @param e the entry at table[i] * @return the entry associated with key, or null if no such */ # 循环遍历,直到找到为止,此处markdown显示bug,o_o private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) { Entry[] tab = table; int len = tab.length; while (e != null) { ThreadLocal<?> k = e.get(); if (k == key) return e; if (k == null) expungeStaleEntry(i); else i = nextIndex(i, len); e = tab[i]; } return null; } 复制代码
ok,读到这里,应该对ThreadLocal的原理有了比较清晰的认识吧,建议读者可以自己跟下源码。如果还不理解,没关系,看彩蛋:
为了便于理解,笔者亲自供上上图,实线部分代表强引用,虚线部分代表的是弱引用,看懂这张关系图,对ThreadLocal应该就得心应手了。
但是,我们常常会发现使用ThreadLocal会造成 内存泄漏
,这是为什么呢,特别是在使用 线程池
的时候。看图可知,因为Entry是个强引用,如果线程未结束,这块内存是会一直存在的,如果使用线程池,同一个线程上就会存在大量的不需要数据,此时使用完成应该及时释放。
了解完了内置锁,我们学习一下一种更轻量级的锁, 显式锁
,这块会主要讲解如何使用(写到这里已经 4000
字了,避免读者用脑过度o_o), 后面会详细讲解其核心AQS及Condition原理
。
老规矩,先来一段非线程安全案例
# 计数器 public class Counter { private Integer n = 0; public void Add() { n++; } public Integer getN() { return n; } } 复制代码
启动两个线程,各调用add() 100000次
static class AddThead extends Thread { private Counter counter; AddThead(Counter counter) { this.counter = counter; } @Override public void run() { for (int i = 0; i < 100000; i++) { counter.Add(); } } } public static void main(String[] args) { Counter counter = new Counter(); for (int i = 0; i < 2; i++) { new AddThead(counter).start(); } while(Thread.activeCount() > 2) { Thread.yield(); } System.out.println("结果:" + counter.getN()); } 结果:194758 复制代码
使用Lock
public class Counter { private Lock lock = new ReentrantLock(); private Integer n = 0; public void Add() { try { lock.lock(); n++; } finally { lock.unlock(); } } public Integer getN() { return n; } } 结果:200000 复制代码
try { lock.lock(); # 其他 } finally { lock.unlock(); } 复制代码
使用完成之后记得释放锁
public static void main(String[] args) { Lock lock = new ReentrantLock(); # 锁5次 for (int i = 0; i < 5; i++) { lock.lock(); } # 解锁5次 for (int i = 0; i < 5; i++) { lock.unlock(); } System.out.println("end ...."); } 复制代码
我们启动两个线程,让一个线程持有锁,另一个线程tryLock()
尝试获取锁
static Lock lock = new ReentrantLock(); # 持有锁 static class Holder extends Thread { @Override public void run() { try { lock.lock(); # 睡眠 Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } } # 尝试锁 static class TryLock implements Callable<Boolean> { @Override public Boolean call() throws Exception { # 尝试获取锁,获取到则返回true if (lock.tryLock()) { try{ return true; }finally { lock.unlock(); } } return false; } } public static void main(String[] args) throws ExecutionException, InterruptedException { Holder holder = new Holder(); holder.start(); FutureTask<Boolean> futureTask = new FutureTask<>(new TryLock()); Thread tryThread = new Thread(futureTask); tryThread.start(); Boolean result = futureTask.get(); System.out.println("result :" + result); } result :false 复制代码
获取锁超时
添加获取等待时间, lock.tryLock(6000, TimeUnit.MILLISECONDS)
,然后我们再次执行
结果
result :true 复制代码
所谓公平锁是指先到先锁,而非公平锁是在线程切换时允许抢占,谁抢到谁拥有锁。
前面讲到的ReetrantLock同时只运行一个线程执行,而 ReentrantReadWriteLock
允许多个线程同时执行以提升在读多写少的场景的吞吐量。
demo
public class ReentrantReadWriteLockDemo { ReadWriteLock lock = new ReentrantReadWriteLock(); public void write() { try { lock.writeLock().lock(); System.out.println(Thread.currentThread().getId() + " 持有写锁"); Thread.sleep(1000); System.out.println(Thread.currentThread().getId() + " 准备释放写锁"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.writeLock().unlock(); } } public void read() { try { lock.readLock().lock(); System.out.println(Thread.currentThread().getId() + " 持有读锁"); Thread.sleep(1000); System.out.println(Thread.currentThread().getId() + " 准备释放读锁"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.readLock().unlock(); } } } 复制代码
读读
public static void main(String[] args) { ReentrantReadWriteLockDemo lockDemo = new ReentrantReadWriteLockDemo(); new Thread(()->{ lockDemo.read(); }).start(); new Thread(()->{ lockDemo.read(); }).start(); } =======================结果======================== 11 持有读锁 12 持有读锁 11 准备释放读锁 12 准备释放读锁 复制代码
可以看到两个线程是可以同时执行的
读写
public static void main(String[] args) { ReentrantReadWriteLockDemo lockDemo = new ReentrantReadWriteLockDemo(); new Thread(()->{ lockDemo.read(); }).start(); new Thread(()->{ lockDemo.write(); }).start(); } =======================结果======================== 11 持有读锁 11 准备释放读锁 12 持有写锁 12 准备释放写锁 复制代码
可以看到两个是互斥的,写读类似,不做举例
写写
public static void main(String[] args) { ReentrantReadWriteLockDemo lockDemo = new ReentrantReadWriteLockDemo(); new Thread(()->{ lockDemo.write(); }).start(); new Thread(()->{ lockDemo.write(); }).start(); } =======================结果======================== 11 持有写锁 11 准备释放写锁 12 持有写锁 12 准备释放写锁 复制代码
同样互斥,所以我们可以总结出,写操作是互斥的,只要有写就互斥:
操作 | 是否互斥 |
---|---|
读读 | 否 |
读写 | 是 |
写写 | 是 |
写读 | 是 |
jdk1.8提供了 StampedLock
(改进后的读写锁),后续章节会深入探讨。
除了使用wait()、notify()/notifyall()来阻塞和唤醒线程,我们还可以使用 LockSupport
工具。
LockSupport区别于前者的是,它阻塞及唤醒的是线程,而wait、notify我们可以仔细看,它是调用的对象的方法,锁住的是对象。
看下LockSupport源码
park()
public static void park() { UNSAFE.park(false, 0L); } # 可设置阻塞时长 public static void parkNanos(long nanos) { if (nanos > 0) UNSAFE.park(false, nanos); } 复制代码
upPark()
# 唤醒某个线程 public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } 复制代码
park、unpark调用的是native方法,除了锁住的是线程外,还需要注意,阻塞时,是不能捕获Interrupt异常的,需要自己手动去判断状态。
CAS(compare and swap),看名字应该挺好理解,比较且替换。这是目前CPU处理器能够提供的技术,前面我们用到的锁,不管是锁对象、阻塞线程,都相对来说会有性能损耗。而CAS相比不需要锁,它是通过一个地址V,原值A,新值B来实现,比如,a=0,我们想改为1,我们首先需要知道a的内存地址,然后比较是否为0,如果是就改为1。如果不是,就返回目前的值,然后自旋操作。
CAS在对象锁,AQS中同样有应用,原理跟上述一致,后续写AQS时会讲解。在JAVA中,提供了一些通过CAS实现的原子操作类,如:
回顾之前,我们使用了synchronized来锁Integer来实现货物计算,现在我们来使用AtomicInteger来计算
static AtomicInteger n = new AtomicInteger(0);d # 每个线程计算10000次 static class AddThread extends Thread { @Override public void run() { for (int i = 0; i < 10000; i++) { n.incrementAndGet(); } } } public static void main(String[] args) { # 启动五个线程 for (int i = 0; i < 5; i++) { new AddThread().start(); } while(Thread.activeCount() > 2) { Thread.yield(); } System.out.println("n :" + n.get()); } 结果 n :50000 复制代码
可以看到,我们并没有使用什么synchronized、volatile、lock,但是最终结果是正确的。其他AtomicXXX使用可自己练习。
本章节主要介绍了JAVA中的锁机制,包括synchronized、volatile、lock、cas等,其中有的深入讲了,有的主要讲了使用(本次写的有点多,后面尽量拆细点,深入点),希望能给大家带来一些帮助。如果有写的有问题的或需要讨论的可留言指正。