转载

从0学习java并发编程实战-读书笔记-构建自定义的同步工具(12)

类库包含了许多存在状态依赖的类,例如FutureTask、Semaphore和BlockingQueue等。在这些类的一些操作中有着基于状态的前提条件,例如,不能从一个空的队列中删除元素,或者获取一个尚未结束的任务的计算结果,在这些操作可以执行之前,必须等到队列进入“非空状态”,或者任务进入已完成状态。

创建状态依赖类的最简单方式就是在类库中现有的状态依赖类上进行构造,例如countDownLatch来提供阻塞等。但如果类库中没有提供你要的功能,那么你就需要使用类库提供的底层机制来构造自己的同步机制,包括 内置的条件队列显式的Condition对象 以及 AbstractQueuedSynchronizer框架

状态依赖的管理性

在单线程程序中调用一个方法时,如果某个基于状态的前提条件未得到满足(例如“连接池必须为非空”),那么这个条件将永远无法成真。因此在编写顺序程序中的类时,要使得这些类在它们的前置条件未被满足时就失败。

但在并发条件中,基于状态的条件可能会由于其他线程的操作而改变:因为某个线程将元素返还给了资源池,资源池从空的变为非空。

对于并发对象上的依赖状态的方法,虽然有时候在前提条件不满足的情况下不会失败,但通常有一种更好的选择,即等待前提条件变为真。

依赖状态的操作可以一直阻塞直到可以继续执行,这比使它们先失败再实现起来更为方便且不容易出错。内置的条件队列可以使线程一直阻塞,直到对象进入某个进程可以继续执行的状态,并且当被阻塞的线程可以执行时再唤醒它们。

可阻塞的依赖操作的结构

可阻塞的状态依赖操作的加锁模式有点不同,因为锁实在操作的执行过程中被释放与重新获取的。构成前提条件的状态变量必须由对象的锁来保护,从而使它们在测试前提条件的同时保持不变,否则前提条件就永远无法成真。在再次测试前提条件之前,必须重新获得锁。

acquire lock on object state
while(precondition does not hold){
    release lock
    wait unitl precondition might hold
    optionally fail if interrupted or timeout expires
    reacqurie lock
}
perform action 
    release lock

在有界缓存提供的put和take操作中都包含有一个前提条件: 不能从空缓存中获取元素,也不能将元素放入已满的缓存 。当前提条件未满足时,依赖状态的操作可以抛出一个异常或返回一个错误状态(使其成为调用者的一个问题),也可以保持阻塞直到对象进入正确的状态。

有界缓存实现的基类

public abstract class BaseBoundedBuffer<V>{
    private final V[] buf;
    private int tail;
    private int head;
    private int count;

    protected BaseBoundedBuffer(int capacity){
        this.buf = (V[])new Object
    }

    protected synchronized final void doPut(V v){
        buf[tail] = v;
        if(++tail == buf.length){
            tail = 0;
        }
        count++;
    }

    protected synchronized final V doTake(){
        V v = buf[head];
        buf[head] = null;
        if(++head == buf.length){
            head = 0;
        }
        --count;
        return v;
    }

    public synchronized final boolean isFull(){
        return count == buf.length;
    }

    public synchronized final boolean isEmpty(){
        return count == 0;
    }
}

将前提条件的失败传递给调用者

public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V>{
    public GrumpyBoundedBuffer(int size){
        super(size);
    }

    public synchronized void put(V v) throws BufferFullException{
        if(isFull()){
            throw new BufferFullException();
        }
        doPut(v);
    }

    public synchronized V take() throws BufferEmptyException{
        if(isEmpty()){
            throw new BufferEmptyException;
        }
        return doTake();
    }

}

put和take方法都进行了同步以确保实现对缓存状态的独占访问,因为这两个方法在在访问缓存的时候都采用了“先检查再运行”的逻辑。

尽管这种方法实现起来很简单,但是使用起来并非如此。“缓存已满”并不是有界缓存的一个异常条件,就像红灯并不代表交通信号灯出现了异常。在实现缓存时得到的简化(让调用者自行管理状态依赖性)并不能抵消在使用时的复杂度,因为调用者必须做好捕获异常的准备,并且在每次缓存时都需要重试(而且可能会使一些功能无法实现,例如FIFO)。

抛出的异常意味着:“对不起,请再试一次”,但这个并没有解决根本问题,调用者必须自行处理前提条件失败的情况。

通过轮询与休眠来实现简单的阻塞

如果缓存为空,那么take将休眠,直到另一个线程在缓存中放入一些数据。

如果缓存是满的,那么put将休眠,直到另一个线程从缓存中移除一些数据。

public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V>{
    public SleepyBoundedBuffer(int size){
        super(size);
    }

    public void put(V v) throws InterruptedException{
        while(true){
            synchronized(this){
                if(!isFull()){
                    doPut(v);
                    return;
                }
            }
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }

    public V take() throws InterruptedException{
        while(true){
            synchronized(this){
                if(!isEmpty()){
                    return doTake();
                }
            }
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }
}

缓存代码必须在持有缓存锁的时候才能测试相应的状态条件,因为表示状态条件的变量是由缓存锁保护的。如果测试失败,那么当前执行的线程将首先释放锁,并且休眠一段时间,从而使其他线程能够访问缓存。当线程醒来时,它将重新请求锁并再次尝试执行操作。线程就将反复的在休眠以及测试条件状态中切换,直到可以操作为止。

从调用者的角度来看,这个方法能很好的运行:如果这个操作可以执行,则立即执行,否则就阻塞,调用者无需关心处理失败和重试。要选择合适的休眠时间,如果休眠时间间隔小,那么响应性就高,但是消耗的CPU资源更大。如果休眠时间长了,可能存在已经符合条件,但是却长时间未重试的情况。

SleepyBoundedBuffer对调用者提出了一个新的需求:处理InterruptedException。当方法由于等待某个条件而阻塞时,需要提供一个可中断机制。

条件队列

条件队列 这个名字源于:它使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真。所以条件队列中的元素是一个个正在等待相关条件的线程。

正如每个java对象都可以作为一个锁,每个对象同样可以作为一个条件队列,并且Object中的wait、notify和notifyAll方法就构成了内部条件队列的API。

对象的内置锁与内部条件队列是相互关联的, 要调用对象X中条件队列的任何一个方法,必须持有对象X上的锁 。这是因为 等待由状态构成的条件维护状态一致性 这两种机制必须被紧密的绑定在一起: 只有对状态进行检查时,才能在某个条件上等待,并且只有能修改状态时,才能从条件等待中释放另一个线程。

Object.wait 会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能获得这个锁并修改对象的状态。当被挂起的操作线程醒来时,它将在返回之前重新获取锁。从直观上理解:

  • 调用wait意味着:我要去休息了,但发生特定的事时唤醒我。
  • 而调用通知方法就意味着:特定的事发生了。

使用条件队列实现的有界缓存

public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
    public BoundedBuffer(int size){
        super(size);
    }
    // 阻塞直到队列不为满(not full)
    public synchronized void put(V v) throws InterruptedException {
        while(isFull()){
            wait();
        }
        doPut(V);
        notifyAll();
    }

    // 阻塞直到队列不为空(not empty)
    public synchronized V take() throws InterruptedException{
        while(isEmpty()){
            wait();
        }
        V v = doTake();
        notifyAll();
        return v;
    }
}

使用wait&notifyAll比使用休眠更加高效,当缓存状态没有发生变化的时,线程醒来的次数将更少,响应性也更高(当发生特定状态变化的时候,会通知到所有wait的线程,并立即醒来)。和休眠相比,条件队列并没有改变原来的语义。它只是在多个方面进行了优化:CPU效率,上下文切换开销,响应性等。条件队列在表达和管理状态依赖性时更加简单和高效。

如果某个功能无法通过“轮询和休眠”来实现,那么它也无法通过条件队列来实现

使用条件队列

条件队列使构建高效以及高可响应性的状态依赖类变得更容易,但是同时也容易被不正确地使用。虽然许多规则都能确保正确的使用条件队列,但在编译器或系统平台上却没有强制要求遵守这些规则。(这也就是为什么要尽量基于LinkedBlockingQueue、Latch、Semaphore和FutureTask等来构建程序的原因之一,如果能避免使用条件队列,那么实现起来将容易许多)

条件谓词

要想正确的使用条件队列,关键是找出对象在那个条件谓词上等待。在API中并没有对条件谓词进行实例化的方法,并且在java语言规范或JVM实现中也没有任何信息可以确保正确的使用它们。实际上,在java语言规范或Javadoc中根本没有提到它。 但如果没有条件谓词,条件等待机制将无法发挥作用。

条件谓词是 使某个操作成为状态依赖操作 的前提条件。

在有界缓存中,只有当缓存不为空时,take方法才能执行,否则必须等待。对take来说,它的条件谓词就是“缓存不为空”,在take方法执行之前必须首先测试该条件谓词。

将与条件队列相关联的条件谓词以及在这些条件谓词上等待的操作都写入文档

在条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词。在条件谓词中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象与条件队列对象(即调用notify和wait方法所在的对象)必须是同一个对象。

在BoundedBuffer中,缓存的状态是由缓存锁保护的,并且缓存对象被用作条件队列。take对象将获取请求缓存锁,然后对条件谓词(即缓存为非空)进行测试:

  • 如果缓存非空,那么它会移除第一个元素。之所以能这样做,是因为take此时仍然持有保护缓存状态的锁。
  • 如果条件谓词不为真(缓存为空),那么take必须等待并直到另一个线程在缓存中放入一个对象。take将在缓存的内置条件队列上调用wait方法,这需要持有条件队列对象上的锁。这是一种谨慎的设计,因为take方法已经持有在测试条件谓词时需要的锁。wait方法将释放锁,阻塞当前线程,并等待直到超时,然后线程被中断或者通过一个通知被唤醒。在唤醒进程后,wait在返回前还要重新获取锁。 当线程从wait方法中被唤醒时,它在重新请求锁时不具有任何特殊的优先级 ,而要与任何其他尝试进入同步代码块的线程一起正常地在锁上竞争。

每次wait调用都会隐式地与特定的条件谓词关联起来。当调用某个特定条件谓词的wait时,调用者必须已经持有与条件队列相关的锁,并且这个锁必须保护着构成条件谓词的状态变量。

过早唤醒

虽然在锁、条件谓词和条件队列之间的三元关系并不复杂,但wait方法的返回并不一定意味着线程正在等待的条件谓词变成真了。

内置条件队列可以与多个条件谓词一起使用。当一个线程由于调用notifyAll而醒来时,并不意味着该线程正在等待的条件谓词已经成真了。另外,wait方法还可以“假装”返回,而不是由于某个线程调用了notify。

当执行控制重新进入调用wait的代码时,它已经重新获取了与条件队列相关联的锁。在发出通知的线程调用notifyAll的时候,条件谓词可能变成真, 但是当线程重新获取到锁的时候,条件谓词可能已经变成假的。在线程被唤醒到wait重新获取到锁的这段时间里,可能有其他线程已经获取了这个锁,并修改了对象的状态。或者也许根本就没有变成真,也许别的线程因为别的什么原因调用了notify或者notifyAll,可能是同一条件队列的别的条件变成了真。

一个条件队列与多个条件谓词相关 这个情况很 常见 ,例如一个条件队列中有“非满”和“非空”两个谓词。

基于上诉的这些原因,每当线程从wait中唤醒时,都必须再次测试条件谓词,如果条件谓词不为真,那么就继续等待(或者失败),由于线程在条件谓词不为真的情况下也可以反复地醒来,因此必须在一个循环中调用wait,并在每次迭代中都测试条件谓词。

状态依赖方法的标准形式

void stateDependentMethod() throws InterruptedException{
    // 必须通过一个锁来保护条件谓词
    synchronized(lock){
        while(!conditionPredicate){
            lock.wait();
        }
    }
}

当使用条件等待时候

  • 通常都有一个条件谓词,包括一些对象状态的测试,线程在执行前必须首先通过这些测试。
  • 在调用wait之前测试条件谓词,并且从wait中返回时再次进行测试。
  • 在一个循环中调用wait
  • 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量。
  • 当调用wait、notify或notifyAll等方法时,一定要持有与条件队列相关的锁。
  • 在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁。

丢失的信号

之前讨论过活跃性问题,例如死锁和活锁。另一种形式的活跃性问题是丢失的信号。丢失的信号是指:线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词。现在,线程将等待一个已经发过的事件(简单的理解即为等待的线程由于某些原因错过了notify通知,但是条件队列并不知道你是否接收到通知)。

通知

在有界缓存中,如果缓存为空,那么在调用take时将阻塞。在缓存变为非空时,为了使take解除阻塞,必须确保在每条使缓存变为非空的代码路径中都发出一条通知。在BoundedBuffer中,只有一条代码路径,即在put之后。因此,put在成功地将一个元素添加到缓存后,将调用notifyAll。同样,take在移除一个元素后也将调用notifyAll,向任何正在等待“不为满”条件的线程发出通知。

每当在等待一个条件时,一定要确保在条件谓语变成真时通过某种方式发出通知。

在条件队列API中有两个发出通知的方法,即notify和notifyAll。无论调用哪一个,都必须持有与条件队列对象相关联的锁。在调用notify时,JVM会在这个条件队列上等待的多个线程中选择一个来唤醒,而notifyAll会唤醒所有这个条件队列上等待的线程。

由于多个线程可以基于不同的条件谓词在同一个条件队列上等待,因此如果使用notify而不是notifyAll,那么将是一种危险操作,因为单一的通知很容易导致类似于信号丢失问题。

只有同时满足以下两个条件,才能使用单一的notify而不是notifyAll

  • 所有等待线程的类型都相同 :即一个条件队列上只有一个条件谓词时
  • 单进单出 :在条件变量上的每次通知,最多只能唤醒一个线程来操作

由于大多数类并不满足这种需求,因此普遍认可的做法是 优先使用notifyAll而不是notify ,虽然notifyAll可能比notify更低效,但却容易保证类的行为是正确的。

在BoundedBuffer的put和take方法中采用的通知机制是保守的:每当将一个对象放入缓存,或者将一个对象从缓存中移除,就执行一次通知。我们可以对其进行优化:

  • 首先,仅当缓存从空变为非空,或者从满转为非满时,才需要释放一个线程。并且,仅当put或take影响到这些状态转换时,才发出通知。这也被称为 条件通知(Condition Notification) 。虽然条件通知可以提升性能,但却很难正确地实现,而且还会使子类的实现变得复杂。因此在使用时应该谨慎。
public synchronized void put(V v) throws InterruptedException {
    while(isFull()){
        wait();
    }
    boolean wasEmpty = isEmpty();
    doPut(v);
    if(wasEmpty){
        notifyAll();
    }
}

子类安全问题

在使用条件通知或单次通知时,一些约束条件使得子类化过程变得复杂。要想支持子类化,那么在设计类时需要保证:如果在实施子类话时违背了条件通知或单次通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类。

对于状态依赖的类,要么将其等待和通知等协议完全向子类公开,要么完全阻止子类参与到等待和通知等过程中。(这是对“要么围绕着继承来设计和文档化,要么禁止使用继承”这条规则的拓展)。当设计一个可被继承的依赖类时,至少需要公开条件队列和锁。

封装条件队列

通常,我们应该把条件队列封装起来,因而出了使用条件队列的类,就不能在其他地方访问它。否则,调用者会自以为理解了在等待和通知上使用的协议,并且采用一种违背设计的方式来使用条件队列。

不过这条建议,将条件队列封装起来,与线程安全类的最常见设计模式不一致。这种模式建议使用对象的内置锁来保护自身状态。即缓存对象自身既是锁,又是条件队列。然而,可以很容易将BoundedBuffer重新设计为使用私有的锁对象和条件队列,唯一不同之处在于,新的BoundedBuffer不再支持任何形式的客户端加锁。

显式的Condition对象

public interface Condition{
    void await() throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    void awaitUninterruptibly();
    boolean awaitUnitl(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

内置条件队列存在一些缺陷。每个内置锁都只能有一个相关联的条件队列,因而在想BoundedBuffer这种类中,多个线程可能在同一个条件队列上等待不同的条件谓词,并且在最常见的加锁模式下公开条件队列对象。如果想编写一个带有多个条件谓词的并发对象,或者想获得出了条件队列可见性之外的更多控制权,就可以使用显式的Lock和Condition而不是内置锁和条件队列,这是一种更灵活的选择。

一个Condition和一个Lock关联在一起,就像一个条件队列和一个内置锁关联一样。要创建一个Condition,可以在相关联的Lock上调用Lock.newCondition方法。Condition可以比内置条件队列提供更丰富的功能:在每个锁上可以存在多个等待、条件队列可以是中断的或不可中断的、基于时限的等待、以及公平和非公平的队列操作。

与内置锁条件队列不同的是, 对于每个Lock,可以有任意的Condition对象Condition对象继承了相关的Lock的公平性 ,对于公平的锁,线程会依照FIFO顺序从Condition.await中释放。

特别注意:在Condition对象中,与wait、notify、notifyAll相对应的方法分别是await、signal和signalAll。但是Condition也是一个Object,所以它也保护wait、notify、notifyAll方法。一定要使用正确的方法————signal和await。

使用显式条件变量的有界缓存

public class ConditionBoundedBuffer<T> {
    protect final Lock lock = new ReentrantLock();
    // 条件谓词:not Full
    private final Condition notFull = lock.newCondition();
    // 条件谓词:not Empty
    private final Condition notEmpty = lock.newCondition();
    private final T[] items = (T[]) new Object[BUFFER_SIZE];
    private int tail, head, count;

    //阻塞直到:notFull
    public void put(T x) throws InterruptedException{
        lock.lock();
        try{
            while(count == items.length){
                notFull.await();
            }
            items[tail] = x;
            if(++tail == items.length){
                tail = 0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // 阻塞直到:notEmpty
    public T take() throws InterruptedException{
        lock.lock();
        try{
            while(count == 0){
                notEmpty.await();
            }
            T x = itmes[head];
            items[head] = null;
            if(++head == items.length){
                head = 0;
            }
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

在分析上,使用多个Condition的类时,比使用单一的内部队列加多个条件谓词的类简单得多。通常将两个条件谓词分开并放到两个等待线程集中,Condition使其更容易满足单次通知的需求。signal比signalAll更高效,它能极大的减少在每次缓存操作中发生的上下文切换与锁请求的次数。

当使用显式的Lock和Condition时,必须满足锁、条件谓词和条件变量之间的三元关系。条件谓词中包含的变量必须由Lock来保护,并且在检查条件谓词和调用await和signal时,必须持有Lock对象。

Synchronizer剖析

在ReentrantLock 和Semaphore这两个接口之间存在许多的共同点。这两个类都可以用作一个“阀门”,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用lcok或acquire时成功返回),也可以等待(在调用lock或者acquire时阻塞),还可以取消(在调用tryLock和tryAcquire时返回false,表示在指定的时间内锁是不可用或者无法获得许可),而且这两个接口都是支持可中断、不可中断、以及限时的获取操作,也支持等待线程执行公平或非公平的队列操作。

尝试利用Lock实现信号量(并非Semaphore的真实实现方式)

public class SemaphoreOnLock{
    private final Lock lock = new ReentrantLock();
    // 条件谓词: permitsAvailable(permits > 0)
    private final Condition permitsAvailable = lock.newCondition();
    private int permits;

    SemaphoreOnLock(int initialPermits){
        lock.lock();
        try{
            permits = initialPermits;
        }finally{
            lock.unlock();
        }
    }

    //阻塞并直到:permitsAvilable
    public void acquire() throws InterruptedException{
        lock.lock();
        try{
            while(permits <= 0){
                permitsAvailable.await();
            }
            --permits;
        }finally {
            lock.unlock();
        }
    }

    public void release(){
        lock.lock();
        try{
            ++permits;
            permitsAvailable.signal();
        } finally {
            lock.unlock();
        }
    }
}

事实上,它们在实现时使用了一个共同的基类,即 AbstractQueuedSynchronizer(AQS) ,这个类也是其他很多同步类的基类。

AbstractQueuedSynchronizer

AQS是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效的构造出来。不仅 ReentrantLockSemaphore 是基于AQS构建的,还包括 CountDownLatch , ReentrantReadWriteLock , SychronousQueueFutureTask

AQS解决了在实现同步类涉及的大量细节问题,例如等待队列采用FIFO队列顺序等,在不同的同步器中还可以定义一些灵活的标准来判断某个某个线程是应该通过还是等待。

基于AQS来构建同步器能带来许多好处,它不仅能极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题(这是没有使用AQS来构建同步器的情况)。

大多数开发者并不会直接使用AQS,标准同步器类的集合能满足绝大多数情况的需求。但如果能了解标准同步器类的实现方式,那么对理解它们的工作原理是非常有帮助的。

在基于AQS构建的同步器类中,最基本的操作包括 各种形式的获取操作和释放操作获取操作 是一种依赖状态的操作,并且通常会阻塞。

  • 当使用 信号量 时,“获取”操作的含义就很直观,即获取的是锁或者许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态。
  • 在使用 CoundownLatch 时,“获取”操作意味着 等待并直到闭锁到达结束状态
  • 而在使用 FutureTask 时,“获取”操作意味着 等待并直到任务已完成

释放 并不是一个可阻塞的操作,当执行“释放”操作的时候买所有在请求时被阻塞的线程都会开始执行。

如果一个类想成为状态依赖的类,那么它必须拥有一些状态。AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过 getState , setState , compareAndSetState 等protect类型方法进行操作。这个整数可以用于表示任何状态,例如:

ReentrantLock
Semaphore
FutureTask

在同步器类中还可以自行管理一些额外的状态变量,例如ReentrantLock保存了锁的当前所有者信息,这样就能区分某个获取操作是重入还是竞争。

AQS获取操作和释放操作的标准形式

boolean acquire() throws InterrupedException {
    while(当前状态不允许获取操作){
        if(需要阻塞获取请求){
            如果当前线程不再队列中,则将其插入队列
            阻塞当前线程
        }else{
            返回失败
        }
    }
    可能更新同步器状态
    如果线程位于队列中,则将其移出队列
    返回成功

    void release(){
        更新同步器的状态
        if(新的状态允许某个被阻塞的线程获取成功){
            解除队列中一个或多个线程的阻塞状态
        }
    }
}

根据同步器的不同,获取操作可以是一种独占操作(例如ReentrantLock),也可以是一种非独占操作(例如Semaphore和CountDownLatch)。一个操作包括两部分:

  1. 同步器判断当前状态是否允许获得操作,如果是,则允许线程执行;否则就将获取操作阻塞或失败。对于锁来说,如果它没有被某个线程持有,那么就可以获取。对于闭锁来说,如果它没有被某个线程持有,那么就能被获取。
  2. 更新同步器状态,获取同步器的某个线程可能会对其他线程能否获取该同步器造成影响。

如果某个同步器支持独占的获取操作,那么需要实现一些保护方法,包括 tryAcquiretryReleaseisHeldExclusively 等。而对于支持共享获取的同步器,则应该实现 tryAccquireSharedtryReleaseShared 等方法。AQS中的acquire、aquireShared、release和releaseShared等方法都将调用这些方法在子类中带有前缀try的版本来判断某个操作是否能执行。在同步器的子类中,可以根据其获取操作和释放操作的语义,使用 getState , setState , compareAndSetState 来检查和更新状态,并通过返回的状态值来告知基类 获取释放 同步器的操作是否成功。

一个简单的闭锁

使用AQS实现的二元闭锁

public class OneShotLatch {
    private final Sync sync = new Sync();

    public void signal(){
        sync.releaseShared(0);
    }

    public await() throws InterruptedException{
        sync.acquireSharedInterruptibly();
    }

    private class Sync extends AbstractQueueSynchronizer {
        protected int tryAcquireShared(int ignored){
            // 如果闭锁是开的(state == 1)那么这个操作将成功,否则失败
            return (getState == 1)? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignored){
            setState(1); // 打开闭锁
            return true; // 现在其他线程可以获取到该闭锁。
        }
    }
}

包含有两个共有方法:await和signal,分别对应获取操作和释放操作。起初,闭锁是关闭的,任何调用await的线程都将阻塞并直到闭锁被打开。当通过调用signal打开闭锁时,所有等待中的线程都将被释放,并且随后到达闭锁的线程也被允许执行。

其中AQS状态用来表示闭锁状态 -- 关闭(0)或者打开(1)。await方法调用AQS的 acquireSharedInterruptibly() ,然后调用OneShotLatch中的tryAcquireShared方法。在tryAcquireShared的实现中必须返回一个值来表示该获取操作能否执行。如果之前已经打开了闭锁,那么tryAcquireShared将返回成功并允许线程通过,否者就会返回一个表示获取操作失败的值。

acquireSharedInterruptibly 方法在处理失败的时候,是把这个线程放入等待线程队列中。类似地, signal 将调用 releaseShared ,接下来又会调用 tryReleaseShared 。在tryReleaseShared中将无条件地把闭锁的状态设置为打开,(通过返回值)表示该同步器处于完全被释放的状态。因而AQS让所有等待中的线程都尝试重新请求该同步器,并且由于tryAcquireShared将返回成功,因此现在所有请求操作将成功。

java.util.concurrent同步器类中的AQS

java.util.concurrent中有许多可阻塞类,例如

ReentrantLock
Semaphore
ReentrantReadWriteLock
CountDownLatch
SynchronousQueue
FutureTask

ReentrantLcok

ReentrantLock 只支持独占方式的获取操作,因此它实现了 tryAcquiretryReleaseisHeldExclusively 。ReentrantLock将同步状态用于保持锁获取的次数,并且还维护了一个owner变量来保存当前所有者线程的标识符,只有在当前线程刚刚获得到锁,或者正要释放锁的时候,才会修改这个变量。在tryRelease中检查owner域,从而确保当前线程在执行unlock操作之前已经获取了锁:在tryAcquire中将用这个域来区分获取操作是重入的还是竞争的。

protected boolean tryAcquire(int ignored){
    final Thread current = Thread.cuurentThread();
    int c = getState();
    if(c == 0){
        if(compareAndSetState(0,1)){
            owner = current;
            return true;
        }
    } else if(current == owner){
        setState(c+1);
        return true;
    }
}

当一个线程尝试获取锁时,tryAcquire将首先检查锁的状态,如果锁未被持有,那么它将尝试更新锁的状态以表示锁已经被持有(compareAndSetState是原子的比较后更新),来表示这个锁已经被占有。如果锁状态表明它已经被持有,并且如果当前线程是锁的持有者,那么获取计数递增。如果当前线程不是锁的持有者,那么获取失败。

ReentrantLock还利用AQS对多个条件变量和多个等待线程集的内置支持。 Lock.newCondition将返回一个新的ConditionObject实例,这是AQS的一个内部类。

Semaphore与CountDownLatch

Semaphore将AQS的同步状态用于保存当前可用许可的数量。tryAcquireShared方法首先计算剩余许可的数量,如果没有足够许可,那么会返回一个值表示获取操作失败。如果还有剩余的许可,那么tryAcquireShared会通过compareAndSetState以原子的方式来降低许可数量。如果操作成功,那么将返回一个值表示操作成功,在返回值中还包含了表示其他共享获取操作能否成功的信息。如果成功,那么其他等待的线程同样会解除阻塞。

protected int tryAcquireShared(int acquires){
    while(true){
        int available = getState();
        int remaining = available - acquires;
        if(remaining < 0 || compareAndSetState(available,remaining)){
            return remaining;
        }
    }
}

protected boolean tryReleaseShared(int releases){
    while(true){
        int p = getState();
        if(compareAndSetState(p, p + releases)){
            return true;
        }
    }
}

当没有足够的许可,或者当tryAcquireShared可以通过原子方式来更新许可的计数以响应获取操作时,while循环将终止。虽然对compareAndSetState的调用可能由于与另一个线程发生竞争而失败,并使其重新尝试,但在经过了一定数量的重试操作后,这两个结束条件就会有一个变为真。同样,tryReleaseShared将增加许可计数,这就能解除某些线程中的阻塞状态,并且不断重试直到更新操作变为成功。

CountDownLatch使用AQS的方式与Semaphore很相似:在同步状态保存的是当前的计数值。countDown方法调用release,从而导致计数值递减,并且计数值为0时,解除所有等待线程的阻塞。await调用acquire,当计数器为零时,acquire将立即返回,否则将阻塞。

FutureTask

初看上去,FutureTask并不像一个同步器,但是Future.get的语义非常类似于闭锁的语义 -- 如果发生了某个事件(由FutureTask表示的任务执行完成或是被取消),那么线程就可以恢复执行,否则这些线程将停留在队列中,并直到该事件发生。

在FutureTask中,AQS同步状态被用来保存任务的状态,例如:正在运行、已完成、已取消。FutureTask还维护了一些额外的状态变量,用来保存计算结果或抛出的异常,此外它还维护了一个引用,指向正在执行计算任务的线程(如果它当前处于运行状态),因而如果任务取消,该线程就会中断。

ReentrantReadWriteLock

ReadWriteLock接口表示存在两个锁:一个读取锁,一个写入锁,但在基于AQS实现的 ReentrantReadWriteLock 中,单个AQS子类将同时管理读取加锁和写入加锁。 ReetrantReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位状态来表示读取锁的计数。 在读取锁上的操作将使用共享的获取方法和释放方法,在写入锁上的操作将使用独占的获取方法和释放方法。

AQS在内部维护了一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问。

在ReentrantReadWriteLock中,当锁可用时:

  • 如果位于队列头部的线程执行写入操作,那么线程会得到这个锁;
  • 如果位于队列头部的线程执行读取访问,那么队列中在第一个写入线程之前的所有线程都将获得这个锁。

小结

要实现一个依赖状态的类 -- 如果没有满足依赖状态的前提条件,那么这个类的方法必须阻塞,那么最好的方式是基于现有的库类来构建。然而有时候现有的库类不能提供足够的功能,在这种条件下,可以使用内置的条件队列、显式的Condition对象、或者AbstractQueuedSynchronizer来构建自己的同步类。

内置条件队列与内置锁是紧密绑定在一起的,因为管理状态依赖性的机制必须与确保状态一致性的机制关联起来。同样,显式Condition与显式的Lock也是紧密绑定到一起的,并且与内置的条件队列相比,还提供了拓展的功能集,包括:每个锁对应多个等待线程集(多个条件队列),可中断或不可中断的条件等待,公平或者非公平的队列操作,以及基于时限的等待。

原文  https://segmentfault.com/a/1190000022401461
正文到此结束
Loading...