在上文已经说明,委托是构造线程安全类的一个最有效策略,也就是让现有的线程安全类管理所有的状态即可。以下将介绍这些基础构建模块。
同步容器类包括Vector和Hashtable以及由 Collections.synchronizedXxx
等工厂方法创建的同步封装器类。这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。同步容器对所有容器状态的访问都串行化,严重降低了并发性;当多个线程竞争锁时,吞吐量严重下降。
同步容器类都是线程安全的,但是在某些情况下可能需要额外的客户端加锁来保护复合操作。
比如,在Vecotr中,getLast()和deleteLast()操作,如果是在多线程的环境下运行,如果不加锁,会产生异常情况。一个线程在getLast()后,另一个线程deleteLast(),然后该线程继续执行,进行deleteLast()操作,此时会抛出下标越界的异常。
又比如,在迭代的过程中,使用get(index)的操作,如果有多个线程运行,可能会删除其中元素,同样会造成异常。
对于如上的情况,我们需要通过客户端加锁来解决线程安全的问题。如在迭代时加锁:
synchronized(vector){ for(int i=0;i<vector.size();i++){ vector.get(i); } } 复制代码
在迭代或者for-each循环语法时,对容器类进行迭代的标准方式都是使用Iterator。然而,在设计同步容器类的迭代器时并没有考虑到并发修改的问题,并且它们表现出的行为时“及时失败”的,也就是当它们发现容器在迭代过程中被修改时,就会抛出ConcurrentModificationException。
如果在迭代期间,对容器加锁,首先会降低效率,提高线程的等待时间;然后还可能会产生死锁;降低了吞吐量和CPU的利用率。
如果不希望在迭代期间加锁,可以使用克隆容器的方法,并在克隆副本上进行迭代。
加锁可以防止迭代器抛出ConcurrentModificationException,但是要在所有对容器进行迭代的地方都要加锁。如hashCode,equals,containsAll,removeAll,retainAll等方法,在以容器为参数时,都会对容器进行迭代。这些间接的迭代操作可能抛出ConcurrentModificationException。
Java 5.0提供了多种并发容器类来改进同步容器的性能。同步容器对所有容器状态的访问都串行化,严重降低了并发性;当多个线程竞争锁时,吞吐量严重下降。
并发容器是针对多个线程并发访问设计的。通过并发容器来替代同步容器,可以极大地提高伸缩性并降低风险。并发容器包括ConcurrentHashMap(替代Map),CopyOnWriteArrayList(替代List),ConcurrentLinkedQueue,BlockingQueue等等。
同步容器类在执行每个操作期间都持有一个锁。ConcurrentHashMap采用了不同的加锁策略来提供更高的并发性和伸缩性。它并不是将每个方法都在同一个锁上同步,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁。
分段锁机制使得任意数量的读取线程可以并发访问Map,执行读取操作的线程和执行写入操作的线程可以并发访问Map,并且一定数量的写入线程可以并发地修改Map,因此提高了并发访问的吞吐量。
并发容器增强了同步容器类,它们提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。其迭代器具有弱一致性,可以容忍并发的修改,在创建迭代器时会遍历已有元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。size(),isEmpty()等方法返回的是一个近似值。
由于ConcurrentHashMap与Hashtable和synchronizedMap有更多的优势,因此大多数情况应该使用并发容器类,至于当需要对整个容器加锁进行独占访问时,才应该放弃使用并发容器。
注意,此时不能再通过客户端加锁新建新的原子操作了,客户端只能对并发容器自身加锁,但并发容器内部使用的并不是自身锁。
写入时复制容器,在每次修改时都会加锁并创建和重新发布一个新的容器副本,直接修改容器引用,从而实现可见性。 写操作在一个复制的数组上进行,读操作还是在原始数组中进行,读写分离,互不影响。写操作需要加锁,防止并发写入时导致写入数据丢失。写操作结束之后需要把原始数组指向新的复制数组。
CopyOnWriteArrayList 在写操作的同时允许读操作,大大提高了读操作的性能,因此很适合读多写少的应用场景。 但是 CopyOnWriteArrayList 有其缺陷:
阻塞队列支持生产者-消费者模式。简化了开发过程,消除了生产者和消费者之间的代码依赖性。阻塞队列简化了生产者-消费者设计的实现过程。一种常见的生产者-消费者设计模式就是线程池与工作队列的组合。
阻塞队列提供了四种处理方法:
阻塞队列有多种实现。
Java 6提供了Dqueue和BlockingDeque,是双端队列,实现了在队列头和队列尾的高效插入和移除。双端队列适用于工作密取模式。在工作密取中,每个消费者都有各自的双端队列。如果一个消费者完成了自己的双端队列的全部工作,可以从其他消费者双端队列末尾秘密的获取工作。因为工作者线程不会再单个共享的任务队列上发生竞争。适用于既是生产者又是消费者问题。
线程会阻塞或暂停执行。被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行。当在代码中调用一个可以抛出InterruptedException的方法时,自己的方法就编程了阻塞方法,必须处理中断的响应。如果这个方法被中断,那么它将努力提前结束状态。
处理中断的响应有两种基本选择:
public void run(){ try{ something(); }catch(InterruptedException e){ Thread.currentThread().interrupt(); } } 复制代码
同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。包括阻塞队列,信号量,栅栏以及闭锁。
闭锁用来确保某些活动直到其他活动都完成了才继续执行。如果有多个线程,其中一个线程需要等到其他 所有 线程活动结束后才继续执行,使用闭锁。
CountDownLatch是一种闭锁的实现,可以使得一个或者多个线程等待一组事情发生。包括一个计数器,表示需要等待的事件数量;countDown方法用来递减计数器,表示有一个事件已经发生了;await方法等待计数器为0,表示所有需要等待的事情已经发生。
// 初始化闭锁,并设置资源个数 CountDownLatch latch = new CountDownLatch(2); Thread t1 = new Thread( new Runnable(){ public void run(){ // 加载资源1 加载资源的代码…… // 本资源加载完后,闭锁-1 latch.countDown(); } } ).start(); Thread t2 = new Thread( new Runnable(){ public void run(){ // 加载资源2 资源加载代码…… // 本资源加载完后,闭锁-1 latch.countDown(); } } ).start(); Thread t3 = new Thread( new Runnable(){ public void run(){ // 本线程必须等待所有资源加载完后才能执行 latch.await(); // 当闭锁数量为0时,await返回,执行接下来的任务 任务代码…… } } ).start(); 复制代码
闭锁是一次性对象,一旦进入终止状态,就不能被重置。栅栏类似于闭锁,能阻塞一组进程直到某个时间发生。栅栏与闭锁的区别在于,所有线程必须同时到达栅栏位置,才能继续执行。
若有多条线程,他们到达屏障时将会被阻塞,只有当所有线程都到达屏障时才能打开屏障,所有线程同时执行,若有这样的需求可以使用同步屏障。此外,当屏障打开的同时还能指定执行的任务。
闭锁只会阻塞一条线程,目的是为了让该条任务线程满足条件后执行; 而同步屏障会阻塞所有线程,目的是为了让所有线程同时执行(实际上并不会同时执行,而是尽量把线程启动的时间间隔降为最少)。
// 创建同步屏障对象,并制定需要等待的线程个数 和 打开屏障时需要执行的任务 CyclicBarrier barrier = new CyclicBarrier(3,new Runnable(){ public void run(){ //当所有线程准备完毕后触发此任务 } }); // 启动三条线程 for( int i=0; i<3; i++ ){ new Thread( new Runnable(){ public void run(){ // 等待,(每执行一次barrier.await,同步屏障数量-1,直到为0时,打开屏障) barrier.await(); // 任务 任务代码…… } } ).start(); } 复制代码
信号量用于控制同时访问某个特定资源的操作数量,或者执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。
信号量可以用于实现资源池,也可以用于将容器变为有界阻塞容器。信号量管理着一组虚拟的许可,在执行操作时首先获取许可,并在使用以后释放许可。如果没有许可,将阻塞直到有许可或被中断,超时。
信号量的使用场景是,有m个资源,n个线程,且n>m,同一时刻只能允许m条线程访问资源。
// 创建信号量对象,并给予3个资源 Semaphore semaphore = new Semaphore(3); // 开启10条线程 for ( int i=0; i<10; i++ ) { new Thread( new Runnbale(){ public void run(){ // 获取资源,若此时资源被用光,则阻塞,直到有线程归还资源 semaphore.acquire(); // 任务代码 …… // 释放资源 semaphore.release(); } } ).start(); } 复制代码
可以用作闭锁,是一种可以生成结果的Runnable,可以处于以下三种状态:等待运行,正在运行和运行完成。当FutureTask进入完成状态后,它会停止在这个状态上。
FutureTask在Executor框架中表示异步任务,此外还可以用来表示一些时间较长的运算,这些计算可以在使用计算结构之前启动。
首先,使用HashMap和同步机制来初始化缓存。
public interface Computable<A,V> { V compute(A arg) throws InterruptedException; } public class ExpensiveFunc implements Computable<String,BigInteger> { @Override public BigInteger compute(String arg) throws InterruptedException { return new BigInteger(arg); } } public class Memoizer1<A,V> implements Computable<A,V> { private final Map<A,V> cache=new HashMap<>(); private final Computable<A,V> c; public Memoizer1(Computable<A,V> c){ this.c=c; } @Override public synchronized V compute(A arg) throws InterruptedException { V result=cache.get(arg); if(result==null){ result=c.compute(arg); cache.put(arg,result); } return result; } } 复制代码
在这种实现方法中,使用HashMap保存之前计算的结果。首先检查需要的结果是否已经在缓存中,如果存在则返回之前计算,否则将计算结果缓存到HashMap再返回。
为了确保线程安全,将整个compute方法进行同步。但是这样伸缩性差,缓存的性能并没有得到提升。
下面使用ConcurrentHashMap替换HashMap。但是,这种方法存在一些不足,当两个线程同时调用compute时,可能会导致计算得到相同的值。这样是低效的,因为缓存的作用就是避免相同的数据被计算多次。其问题在于,如果某个线程启动了一个计算,而其他线程并不知道这个计算正在进行,很可能会重复这个计算。
针对如上问题,我们考虑可以使用FutureTask来解决。使用该类来表示计算的过程,如果有结果可用,则返回结果,否则一直阻塞。
public class Memo2 <A,V> implements Computable<A,V>{ private final Map<A,Future<V>> cache=new ConcurrentHashMap<>(); private final Computable<A,V>c; public Memo2(Computable<A,V>c){ this.c=c; } @Override public V compute(A arg) throws InterruptedException { Future<V> future=cache.get(arg); if(future==null){ Callable<V> eval=new Callable<V>() { @Override public V call() throws Exception { return c.compute(arg); } }; FutureTask<V> ft=new FutureTask<>(eval); future=cache.putIfAbsent(arg,ft); if(future==null){ future=ft; ft.run(); } } try{ return future.get(); }catch (ExecutionException e){ e.printStackTrace(); } return null; } } 复制代码