AtomicInteger是对int的一个封装,利用CAS保证了原子性操作。
它依赖Unsafe提供的一些底层能力进行底层操作:
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe(); private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value"); private volatile int value;
public final int getAndIncrement() { return U.getAndAddInt(this, VALUE, 1); }
getAndAddInt需要返回原始数组,所以需要添加失败重试逻辑。
public final int getAndAddInt(Object o, long offset, int delta) { int v; do { v = getIntVolatile(o, offset); } while (!weakCompareAndSetInt(o, offset, v, v + delta)); return v; }
CompareAndSet之类的函数,返回值就是成功与否。
public final boolean compareAndSet(int expectedValue, int newValue)
比AtomicLong等更加紧凑。
atomic包的LongAdder在高度竞争环境下,可能是比AtomicLong更佳的选择。尽管其本质是空间换时间。
Java9之后可以使用Variable Handle API:获取相应句柄,然后调用其CAS方法。
private static final VarHandle HANDLE = MethodHandles.lookup().findStaticVarHandle (AtomicBTreePartition.class, "lock"); private void acquireLock(){ long t = Thread.currentThread().getId(); while (!HANDLE.compareAndSet(this, 0L, t)){ // 等待一会儿,数据库操作可能比较慢 … } }
常见的失败重试机制,都隐含着竞争情况是短暂的的假设。大多数场景下确实是只发生一次就获得了成功,但是总有意外情况,所以在有需要的时候,需要考虑限制自旋的次数,以免过度消耗CPU。
Java提供了AtomicStampedReference工具类,通过为引用简历类似版本号的方式,保证CAS的正确性。
详细可以参考http://yizhanggou.top/aqs/ ,这里做下总结。
将基础的同步相关操作抽象在AbstractQueuedSynchronizer中,利用AQS为我们构建同步结构提供范本。
private volatile int state;
利用CAS去实现一个同步结构,至少要实现acquire和release,分别是获取资源的独占权,以及释放对这个资源的独占。
内部通过扩展AQS实现了Sync类型,以AQS的state来反映锁的持有情况。
private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { …}
acquire/release操作
public void lock() { sync.acquire(1); } public void unlock() { sync.release(1); }
acquire实现在AQS内部:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
首先看下tryAcquire,在ReentrantLock,tryAcquire逻辑是现在NonfairSync和FairSync中,而AQS内部tryAcquire仅仅是个接近为未实现的方法,需要实现者自己定义。
公平性:
public ReentrantLock() { sync = new NonfairSync(); // 默认是非公平的 } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
非公平的tryAcquire:在锁无人占有时,不检查是否有其他等待者。
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();// 获取当前 AQS 内部状态量 if (c == 0) { // 0 表示无人占有,则直接用 CAS 修改状态位, if (compareAndSetState(0, acquires)) {// 不检查排队情况,直接争抢 setExclusiveOwnerThread(current); // 并设置当前线程独占锁 return true; } } else if (current == getExclusiveOwnerThread()) { // 即使状态不是 0,也可能当前线程是锁持有者,因为这是再入锁 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
如果tryAcquire失败,则进入排队竞争阶段:如果当前节点的前面是头结点,则试图获取锁,一切顺利则成为新的头结点。否则,有必要则等待。
final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) {// 循环 final Node p = node.predecessor();// 获取前一个节点 if (p == head && tryAcquire(arg)) { // 如果前一个节点是头结点,表示当前节点合适去 tryAcquire setHead(node); // acquire 成功,则设置新的头节点 p.next = null; // 将前面节点对当前节点的引用清空 return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) // 检查是否失败后需要 park interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node);// 出现异常,取消 if (interrupted) selfInterrupt(); throw t; } }