面试互联网公司不得不说的高并发!
在这里想写写自己在学习并发处理的学习思路,也会聊聊自己遇到的那些坑,以此为记,希望鞭策自己不断学习、永不放弃!
具体笔者认为大体可分为分两部分:
第一部分:Java多线程编程。
第二部分:高并发的解决思路。
第三部分:分布式架构中redis、zookeeper分布式锁的应用。
本文着重讲解第一块。
1、Java内存模型与线程。
并发编程主要讨论以下几点:多个线程操作相同资源,保证线程安全,合理使用资源。
通常我们可以将物理计算机中出现的并发问题类比到JVM中的并发。
物理计算机处理器、高速缓存、主内存间交互关系如图:
处理器和内存的运行速度存在几个数量级别的差距,因此为解决此矛盾引入了告诉缓存这一概念。当多个处理器的运行任务都涉及到同一块主内存区域时,将可能导致各自缓存数据的不一致问题,为解决一致性问题,需要各个处理器访问缓存时都遵循一些协议,在读写时要根据协议来进行操作。(MSI、MESI、MOSI、Synapse、Firefly及Dragon Protocol等)
处理器为提高性能,会对输入代码乱序执行(Out-Of-Order Execution) 优化。
类比Java内存模型,线程、主内存、工作内存交互关系如图:
JMM定义程序中各个变量访问规则,即在虚拟机中将内存取出和存储的底层细节。
线程A如果要跟线程B要通信的话,必须经历以下两个步骤: 1)线程A把本地内存A中更新过的共享变量的值刷新到主内存中。 2)线程B去主内存中读取A更新过的共享变量的值。
线程的工作内存中保存了该线程使用到变量的主内存副本拷贝(也可理解为此线程的私有拷贝),线程对变量的操作(读取、赋值等)都在工作内存中进行,而不能直接读写主内存中变量。不同线程之间的通信业需要通过主内存来完成。 主内存对应Java堆中对象实例数据部分,而工作内存则对应虚拟机栈中部分区域。
在此还有一个需要提及的点!
指令重排序
执行程序时,为提高性能,编译器和处理器常常会对指令做出重排序。分三种:
1)编译器优化的重排序。
2)指令并行重排序。
3)内存系统重排序。
JMM的编译器会禁止特定类型的编译器重排序,对于处理器重排序(后两者),则要求Java编译器在生成指令序列时,插入特定类型的内存屏障指令,通过内存屏障指令来禁止特定类型的处理器重排序。
内存之间的交互操作
JMM中定义了8种操作来来描述工作内存与主内存之间的实现细节。
JMM规定了执行上述八种操作时必须满足的规则(与happens-before原则是等效的,即先行发生原则):
补充:JVM-攻城掠地
2、测试工具
PostMan、Apache Bench、JMeter、LoadRunner
3、线程安全性
原子性:提供了互斥访问,同一时刻只能由一个线程来对它进行操作。
可见性:一个线程对主内存的修改可以及时被其他线程观察到。
有序性:一个线程观察其它线程中指令执行顺序,由于指令重排序的存在,该观察的结果一般为杂乱无章的。 (happens-before原则) Java程序的天然有序性可以总结为:如果本线程内观察,所有的操作都是有序的;如果在一个线程观察另一个线程,所有的操作都是无须的。前者指的是线程内的串行语义,后者指的是指令重排序和工作内存和主内存同步延迟现象。
通过CAS来保证原子性,即Compare And Swap 比较交换:
CAS利用处理器提供的CMPXCHG指令实现,自旋CAS实现的基本思路就是循环进行CAS直到成功为止。 比较内存的值与预期的值,若相同则修改预期的值。
CAS虽然可以进行高效的进行源自操作,但是CAS仍在存在三大问题。
public class AtomicExample1 { // 请求总数 public static int clientTotal = 5000; // 同时并发执行的线程数 public static int threadTotal = 200; public static AtomicInteger count = new AtomicInteger(0); public static void main(String[] args) throws Exception { ExecutorService executorService = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(threadTotal); final CountDownLatch countDownLatch = new CountDownLatch(clientTotal); for (int i = 0; i < clientTotal ; i++) { executorService.execute(() -> { try { semaphore.acquire(); add(); semaphore.release(); } catch (Exception e) { log.error("exception", e); } countDownLatch.countDown(); }); } countDownLatch.await(); executorService.shutdown(); log.info("count:{}", count.get()); } private static void add() { count.incrementAndGet(); // count.getAndIncrement(); } }
AtomicInteger
源码实现
public final int incrementAndGet() { return unsafe.getAndAddInt(this, valueOffset, 1) + 1; } public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this.getIntVolatile(var1, var2); } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } //当前的指为var2,底层穿过来的值var5 如果当前的值与底层传过来的值一样的话,则将其更新问var5+var4
AtomicLong与LongAdder
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } } /** * Equivalent to {@code add(1)}. */ public void increment() { add(1L); }
AtomicBoolean
public final boolean compareAndSet(boolean expect, boolean update) { int e = expect ? 1 : 0; int u = update ? 1 : 0; return unsafe.compareAndSwapInt(this, valueOffset, e, u); }
AtomicReference
public final V getAndSet(V newValue) { return (V)unsafe.getAndSetObject(this, valueOffset, newValue); } public final Object getAndSetObject(Object var1, long var2, Object var4) { Object var5; do { var5 = this.getObjectVolatile(var1, var2); } while(!this.compareAndSwapObject(var1, var2, var5, var4)); return var5; }
AtomicIntegerFieldUpdater
public class AtomicExample5 { private static AtomicIntegerFieldUpdater<AtomicExample5> updater = AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count"); @Getter public volatile int count = 100; public static void main(String[] args) { AtomicExample5 example5 = new AtomicExample5(); if (updater.compareAndSet(example5, 100, 120)) { log.info("update success 1, {}", example5.getCount()); } if (updater.compareAndSet(example5, 100, 120)) { log.info("update success 2, {}", example5.getCount()); } else { log.info("update failed, {}", example5.getCount()); } } }
AtomicStampedReference
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { Pair<V> current = pair; return expectedReference == current.reference && expectedStamp == current.stamp && ((newReference == current.reference && newStamp == current.stamp) || casPair(current, Pair.of(newReference, newStamp))); }
AtomicLongArray 维护数组
线程安全-可见性
导致共享变量在线程间不可见的原因
1)线程交叉执行。
2)重排序结合线程交叉执行。
3)共享变量更新后的值没有在工作内存与主内存及时更新。
JMM关于synchronizd的两条规定:
volatile-可见性 通过加入内存屏障和禁止重排序优化实现。
必须符合以下场景才可使用:
private static void add() { count++; // 1、count 取出当前内存中的值 // 2、+1 // 3、count 写回主存 //即:两个线程同时执行+1写回主存就出现问题。 }
volatile通常用来作为状态标记量
volatile boolean inited = false; //线程1: context = loadContext(); inited = true; //线程2; while (!inited){ sleep(); } doSomethingWithConfig(context);
4、安全发布对象
发布对象:使一个对象能够被当前范围之外代码所使用。
对象逸出:一种错误的发布。当一个对象还没有构造完成,就能被其它线程所见。
安全发布对象
public class SingletonExample4 { // 私有构造函数 private SingletonExample4() { } // 1、memory = allocate() 分配对象的内存空间 // 2、ctorInstance() 初始化对象 // 3、instance = memory 设置instance指向刚分配的内存 // JVM和cpu优化,发生了指令重排 // 1、memory = allocate() 分配对象的内存空间 // 3、instance = memory 设置instance指向刚分配的内存 // 2、ctorInstance() 初始化对象 // 单例对象 private volatile static SingletonExample4 instance = null; // 静态的工厂方法 public static SingletonExample4 getInstance() { if (instance == null) { // 双重检测机制 // B synchronized (SingletonExample4.class) { // 同步锁 if (instance == null) { instance = new SingletonExample4(); // A - 3 } } } return instance; } }
通过枚举实现单例模式
/** * 枚举模式:最安全 */ @ThreadSafe @Recommend public class SingletonExample7 { // 私有构造函数 private SingletonExample7() { } public static SingletonExample7 getInstance() { return Singleton.INSTANCE.getInstance(); } private enum Singleton { INSTANCE; private SingletonExample7 singleton; // JVM保证这个方法绝对只调用一次 Singleton() { singleton = new SingletonExample7(); } public SingletonExample7 getInstance() { return singleton; } } }
1) 不可变对象
满足条件:
2) 线程封闭
3) 线程不安全写法
4) 同步容器
ArrayList -> CopyOnWriteArrayList
public void add(int index, E element) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; if (index > len || index < 0) throw new IndexOutOfBoundsException("Index: "+index+ ", Size: "+len); Object[] newElements; int numMoved = len - index; if (numMoved == 0) newElements = Arrays.copyOf(elements, len + 1); else { newElements = new Object[len + 1]; System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index, newElements, index + 1, numMoved); } newElements[index] = element; setArray(newElements); } finally { lock.unlock(); } } public E get(int index) { return get(getArray(), index); }
HashSet、TreeSet --> CopyOnWriteArraySet、ConcurrentSkipListSet
ConcurrentSkipListSet对批量操作不能保证原子性。
参考: JDK1.8源码分析之ConcurrentSkipListSet(八)
HashMap、TreeMap --> ConcurrentHashMap、ConcurrentSkipListMap
ConcurrentHashMap效率相对比ConcurrentSkipListMap高,ConcurrentSkipListMap有些其不具有的特性:
6、J.U.C之AQSAbstractQueuedSynchronizer-AQS
AQS同步组件 1)等待多线程完成的CountDownLatch(JDK1.5)
允许一个或多个线程等待其他线程完成操作。
其构造函数接收一个int类型的参数作为计数器,调用countDown方法的时候,计数器的值会减1,CountDownLatch的await方法会阻塞当前线程,直到N变为零。
应用:并行计算,解析Excel中多个sheet的数据。
2)控制并发线程数的 Semaphore 用来控制同时访问特定资源线程的数量。 应用:流量控制,特别是公共资源有限的场景,如数据库连接。
//可用的许可的数量 Semaphore(int permits) //获取一个许可 aquire() //使用完成后归还许可 release() //尝试获取许可证 tryAcquire()
3)同步屏障 CyclicBarrier
让一组线程达到一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,才会开门,所有被屏障拦截的线程才会继续执行。 应用:多线程计算数据,最后合并计算结果的场景。
CyclicBarrier和CountDownLatch的区别
//屏障拦截的线程数量 CyclicBarrier(int permits) //已经到达屏障 await() //CyclicBarrier阻塞线程的数量 getNumberWaiting()
4)重入锁 ReentrantLock (排他锁:同时允许单个线程访问。) 支持重进入的锁,表示该锁能够支持一个线程对资源的重复加锁。(即实现重进入:任意线程获取到锁之后能够再次获取该锁而不会被锁阻塞。)
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
公平锁就是等待时间最长的线程最优先获取锁,也就是说获取锁的是顺序的(FIFO)。而非公平则允许插队。 非公平因为不保障顺序,则效率相对较高,而公平锁则可以减少饥饿发生的概率。
ReentrantReadWriteLock (读写锁,实现悲观读取,同时允许多个线程访问)
在写线程访问时,所有的读线程和其他写线程均被堵塞。其维护了一对锁,通过分离读锁、写锁,使得并发性比排他锁有很大提升。
适用于读多写少的环境,能够提供比排他锁更好的并发与吞吐量。
不足:ReentrantReadWriteLock是读写锁,在多线程环境下,大多数情况是读的情况远远大于写的操作,因此可能导致写的饥饿问题。
StampedLock
是ReentrantReadWriteLock 的增强版,是为了解决ReentrantReadWriteLock的一些不足。 StampedLock读锁并不会阻塞写锁,设计思路也比较简单,就是在读的时候发现有写操作,再去读多一次。。 StampedLock有两种锁,一种是悲观锁,另外一种是乐观锁。如果线程拿到乐观锁就读和写不互斥,如果拿到悲观锁就读和写互斥。
参考: Java8对读写锁的改进:StampedLock
5)Condition
参考: Java线程(九):Condition-线程通信更高效的方式
6)FutureTask
参考: Java并发编程:Callable、Future和FutureTask
7)Fork/Join
参考: Fork/Join 模式高级特性
8)BlocklingQueue
参考:Java中的阻塞队列
参考: Java 四种线程池的用法分析
参考资料:
《深入理解Java虚拟机》
《Java并发编程艺术》
《Java多线程编程核心技术》
慕课:Java并发编程与高并发解决方案