转载

从0学习java并发编程实战-读书笔记-基础构建模块(4)

同步容器类

同步容器类包括Vector和Hashtable,这两个是早期JDK的一部分。此外还包括在JDK1.2中添加Collections.synchronizedXxx等工厂方法,这些类实现线程安全的方式是:将它们的状态封装起来,并对公有方法进行同步,使得每次只有一个线程能访问容器的状态。

同步容器类的问题

同步容器类都是线程安全的,但是某些 复合操作 需要额外的客户端加锁来保护,常见的复合操作:

  • 迭代:反复访问元素,直到遍历完容器中的所有元素
  • 跳转:根据指定顺序找到当前元素的下一个元素
  • 条件运算:例如“若没有就添加”

由于同步容器类要遵守同步策略,即支持客户端加锁,因此可能会创造一些新的操作,只要我们知道应该使用哪一锁,那么这些新操作就与其他操作一样都是原子操作。

for (int i = 0; i < vector.size();i++){
    doSomething(vector.get(i))
 }

一个线程访问,一个线程删除,在这两个线程交替执行的时候,代码就可能出现问题,将抛出ArrayIndexOutOfBoundsException。

解决方式是将怎么for循环加锁,锁为vector对象。

迭代器与ConcurrentModificationException

不论是直接迭代还是jdk5引入的for-each语法,对容器的标准访问方式就是使用 Iterator(迭代器) ,但是Iterator并没有考虑到并发修改的问题,迭代器使用的策略是 快速失败(fail-fast) 。当迭代器发现容器在迭代过程被修改了,就会抛出一个ConcurrentModificationException异常。

这种快速失败机制并不算是完备的处理机制,只是捕获了可能会出现并发错误,只能作为一个并发问题预警指示器。要想避免ConcurrentModificationException异常,就必须在迭代过程持有容器的锁。

隐藏迭代器

在某些情况下,迭代器会隐藏起来。例如 javaSystem.out.println("iterm:"+set); ,编译器将字符串的连接操作转换为调用 StringBuilder.append(Object) ,而这个方法将会调用容器的 toString 方法,标准容器的toString方法将迭代容器。

正如封装对象的状态有助于维持不变性条件一样,封装对象的同步机制同样有助于确保实施同步策略。

并发容器

同步容器通过对所有容器状态的访问都串行化,以实现它们的线程安全性。当然这种办法的代价是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重降低。

通过并发容器来代替同步容器,可以极大的提高伸缩性并降低风险。

  • JDK5中增加了 ConcurrentHashMap ,用来代替同步且基于散列的Map。
  • 以及 CopyOnWriteArrayList ,用于在遍历操作为主要操作的情况下代替同步的List。
  • 在新的 ConCurrentMap 接口中增加了一些常用的复合条件,如“若没有就添加”,替换,以及有条件删除等。
  • JDK5中增加了两种新容器, QueueBlockingQueue ,用来临时保存一组等待处理的元素。它提供了几种实现,包括:

    ConcurrentLinkedQueue
    PriorityQueue
    
  • Queue上的操作不会阻塞,如果队列为空,那么获取元素的操作将会返回空值。
  • 可以用List来模拟Queue的行为( Queue本身就是由LinkedList实现的 ),但是还是需要一个Queue类,因为它能去掉List的随机访问需求,实现更加高效的并发。
  • BlockingQueue 拓展了Queue,增加了可阻塞的插入和获取等操作。

    • 如果队列为空,那么获取元素的操作将一直阻塞。
    • 如果队列已满,那么插入元素的操作将一直阻塞 ,直到队列中出现可用的空间。
  • JDK6引入了:

    • ConcurrentSkipListMap :同步的 SortedMap 的并发替代品
    • ConcurrentSkipListSet :同步的 SortedSet 的并发替代品

ConcurrentHashMap

同步容器类在执行每个操作期间都持有一个锁。与HashMap一样,ConcurrentHashMap也是一个 基于散列的Map ,使用了一种不同的策略来提供更高的并发性和伸缩性:

  • ConcurrentHashMap并不是将每个方法都在同一个锁上同步并使得同时只能由一个线程访问容器,而是利用 分段锁(Locking Striping) 做更细粒度的加锁机制来实现更大程度的共享。这样的好处是,在多线程并发访问下将实现更高的吞吐量,而单线程环境只损失非常小的性能。
  • ConcurrentHashMap与其他容器增强了同步容器类:他们提供的迭代器不会抛出 ConcurrentModificationException异常 ,ConcurrentHashMap返回的迭代器具有 弱一致性(Weakly Consistent)弱一致性的迭代器可以容忍并发的修改 ,当创建迭代器的时候会遍历已有的元素,可以(并不保证)在迭代器在被构造后将修改操作反映给容器。

与Hashtable和synconizedMap相比,ConcurrentHashMap有着更多的优势和更少的劣势,在大多数并发情况下,用ConcurrentHashMap来代替同步Map能提高代码的可伸缩性,只有当需要加锁Map进行访问时,才应该放弃使用concurrentHashMap。

额外的原子Map操作

由于ConcurrentMap并不是通过持有锁来控制对象的独占访问,所以我们无法靠加锁新建原子操作。但是常见的如:“若没有则添加”,“若相等则移除”,“若相等则替换” 等复合操作都已经实现(具体可以看接口描述)。

CopyOnWriteArraryList

CopyOnWriteArraryList 用于替代同步List,在某些情况下它能提供更好的并发性能,在迭代期间并不需要对容器进行复制或者加锁。

  • CopyOnWrite(写入时复制) 容器的线程安全性在于,只要 正确发布一个事实不变的对象 ,那么在访问该对象的时候就不需要进一步的同步。
  • 在每次修改的时候,都会创建并重新发布一个新的容器副本,从而实现可变性。 容器的迭代器保留一个指向底层数组的引用,这个数组当且位于迭代器的起始位置,由于每次都会创建新对象,所以和读进程互不干扰。
  • 由于每次修改都需要进行复制,所以不适合需要经常修改或者容器规模很大的情况。

阻塞队列和生产者-消费者模式

阻塞队列提供了:

  • 可阻塞的 puttake 方法:如果队列满了,那么put队列将阻塞至有空间可用;如果队列为空,take方法将会阻塞至有元素可用。
  • 定时的 offerpoll 方法

队列可以是有界的也可以是无界的,无界队列永远也不会充满,所以put方法永远也不会阻塞。

阻塞队列支持生产者-消费者模式,把 找出需要完成的工作执行工作 这两个过程分开,并将工作放入一个 待完成 列表,以便后续处理,而不是找出立即处理。

  • 生产者-消费者模式能简化开发过程,它消除了生产者和消费者直接的代码依赖性,该模式还将生产数据的过程与使用数据的过程解耦来简化工作负载的管理,因为这两个过程在处理数据的速率不同。
  • 如果生产者生成工作的速率比消费者处理工作的速度快,那么工作就会在队列中累计起来,直到耗尽内存。put的阻塞特性极大的简化了生产者的编码。如果使用有界队列,那么当队列充满时,生产者将阻塞并且不能继续生成工作,而消费者就有时间来赶上工作处理进度。

在构建高可靠的应用程序时,有界队列是一种强大的资源管理工具:它们能抑制并防止产生过多的工作项,使应用程序在负荷过载的情况下变得更加健壮。

类库中的BlockingQueue实现

  • LinkedBlockingQueueArrayBlockingQueueFirst In,First Out(FIFO)队列 ,分别和LinkedList和ArrayList相似。但比同步List有更好的并发性能。
  • PriorityBlockingQueue 是一个优先队列,可以通过元素的自然顺序比较,也可以使用Comparator方法。
  • SynchronousQueue ,实际上并不算是一个真正的队列,因为它不会为队列中的元素维护存储空间。与其他队列不同的是,它维护一组线程,这些线程在等着把元素加入或者移出队列。这种实现队列的方式看起来很奇怪,等于直接将工作交付给消费者线程,从而降低了将数据从生产者移动到消费者的延迟(避免串行入列和出列)。并且一旦工作被交付,生产者可以立即得到反馈。

串行线程封闭

  • 对于可变对象,生产者-消费者这种设计与阻塞队列一起,促进了串行线程封闭,从而将对象所有权从生产者交付给消费者。
  • 线程封闭对象只能由单个线程拥有,但是可以通过安全发布该对象来“转移”所有权,并且转移后,发布对象的线程不会再访问它,对象被封闭在新的线程里。
  • 对象池利用了串行线程封闭,将对象“借给”一个请求线程,重要对象池保护足够的内部同步来安全发布池中的对象,并且客户代码本身不会发布池中的对象,且在将对象返回给对象池之后不会再使用它,那么就可以安全地在线程之间传递所有权。

双端队列与工作密取

jdk6增加了两种容器类型:

Deque
BlockingDeque

Deque是一个 双端队列 ,实现了在队列头和队列尾的高效插入和移除,具体实现有ArrayDeque和LinkedBlockingDeque。

正如阻塞队列适用于生产者-消费者模式,双端队列适用于工作密取模式。

在生产者-消费者模式中,所有消费者都有一个共享的工作队列。而工作密取中,每个消费者都有一个自己的双端队列,如果一个消费者完成了自己双端队列中的全部工作,那么它可以从其他消费者队列的尾部来秘密获取工作(G1中的处理DCQ就是使用工作密取模式)。

工作密取比生产者消费者模式具有更好的伸缩性,消费者基本不会在单个共享的任务队列上发生竞争。当一个消费者线程要访问另一个队列时,是从尾部而不是头部获取,进一步降低了队列的竞争程度。

阻塞方法与中断方法

线程可能会阻塞或暂停执行,原因有多种:

  • 等待I/O操作结束
  • 等待获得一个锁
  • 等待从Thread.sleep方法中醒来
  • 等待另一个线程的计算结果

当线程阻塞时,它通常被挂起,并处于某种阻塞状态:

BLOCK
WAITING
TIMED_WAITING 

被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行,例如等待的I/O操作已经完成,锁可用了等等。当某个外部事件发生时,线程被置回 RUNNABLE 状态,并且可以执行调度。

Thread提供了 interrupt 方法,用于中断线程或者查询线程是否已经被中断,每个线程都有个boolean来表示线程的中断状态。

中断是一种协作机制,一个线程不能强制其他线程停止正在执行的操作而去执行其他操作。

当线程A中断B,A仅仅是要求B在执行到某个可以暂停的地方停止正在执行的操作(前提是如果B愿意停下)。最常使用的中断的情况就是取消某个操作。

当在代码中调用了一个将抛出InterruptedException异常的方法时,你的方法就变成了阻塞方法,并且必须处理中断的响应:

  • 传递InterruptedExcption :避开这个异常通常是最好的选择,只需要将这个异常传递给调用者,并不捕获或者恢复该异常,然后在执行某种简单的清理工作后再次抛出这个异常。
  • 恢复中断 :有时候不能抛出这个异常,例如代码是Runnable的一部分时,必须捕获该异常,并通过当前线程上的 interrupt 方法恢复中断。

同步工具类

闭锁

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行。

CountDownLatch 是一种灵活的闭锁实现,它可以使一个或多个线程等待一组事件发生。闭锁状态有一个计数器,这个计数器被初始化为一个正数,表示等待的事件数量。countDown方法递减计数器,表示已经有一个事件发生了,而await方法会一直阻塞,直到计数器为0,或者等待的线程中断或者超时。

public class TestHarness {
    public long timeTask(int nThreads, final Runnable task) throws InterruptedException{
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread(){
              public void run(){
                  try {
                      startGate.await();
                      try{
                          task.run();
                      }finally {
                          endGate.countDown();
                      }
                  }catch (InterruptedException ignored){}
              }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();
        endGate.await();
        long end = System.nanoTime();
        return end - start;

    }
}
  • 第一个闭锁:startGate,确保所有的线程都准备就绪后才开始执行任务。
  • 第二个闭锁:endGate,等到所有线程都执行完成后计算消耗的时间。

FutureTask

FutureTask也可以用做为闭锁。FutureTask实现了Future语义,表示一种抽象的可生成结果的计算。FutureTask表示的计算是通过Callable来实现的。

  • 等待运行(Waiting to run)
  • 正在运行 (Running)
  • 运行完成(Completed)

当FutureTask进入完成状态后,它会永远停在这个状态上。

Future.get行为取决于任务的状态:

  • 任务完成:get立即返回结果
  • 任务还未完成:get将阻塞直到任务进入完成状态,然后返回结果或是抛出异常

FutureTask将计算结果从执行计算的线程传递到获取这个结果的线程,并且FutureTask能够 保证这种传递过程能实现结果的安全发布

FutureTask 类实现了 RunnableFuture 接口, RunnableFuture 继承了 Runnable接口和Future接口 ,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。 事实上,FutureTask是Future接口的一个唯一实现类。

Callable表示的任务可以抛出受检查的或未受检查的异常,并且任何代码都可能抛出一个Error。无论任务代码抛出什么异常,都会被封装到一个ExecutionException中,并在Future.get中重新抛出。所以当ExcutionException时,可能是以下三种情况之一:

  • Callable抛出的受检查异常
  • RuntimeException
  • Error

信号量

计数信号量(Counting Semaphore) :用来控制访问某个特定资源的操作数量,或者同时执行某个制定操作的数量。计数信号量还可以用来实现某些资源池,或者对容器加边界。

Semaphore中管理一组许可,许可的初始数量可以通过构造函数来指定。在执行操作的时候,首先获得许可,在使用以后释放许可。如果没有许可,acquire将阻塞直到有许可(或者被中断或超时)。

public class BoundedHashSet<T> {

    private final Set<T> set;

    private final Semaphore semaphore;

    public BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet<T>());
        semaphore = new Semaphore(bound);
    }

    public boolean add(T t) throws InterruptedException {
        semaphore.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(t);
            return wasAdded;
        } finally {
            if (!wasAdded) {
                semaphore.release();
            }
        }
    }

    public boolean remove(T o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) {
            semaphore.release();
        }
        return wasRemoved;
    }
}

这里实现了一个有界阻塞Set容器,信号量的计数值初始化为容器的最大值。每次添加元素之前,要获得一个许可,如果add失败,并没有成功添加上元素,就释放许可。删除成功也释放许可,底层的set实现并不知道关于边界的信息,都是由BoundedHashSet来处理的。

栅栏(Barrier)

闭锁和栅栏的区别

  • 闭锁是一次性对象,一旦进入终止状态,就不能被重制
  • 栅栏能阻塞一组线程直到某个事件发生,所有线程必须同时到达栅栏位置,才能继续执行。

闭锁用于等待事件,而栅栏用来等待其他线程。

CyclicBarrier 可以使一定数量的参与方反复地在栅栏位置汇集,它在并行迭代算法中非常有用:

BrokenBarrierException
原文  https://segmentfault.com/a/1190000022097715
正文到此结束
Loading...