Doug lea 在jdk8中给我们带来了比AtomicLong性能更优的LongAdder,AtomicLong的实现方式是内部有个value 变量,当多线程并发自增,自减时,均通过CAS 指令从机器指令级别操作保证并发的原子性。AtomicLong这种通过乐观锁的方式相对于其他机制已经是很高效了,难不成LongAdder使用了更高效的指令?
猜的再多也不如去看一眼源码。LongAdder的继承树如下:
继承自Striped64,这个类包装了一些很重要的内部类和操作。
LongAdder的方法:
通过其中的 add 方法源码来了解内部原理,其他API也是差不多的。
add 方法:
/**
* Adds the given value.
*
* @param x the value to add
*/
publicvoidadd(longx){
Cell[] as; long b, v; int m; Cell a;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
Cell 是在 Striped64 中的一个内部类, Cell 代表了一个单元,从字面上理解是这样的。
static final classCell{
volatile long value;
Cell(long x) { value = x; }
finalbooleancas(longcmp,longval){
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
Cell内部有一个非常重要的value变量,并且通过CAS来更新其值。
首次调用 add 方法时, (as = cells) != null 为true,看下 casBase 方法,其定义在 Striped64 中。
finalbooleancasBase(longcmp,longval){
return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}
BASE 是变量 base 在类中的偏移量, base 暂时认为就相当于”实际”存储的值。在这里如果cas更新成功了,则直接就returnl了。如果cas更新失败,说明有线程在竞争。此时AtomicLong则会死循环进行cas操作,而LongAdder使用了一种非常巧妙的思想。因为 as == null ,所以来看下接下去调用的 longAccumulate 方法:
/**
* Handles cases of updates involving initialization, resizing,
* creating new Cells, and/or contention. See above for
* explanation. This method suffers the usual non-modularity
* problems of optimistic retry code, relying on rechecked sets of
* reads.
*
* @param x the value
* @param fn the update function, or null for add (this convention
* avoids the need for an extra field or function in LongAdder).
* @param wasUncontended false if CAS failed before call
*/
finalvoidlongAccumulate(longx, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
//h索引到的cell为null,则生成cell对象并设置
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//a为索引到的cell,如果cas更新成功则结束循环
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
//扩容,size变成之前的2倍
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//初始化Cell数组
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
局部变量h是和线程绑定的非0值,h用来定位到 Cell 数组的某一项。
LongAdder给了我们一个非常容易想到的解决方案:减少并发,将单一value的更新压力分担到多个value中去,降低单个value的 “热度”,分段更新!唯一会制约AtomicLong高效的原因是高并发,高并发意味着CAS的失败几率更高, 重试次数更多,越多线程重试,CAS失败几率又越高,变成恶性循环,AtomicLong效率降低。 Cell 数组中的每一项就是一个段,当我们需要得到值时,将所有cell中的数据和base相加就可以了。
/**
* Returns the current sum. The returned value is <em>NOT</em> an
* atomic snapshot; invocation in the absence of concurrent
* updates returns an accurate result, but concurrent updates that
* occur while the sum is being calculated might not be
* incorporated.
*
* @return the sum
*/
publiclongsum(){
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
理解了LongAdder的设计思想, longAccumulate 方法应该也很容易看懂。LongAdder在第一次cas失败的时候,并没有选择死循环,而是通过 以空间换时间 的方式提高了性能。低并发时LongAdder和AtomicLong性能差不多,高并发时LongAdder更高效!
因本人水平有限,若文章内容存在问题,恳请指出。允许转载,转载请在正文明显处注明原站地址以及原文地址,谢谢!