转载

Java并发编程实战笔记3:基础构建模块

在上文已经说明,委托是构造线程安全类的一个最有效策略,也就是让现有的线程安全类管理所有的状态即可。以下将介绍这些基础构建模块。

同步容器类

同步容器类包括Vector和Hashtable以及由 Collections.synchronizedXxx 等工厂方法创建的同步封装器类。这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。同步容器对所有容器状态的访问都串行化,严重降低了并发性;当多个线程竞争锁时,吞吐量严重下降。

同步容器类存在的问题

同步容器类都是线程安全的,但是在某些情况下可能需要额外的客户端加锁来保护复合操作。

比如,在Vecotr中,getLast()和deleteLast()操作,如果是在多线程的环境下运行,如果不加锁,会产生异常情况。一个线程在getLast()后,另一个线程deleteLast(),然后该线程继续执行,进行deleteLast()操作,此时会抛出下标越界的异常。

又比如,在迭代的过程中,使用get(index)的操作,如果有多个线程运行,可能会删除其中元素,同样会造成异常。

对于如上的情况,我们需要通过客户端加锁来解决线程安全的问题。如在迭代时加锁:

synchronized(vector){
    for(int i=0;i<vector.size();i++){
        vector.get(i);
    }
}
复制代码

迭代器

在迭代或者for-each循环语法时,对容器类进行迭代的标准方式都是使用Iterator。然而,在设计同步容器类的迭代器时并没有考虑到并发修改的问题,并且它们表现出的行为时“及时失败”的,也就是当它们发现容器在迭代过程中被修改时,就会抛出ConcurrentModificationException。

如果在迭代期间,对容器加锁,首先会降低效率,提高线程的等待时间;然后还可能会产生死锁;降低了吞吐量和CPU的利用率。

如果不希望在迭代期间加锁,可以使用克隆容器的方法,并在克隆副本上进行迭代。

加锁可以防止迭代器抛出ConcurrentModificationException,但是要在所有对容器进行迭代的地方都要加锁。如hashCode,equals,containsAll,removeAll,retainAll等方法,在以容器为参数时,都会对容器进行迭代。这些间接的迭代操作可能抛出ConcurrentModificationException。

并发容器

Java 5.0提供了多种并发容器类来改进同步容器的性能。同步容器对所有容器状态的访问都串行化,严重降低了并发性;当多个线程竞争锁时,吞吐量严重下降。

并发容器是针对多个线程并发访问设计的。通过并发容器来替代同步容器,可以极大地提高伸缩性并降低风险。并发容器包括ConcurrentHashMap(替代Map),CopyOnWriteArrayList(替代List),ConcurrentLinkedQueue,BlockingQueue等等。

ConcurrentHashMap

同步容器类在执行每个操作期间都持有一个锁。ConcurrentHashMap采用了不同的加锁策略来提供更高的并发性和伸缩性。它并不是将每个方法都在同一个锁上同步,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这种机制称为分段锁。

分段锁机制使得任意数量的读取线程可以并发访问Map,执行读取操作的线程和执行写入操作的线程可以并发访问Map,并且一定数量的写入线程可以并发地修改Map,因此提高了并发访问的吞吐量。

并发容器增强了同步容器类,它们提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。其迭代器具有弱一致性,可以容忍并发的修改,在创建迭代器时会遍历已有元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。size(),isEmpty()等方法返回的是一个近似值。

由于ConcurrentHashMap与Hashtable和synchronizedMap有更多的优势,因此大多数情况应该使用并发容器类,至于当需要对整个容器加锁进行独占访问时,才应该放弃使用并发容器。

注意,此时不能再通过客户端加锁新建新的原子操作了,客户端只能对并发容器自身加锁,但并发容器内部使用的并不是自身锁。

CopyOnWriteArrayList

写入时复制容器,在每次修改时都会加锁并创建和重新发布一个新的容器副本,直接修改容器引用,从而实现可见性。 写操作在一个复制的数组上进行,读操作还是在原始数组中进行,读写分离,互不影响。写操作需要加锁,防止并发写入时导致写入数据丢失。写操作结束之后需要把原始数组指向新的复制数组。

CopyOnWriteArrayList 在写操作的同时允许读操作,大大提高了读操作的性能,因此很适合读多写少的应用场景。 但是 CopyOnWriteArrayList 有其缺陷:

  • 内存占用:在写操作时需要复制一个新的数组,使得内存占用为原来的两倍左右;
  • 数据不一致:读操作不能读取实时性的数据,因为部分写操作的数据还未同步到读数组中。

阻塞队列

阻塞队列支持生产者-消费者模式。简化了开发过程,消除了生产者和消费者之间的代码依赖性。阻塞队列简化了生产者-消费者设计的实现过程。一种常见的生产者-消费者设计模式就是线程池与工作队列的组合。

阻塞队列提供了四种处理方法:

  1. 抛出异常,使用add(e)插入,remove()删除,element()查询。当阻塞队列满时,插入元素;当队列空,删除元素都会抛出异常。
  2. 返回特殊值,使用offer(e)插入,poll()删除,peek()查询。插入时,如果成功返回true,移除时,如果没有对应的元素返回null。
  3. 阻塞,使用put(e)插入,take()删除。队列满,插入元素时会阻塞;队列空,取元素会阻塞。
  4. 超时退出:使用offer(e,time,unit)插入,poll(time,unit)删除。当队列满时,会阻塞,超过一定的时间,线程会退出。

阻塞队列有多种实现。

  • ArrayBlokcingQueue和LinkedBlockingQueue分别是数组和链表结构组成的有界的FIFO阻塞队列。
  • PriorityBlockingQueue是一个支持优先级排序的无界阻塞队列。
  • SynchronousQueue是一个不存储元素的阻塞队列,它不会为队列中元素维护存储空间。
  • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

双端队列与工作密取

Java 6提供了Dqueue和BlockingDeque,是双端队列,实现了在队列头和队列尾的高效插入和移除。双端队列适用于工作密取模式。在工作密取中,每个消费者都有各自的双端队列。如果一个消费者完成了自己的双端队列的全部工作,可以从其他消费者双端队列末尾秘密的获取工作。因为工作者线程不会再单个共享的任务队列上发生竞争。适用于既是生产者又是消费者问题。

阻塞方法与中断方法

线程会阻塞或暂停执行。被阻塞的线程必须等待某个不受它控制的事件发生后才能继续执行。当在代码中调用一个可以抛出InterruptedException的方法时,自己的方法就编程了阻塞方法,必须处理中断的响应。如果这个方法被中断,那么它将努力提前结束状态。

处理中断的响应有两种基本选择:

  1. 传递InterruptedException,把该异常抛出给方法的调用者。
  2. 恢复中断,捕获异常,并调用当前线程的interrupt方法恢复中断,引发更高层的代码中断。
public void run(){
    try{
        something();
    }catch(InterruptedException e){
        Thread.currentThread().interrupt();
    }
}
复制代码

同步工具类

同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。包括阻塞队列,信号量,栅栏以及闭锁。

闭锁

闭锁用来确保某些活动直到其他活动都完成了才继续执行。如果有多个线程,其中一个线程需要等到其他 所有 线程活动结束后才继续执行,使用闭锁。

CountDownLatch是一种闭锁的实现,可以使得一个或者多个线程等待一组事情发生。包括一个计数器,表示需要等待的事件数量;countDown方法用来递减计数器,表示有一个事件已经发生了;await方法等待计数器为0,表示所有需要等待的事情已经发生。

// 初始化闭锁,并设置资源个数
CountDownLatch latch = new CountDownLatch(2);

Thread t1 = new Thread( new Runnable(){
    public void run(){
        // 加载资源1
        加载资源的代码……
        // 本资源加载完后,闭锁-1
        latch.countDown();
    }
} ).start();

Thread t2 = new Thread( new Runnable(){
    public void run(){
        // 加载资源2
        资源加载代码……
        // 本资源加载完后,闭锁-1
        latch.countDown();
    }
} ).start();

Thread t3 = new Thread( new Runnable(){
    public void run(){
        // 本线程必须等待所有资源加载完后才能执行
        latch.await();
        // 当闭锁数量为0时,await返回,执行接下来的任务
        任务代码……
    }
} ).start();
复制代码

栅栏(同步屏障)

闭锁是一次性对象,一旦进入终止状态,就不能被重置。栅栏类似于闭锁,能阻塞一组进程直到某个时间发生。栅栏与闭锁的区别在于,所有线程必须同时到达栅栏位置,才能继续执行。

若有多条线程,他们到达屏障时将会被阻塞,只有当所有线程都到达屏障时才能打开屏障,所有线程同时执行,若有这样的需求可以使用同步屏障。此外,当屏障打开的同时还能指定执行的任务。

闭锁只会阻塞一条线程,目的是为了让该条任务线程满足条件后执行; 而同步屏障会阻塞所有线程,目的是为了让所有线程同时执行(实际上并不会同时执行,而是尽量把线程启动的时间间隔降为最少)。

// 创建同步屏障对象,并制定需要等待的线程个数 和 打开屏障时需要执行的任务
CyclicBarrier barrier = new CyclicBarrier(3,new Runnable(){
    public void run(){
        //当所有线程准备完毕后触发此任务
    }
});

// 启动三条线程
for( int i=0; i<3; i++ ){
    new Thread( new Runnable(){
        public void run(){
            // 等待,(每执行一次barrier.await,同步屏障数量-1,直到为0时,打开屏障)
            barrier.await();
            // 任务
            任务代码……
        }
    } ).start();
}
复制代码

信号量

信号量用于控制同时访问某个特定资源的操作数量,或者执行某个指定操作的数量。计数信号量还可以用来实现某种资源池,或者对容器施加边界。

信号量可以用于实现资源池,也可以用于将容器变为有界阻塞容器。信号量管理着一组虚拟的许可,在执行操作时首先获取许可,并在使用以后释放许可。如果没有许可,将阻塞直到有许可或被中断,超时。

信号量的使用场景是,有m个资源,n个线程,且n>m,同一时刻只能允许m条线程访问资源。

// 创建信号量对象,并给予3个资源
Semaphore semaphore = new Semaphore(3);

// 开启10条线程
for ( int i=0; i<10; i++ ) {
    new Thread( new Runnbale(){
        public void run(){
            // 获取资源,若此时资源被用光,则阻塞,直到有线程归还资源
            semaphore.acquire();
            // 任务代码
            ……
            // 释放资源
            semaphore.release();
        }
    } ).start();
}
复制代码

FutureTask

可以用作闭锁,是一种可以生成结果的Runnable,可以处于以下三种状态:等待运行,正在运行和运行完成。当FutureTask进入完成状态后,它会停止在这个状态上。

FutureTask在Executor框架中表示异步任务,此外还可以用来表示一些时间较长的运算,这些计算可以在使用计算结构之前启动。

实战:构建缓存

首先,使用HashMap和同步机制来初始化缓存。

public interface Computable<A,V> {
    V compute(A arg) throws InterruptedException;
}
public class ExpensiveFunc implements Computable<String,BigInteger> {

    @Override
    public BigInteger compute(String arg) throws InterruptedException {
        return new BigInteger(arg);
    }
}
public class Memoizer1<A,V> implements Computable<A,V> {
    private final Map<A,V> cache=new HashMap<>();
    private final Computable<A,V> c;

    public Memoizer1(Computable<A,V> c){
        this.c=c;
    }

    @Override
    public synchronized V compute(A arg) throws InterruptedException {
        V result=cache.get(arg);
        if(result==null){
            result=c.compute(arg);
            cache.put(arg,result);
        }
        return result;
    }
}
复制代码

在这种实现方法中,使用HashMap保存之前计算的结果。首先检查需要的结果是否已经在缓存中,如果存在则返回之前计算,否则将计算结果缓存到HashMap再返回。

为了确保线程安全,将整个compute方法进行同步。但是这样伸缩性差,缓存的性能并没有得到提升。

下面使用ConcurrentHashMap替换HashMap。但是,这种方法存在一些不足,当两个线程同时调用compute时,可能会导致计算得到相同的值。这样是低效的,因为缓存的作用就是避免相同的数据被计算多次。其问题在于,如果某个线程启动了一个计算,而其他线程并不知道这个计算正在进行,很可能会重复这个计算。

针对如上问题,我们考虑可以使用FutureTask来解决。使用该类来表示计算的过程,如果有结果可用,则返回结果,否则一直阻塞。

public class Memo2 <A,V> implements Computable<A,V>{
    private final Map<A,Future<V>> cache=new ConcurrentHashMap<>();
    private final Computable<A,V>c;
    public Memo2(Computable<A,V>c){
        this.c=c;
    }

    @Override
    public V compute(A arg) throws InterruptedException {
        Future<V> future=cache.get(arg);
        if(future==null){
            Callable<V> eval=new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return c.compute(arg);
                }
            };
            FutureTask<V> ft=new FutureTask<>(eval);
            future=cache.putIfAbsent(arg,ft);
            if(future==null){
                future=ft;
                ft.run();
            }
        }
        try{
            return future.get();
        }catch (ExecutionException e){
            e.printStackTrace();
        }
        return null;
    }

}
复制代码
原文  https://juejin.im/post/5c19aa876fb9a049fe3516f0
正文到此结束
Loading...