转载

自旋锁的优化

当锁要保护的代码块执行很快的时候,并且争抢不是非常激烈的时候,自旋锁的比之重量级的需要切换上下文的互斥锁能带来更好的性能表现。此外,自旋锁的提出主要也是为了解决多核系统的同步问题。对于非多核系统,用户态的自旋锁空耗 CPU,反而降低了整个系统的 progress,作用反而不大。

此外,当保护的代码执行较为耗时,并且自旋锁的争抢非常激烈的时候,自旋锁本身就消耗了大量无谓的 CPU ,这种情况下还不如使用互斥锁,让出 CPU 给任务执行。

我在前面一篇介绍 CPU 高速缓存的博客,举了个例子用 AtomicInteger 的 compareAndSet 实现一个自旋锁,主要是为了演示在 SMP 下通过增加一个内循环,来减少锁状态 state 在多个 CPU 之间的『颠簸』。

但是其实这个例子如果用 synchronized 或者 ReentrantLock 改写,都会快得多。比如换成 ReentrantLock,

private ReentrantLock lock =  new ReentrantLock();

/**
 * 递增 count,使用 ReentrantLock 保护 count
 */
public void incr() {
    lock.lock();
    count = count + 1;
    lock.unlock();
}

运行时间下降到了 150~230 毫秒,比之原来测试的两个版本快了一个数量级。

原因就在于递增 +1 这个操作非常快,导致自旋锁版本的代码线程争抢锁非常激烈,大家抢的头破血流,但是任务却没有多大进展,空耗了大量 CPU。就好像一堆人抢着去上卫生间,大家都不排队,你挤我抢,反而堵在了门口,卫生间的实际利用率下降了。

自旋锁的优化有一些思路:

首先是指数退让(Exponential Back off),每个线程在争抢锁之前等待一小会,通常第一次不退让,但是之后就按照指数时间递增这个退让时间,我们修改下原来的 BusyLock 例子来演示这个优化:

 //最大退让时间 10 毫秒
 private static final int max_backoff_msg=10;

     /**
     * 利用 CAS 实现自旋锁
     */
    private void lock() {
        //其实退让时间 1 毫秒。
        int backoff=1;
        while (!state.compareAndSet(0, 1)){
            while (state.get() == 1)
                ;
            //不是第一次,退让一下
            if(backoff>1){
                try{
                    Thread.sleep(backoff);
                }catch(Exception InterruptedException){
                    Thread.currentThread().interrupt();
                }
            }
            //还没有超过最大退让时间,指数增加退让时间。
            if(backoff<max_backoff_msg){
                backoff*=2;
            }
        }
    }

通过引入一个退让机制,我们重新跑下原来的测试,整体执行时间下降到了 120~200 毫秒了,这就跟 ReentrantLock 差不了太多了。

但是呢,是否比 ReentrantLock 的实现更好? 肯定不是。首先没有处理可重入锁,也不支持 tryLock 超时、取消等;其次,用的 sleep 做退让,最小单位是毫秒,并且 sleep 精度无法保证。另外,没有任何公平性可言,等待最久的线程可能退让最多,新来的很可能先抢到锁等等。

这就引出了另一个优化思路:排队(Queuing Lock)。这其实就是 ReentrantLock 的实现方式。ReentrantLock 有两种模式:公平和非公平,通常情况都推荐你使用非公平版本,也就是默认的。对于非公平版本的 lock 实现如下:

    * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

先尝试用 CAS 获取锁,如果成功了,那就设置独占线程为自己,如果失败,进入 AQS 框架 的 aquire 方法:

   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

aquire 里同样先尝试下能不能获取,不行就去调用 addWaiteracquireQueued 去尝试排队,如果还是不行,就中断取消。

排队过程也是一个 lock-free 的循环过程。 acquireQueued 仍然会去自旋尝试获取自旋锁,如果仍然继续失败,就调用 park 让出 CPU 不再参与调度,等锁被释放的时候被前驱节点的线程(释放锁的线程)唤醒再次参与争抢。最后,通过引入队列禁止抢占,也可以支持严格的公平锁了。

关于 AQS 和 ReentrantLock 源码解析, 网上资料 多如牛毛,这里也不重复了。题外话:单纯看一个东西源码,只能了解一个实现是什么样的,但是不知道为什么这样实现,有时候需要换个角度去思考,并且阅读一些相关方面的资料,才能理解作者为何如此设计等等。

原文  http://blog.fnil.net/blog/1df8c71d5019f4ca48c19b1707174897/
正文到此结束
Loading...