阻塞队列是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法:
阻塞队列常用于生产者消费者的场景。其中生产者是向队列添加元素的线程,消费者是从队列取出元素的线程,阻塞队列是存放和获取元素的容器。
阻塞队列的4种处理方式:
add(e) remove() element()
offer(e) poll() peek()
put(e) take()
offer(e, time, unit) poll(time, unit)
ArrayBlockingQueue 是一个用数组实现的有界的,按照 FIFO 原则对元素排序的阻塞队列。它还支持对等待的生产者和消费者线程进行排序时的可选公平策略,默认情况下不保证线程公平的访问,在构造时可以选择公平策略。公平性会降低吞吐量,但是减少了可变性和避免了“不平衡性”。
这是一个用链表实现的有界阻塞队列,默认长度和最大长度都是 Integer.MAX_VALUE 。该队列也是按照 FIFO 原则对元素排序,确定线程执行的先后顺序。
这是一个支持优先级的无界祖苏队列,默认情况下采取自然顺序升序排序,也可以通过构造函数指定 Comparator 来对元素进行排序。但是它不能保证相同优先级元素的顺序。
底层是采用二叉最大堆来实现优先级排序的。
这是一个支持延时获取元素的无界阻塞队列,其队列使用优先队列 PriorityQueue 实现。队列中的元素必须实现 Delayed 接口,创建元素时可以指定多久之后才能从队列中获取该元素,只有在元素到期时才能获取。
主要用于缓存,如清除缓冲中超时的数据。还用于定时任务的调度。
元素创建时,要实现 Delayed 接口,首先进行初始化;然后实现 getDelay(Timeunit unit)
方法,返回的值是当前元素还需要延时多长时间;最后实现 compareTo(Delayed other)
方法,用来指定元素的顺序。
当消费者从队列中获取元素时,如果元素还没有到延时时间,就阻塞当前线程。此外,设置了 leader 变量表示等待获取队列头部元素的线程。如果 leader 不为空,表示有现成等待获取队列头部元素,使用 await() 方法让当前线程等待信号。如果 leader 为空,则把当前线程设置为 leader,使用 awaitNanos() 方法让当前线程等待接收信号或等待 delay 时间。
与其他阻塞队列不同,这是一个不存储元素的阻塞队列,每一个 put 操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。分为公平和不公平访问队列,默认情况采用非公平性策略访问队列。
该种队列本身不存储任何元素,适合传递性场景,把生产者线程处理的数据直接传递给消费者线程,其吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。
这是一个由链表结构组成的 FIFO 的无界阻塞 TransferQueue 队列。它采取一种预占模式,也就是有就直接拿走,没有就占着这个位置直到拿到、超时或中断。相对于其他阻塞队列,多了 tryTransfer 方法和 transfer 方法。
transfer(e,[timeout,unit]) tryTransfer(e,[timeout,unit])
是一个由链表组成的双向阻塞队列。可以从队列两端插入和移除元素。
该框架主要应用在并行计算中,把一个大人物分割成若干个小任务,最终汇总每个小任务结果后得到大结果的框架。Fork 就是把一个大任务切分成若干子任务并行的执行,Join 就是合并这些子任务的执行结果,最终得到这个大任务的结果。
工作窃取是指某个线程从其他队列里窃取任务来执行。通常使用双端队列,被窃取任务线程永远从双端队列头部拿任务执行,窃取任务的线程永远从双端队列尾部拿任务执行。
优点是充分利用线程进行并行计算,减少了线程间的竞争。缺点是在某些情况下存在竞争,比如队列只有一个任务时,会消耗更多的资源。
首先,分割任务,将一个大任务分割成子任务,不停分割直到分割出的子任务足够小。
然后,执行任务并合并结果。分割的子任务分别放在双端队列,然后几个启动线程分别从双端队列获取任务执行。执行结果放在一个队列里,启动一个线程从队列拿数据,然后合并这些线程。
public class ForkJoinCase extends RecursiveTask<Integer> { private final int threshold=5; private int first; private int last; public ForkJoinCase(int first,int last){ this.first=first; this.last=last; } @Override protected Integer compute() { int ret=0; if(last-first<=threshold){//任务足够小,执行 for(int i=first;i<=last;i++){ ret+=i; } }else{//分解任务 int mid=first+(last-first)/2; ForkJoinCase leftTask=new ForkJoinCase(first,mid); ForkJoinCase rightTask=new ForkJoinCase(mid+1,last); //执行子任务 leftTask.fork(); rightTask.fork(); //合并子任务结果 ret=leftTask.join()+rightTask.join(); } return ret; } } 复制代码