LongAdder是jdk8新增的用于高并发环境的计数器。
Atomicxxx使用硬件级别的指令CAS来更新计数器的值,在高并发的情况下每次只能有一个线程能成功,竞争失败的线程会非常多,白白浪费了很多cpu事件,因为竞争失败的线程会自旋。
jdk8的AtomicLong:
// jdk1.8的AtomicLong的实现代码,这段代码在sun.misc.Unsafe中 // 当线程竞争很激烈时,while判断条件中的CAS会连续多次返回false,这样就会造成无用的循环,循环中读取volatile变量的开销本来就是比较高的 // 因为这样,在高并发时,AtomicXXX并不是那么理想的计数方式 public final long getAndAddLong(Object o, long offset, long delta) { long v; do { v = getLongVolatile(o, offset); } while (!compareAndSwapLong(o, offset, v, v + delta)); return v; }
LongAdder根据锁分段来实现,维护了一组按需分配的计数单元。并发计数时,不同的线程可以在不同的计数单元上进行计数,减少了线程竞争。本质上是空间换时间的思想。
公共父类Striped64实现了一些核心操作,处理64位数据。二元算术运算累积,指的是你可以给它提供一个二元算术方式,这个类按照你提供的方式进行算术计算,并保存计算结果。
LongAccumulator是标准的实现类,LongAdder是特化的实现类,它的功能等价于LongAccumulator((x, y) -> x+y, 0L)。它们的区别很简单,前者可以进行任何二元算术操作,后者只能进行加减两种算术操作。
Striped64使用了一个叫Cell的类,是一个普通的二元算术累积单元,线程通过hash取模操作映射到一个Cell上进行累积。为了加快取模运算效率,把Cell数组大小设置为2^n,同时大量使用Unsafe提供的底层操作。
// 很简单的一个类,这个类可以看成是一个简化的AtomicLong // 通过cas操作来更新value的值 // @sun.misc.Contended是一个高端的注解,代表使用缓存行填来避免伪共享,可以自己网上搜下,这个我就不细说了 @sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics Unsafe相关的初始化 private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
尝试将这个值分割为多个cell(sum的时候求和),让这些竞争的线程只管更新自己所属的cell。因为在rehash之前,每个线程中存储的hashcode不会变,所以每次都应该会找到同一个cell,这样就将竞争压力分散了。
abstract class Striped64 extends Number { @sun.misc.Contended static final class Cell { ... } /** Number of CPUS, to place bound on table size */ static final int NCPU = Runtime.getRuntime().availableProcessors(); // cell数组,长度一样要是2^n,可以类比为jdk1.7的ConcurrentHashMap中的segments数组 transient volatile Cell[] cells; // 累积器的基本值,在两种情况下会使用: // 1、没有遇到并发的情况,直接使用base,速度更快; // 2、多线程并发初始化table数组时,必须要保证table数组只被初始化一次,因此只有一个线程能够竞争成功,这种情况下竞争失败的线程会尝试在base上进行一次累积操作 transient volatile long base; // 自旋标识,在对cells进行初始化,或者后续扩容时,需要通过CAS操作把此标识设置为1(busy,忙标识,相当于加锁),取消busy时可以直接使用cellsBusy = 0,相当于释放锁 transient volatile int cellsBusy; Striped64() { } // 使用CAS更新base的值 final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } // 使用CAS将cells自旋标识更新为1 // 更新为0时可以不用CAS,直接使用cellsBusy就行 final boolean casCellsBusy() { return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); } // 下面这两个方法是ThreadLocalRandom中的方法,不过因为包访问关系,这里又重新写一遍 // probe翻译过来是探测/探测器/探针这些,不好理解,它是ThreadLocalRandom里面的一个属性, // 不过并不影响对Striped64的理解,这里可以把它理解为线程本身的hash值 static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); } // 相当于rehash,重新算一遍线程的hash值 static final int advanceProbe(int probe) { probe ^= probe << 13; // xorshift probe ^= probe >>> 17; probe ^= probe << 5; UNSAFE.putInt(Thread.currentThread(), PROBE, probe); return probe; } /** * 核心方法的实现,此方法建议在外部进行一次CAS操作(cell != null时尝试CAS更新base值,cells != null时,CAS更新hash值取模后对应的cell.value) * @param x the value 前面我说的二元运算中的第二个操作数,也就是外部提供的那个操作数 * @param fn the update function, or null for add (this convention avoids the need for an extra field or function in LongAdder). * 外部提供的二元算术操作,实例持有并且只能有一个,生命周期内保持不变,null代表LongAdder这种特殊但是最常用的情况,可以减少一次方法调用 * @param wasUncontended false if CAS failed before call 如果为false,表明调用者预先调用的一次CAS操作都失败了 */ final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h; // 这个if相当于给线程生成一个非0的hash值 if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty 如果hash取模映射得到的Cell单元不是null,则为true,此值也可以看作是扩容意向,感觉这个更好理解 for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // cells已经被初始化了 if ((a = as[(n - 1) & h]) == null) { // hash取模映射得到的Cell单元还为null(为null表示还没有被使用) if (cellsBusy == 0) { // Try to attach new Cell 如果没有线程正在执行扩容 Cell r = new Cell(x); // Optimistically create 先创建新的累积单元 if (cellsBusy == 0 && casCellsBusy()) { // 尝试加锁 boolean created = false; try { // Recheck under lock 在有锁的情况下再检测一遍之前的判断 Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { // 考虑别的线程可能执行了扩容,这里重新赋值重新判断 rs[j] = r; // 对没有使用的Cell单元进行累积操作(第一次赋值相当于是累积上一个操作数,求和时再和base执行一次运算就得到实际的结果) created = true; } } finally { cellsBusy = 0; // 清空自旋标识,释放锁 } if (created) // 如果原本为null的Cell单元是由自己进行第一次累积操作,那么任务已经完成了,所以可以退出循环 break; continue; // Slot is now non-empty 不是自己进行第一次累积操作,重头再来 } } collide = false; // 执行这一句是因为cells被加锁了,不能往下继续执行第一次的赋值操作(第一次累积),所以还不能考虑扩容 } else if (!wasUncontended) // CAS already known to fail 前面一次CAS更新a.value(进行一次累积)的尝试已经失败了,说明已经发生了线程竞争 wasUncontended = true; // Continue after rehash 情况失败标识,后面去重新算一遍线程的hash值 else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // 尝试CAS更新a.value(进行一次累积) ------ 标记为分支A break; // 成功了就完成了累积任务,退出循环 else if (n >= NCPU || cells != as) // cell数组已经是最大的了,或者中途发生了扩容操作。因为NCPU不一定是2^n,所以这里用 >= collide = false; // At max size or stale 长度n是递增的,执行到了这个分支,说明n >= NCPU会永远为true,下面两个else if就永远不会被执行了,也就永远不会再进行扩容 // CPU能够并行的CAS操作的最大数量是它的核心数(CAS在x86中对应的指令是cmpxchg,多核需要通过锁缓存来保证整体原子性), // 当n >= NCPU时,再出现几个线程映射到同一个Cell导致CAS竞争的情况,那就真不关扩容的事了,完全是hash值的锅了 else if (!collide) // 映射到的Cell单元不是null,并且尝试对它进行累积时,CAS竞争失败了,这时候把扩容意向设置为true // 下一次循环如果还是跟这一次一样,说明竞争很严重,那么就真正扩容 collide = true; // 把扩容意向设置为true,只有这里才会给collide赋值为true,也只有执行了这一句,才可能执行后面一个else if进行扩容 else if (cellsBusy == 0 && casCellsBusy()) { // 最后再考虑扩容,能到这一步说明竞争很激烈,尝试加锁进行扩容 ------ 标记为分支B try { if (cells == as) { // Expand table unless stale 检查下是否被别的线程扩容了(CAS更新锁标识,处理不了ABA问题,这里再检查一遍) Cell[] rs = new Cell[n << 1]; // 执行2倍扩容 for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; // 释放锁 } collide = false; // 扩容意向为false continue; // Retry with expanded table 扩容后重头再来 } h = advanceProbe(h); // 重新给线程生成一个hash值,降低hash冲突,减少映射到同一个Cell导致CAS竞争的情况 } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // cells没有被加锁,并且它没有被初始化,那么就尝试对它进行加锁,加锁成功进入这个else if boolean init = false; try { // Initialize table if (cells == as) { // CAS避免不了ABA问题,这里再检测一次,如果还是null,或者空数组,那么就执行初始化 Cell[] rs = new Cell[2]; // 初始化时只创建两个单元 rs[h & 1] = new Cell(x); // 对其中一个单元进行累积操作,另一个不管,继续为null cells = rs; init = true; } } finally { cellsBusy = 0; // 清空自旋标识,释放锁 } if (init) // 如果某个原本为null的Cell单元是由自己进行第一次累积操作,那么任务已经完成了,所以可以退出循环 break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) // cells正在进行初始化时,尝试直接在base上进行累加操作 break; // Fall back on using base 直接在base上进行累积操作成功了,任务完成,可以退出循环了 } } // double的不讲,更long的逻辑基本上是一样的 final void doubleAccumulate(double x, DoubleBinaryOperator fn, boolean wasUncontended); // Unsafe mechanics Unsafe初始化 private static final sun.misc.Unsafe UNSAFE; private static final long BASE; private static final long CELLSBUSY; private static final long PROBE; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> sk = Striped64.class; BASE = UNSAFE.objectFieldOffset (sk.getDeclaredField("base")); CELLSBUSY = UNSAFE.objectFieldOffset (sk.getDeclaredField("cellsBusy")); Class<?> tk = Thread.class; PROBE = UNSAFE.objectFieldOffset (tk.getDeclaredField("threadLocalRandomProbe")); } catch (Exception e) { throw new Error(e); } } }
分支A是用CAS更新对应的cell.value,是个写操作,分支B是进行扩容。
ConcurrentHashMap中,扩容和写操作是会严格处理的,在一个分段锁管辖区内,不会出现扩容和写操作并发:1.6和1.7的扩容操作都是在put内部执行的,put本身就会加锁,因此扩容进行时会阻塞对同一个Segment的写操作;1.8中扩容时,put/remove等方法如果碰见正在其他线程正在执行扩容,会去帮助扩容,扩容完成了之后才会去尝试加锁执行真正的写操作。
虽然B分支会进行”加锁“,但是A操作跟cellsBusy无关,”加锁“并不禁止A操作的执行。AB两个分支是不互斥的, 因此Striped64这里会出现A分支的写操作,和B分支扩容操作并发执行的情况。
A操作使用CAS更新Cell对象中的某个属性,并不改变数组持有的Cell对象的引用,扩容操作进行的是数组持有的Cell对象引用的复制,复制后引用指向的还是原来的那个Cell对象。
举个例子就是,旧的cell数组,叫作old,old[1] = cellA,cellA.value = 1,扩容后的新数组,叫作new,任然有new[1] = cellA。A分支实际上执行的是cellA.value = 2,无论分支A和B怎么并发执行,执行完成后新数组都能看到分支A对Cell的改变,扩容前后实际上数组持有的是同一群Cell对象。
这下就知道为什么不直接用long变量代替Cell对象了吧。long[]进行复制时,两个数组完完全全分离了,A分支直接作用在旧数组上,B分支扩容后,看不到串行复制执行后对旧数组同一位置的改变。举个例子就是,old[1]=10,A分支要把old[1]更新为11,这时候B分支已经复制到old[5]了,A分支执行完成后,B分支创建的新数组new[1]可能还是10(不管是多少,反正没记录A分支的操作),这样A分支的操作就被遗失了,程序会有问题。
public class LongAdder extends Striped64 implements Serializable { // 构造方法,什么也不做,直接使用默认值,base = 0, cells = null public LongAdder() { } // add方法,根据父类的longAccumulate方法的要求,这里要进行一次CAS操作 // (虽然这里有两个CAS,但是第一个CAS成功了就不会执行第二个,要执行第二个,第一个就被“短路”了不会被执行) // 在线程竞争不激烈时,这样做更快 public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } } public void increment() { add(1L); } public void decrement() { add(-1L); } // 返回累加的和,也就是“当前时刻”的计数值 // 此返回值可能不是绝对准确的,因为调用这个方法时还有其他线程可能正在进行计数累加, // 方法的返回时刻和调用时刻不是同一个点,在有并发的情况下,这个值只是近似准确的计数值 // 高并发时,除非全局加锁,否则得不到程序运行中某个时刻绝对准确的值,但是全局加锁在高并发情况下是下下策 // 在很多的并发场景中,计数操作并不是核心,这种情况下允许计数器的值出现一点偏差,此时可以使用LongAdder // 在必须依赖准确计数值的场景中,应该自己处理而不是使用通用的类 public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; } // 重置计数器,只应该在明确没有并发的情况下调用,可以用来避免重新new一个LongAdder public void reset() { Cell[] as = cells; Cell a; base = 0L; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) a.value = 0L; } } } // 相当于sum()后再调用reset() public long sumThenReset() { Cell[] as = cells; Cell a; long sum = base; base = 0L; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) { sum += a.value; a.value = 0L; } } } return sum; } // 其他的不说了 }