转载

Java并发阻塞队列之ArrayBlockingQueue

JUC简介

在 Java 5.0 提供了java.util.concurrent(简称JUC)包,在此包中增加了在并发编程中很常用的工具类,用于定义类似于线程的自定义子系统,包括线程池,异步IO和轻量级任务框架;还提供了设计用于多线程上下文中的Collection实现等;

今天要讲的ArrayBlockingQueue便是JUC包下的一个工具类。

ArrayBlockingQueue简介

ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。线程安全是指类内部通过“互斥锁”保护竞争资源,实现多线程对竞争资源的互斥访问。

“有界”则是指ArrayBlockingQueue对应的数组是有界限且固定的,在创建对象时由构造函数指定,一旦指定则无法更改。

阻塞队列,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待;

ArrayBlockingQueue是按FIFO(先进先出)原则对元素进行排序,元素都是从尾部插入到队列,从头部开始返回。

ArrayBlockingQueue函数列表

// 创建一个带有给定的(固定)容量和默认访问策略的ArrayBlockingQueue。
ArrayBlockingQueue(int capacity)
// 创建一个具有给定的(固定)容量和指定访问策略的ArrayBlockingQueue。
ArrayBlockingQueue(int capacity, boolean fair)
// 创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue,它最初包含给定collection的元素,并以collection迭代器的遍历顺序添加元素。
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)

// 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回true,如果此队列已满,则抛出IllegalStateException。
boolean add(E e)
// 自动移除此队列中的所有元素。
void clear()
// 如果此队列包含指定的元素,则返回true。
boolean contains(Object o)
// 移除此队列中所有可用的元素,并将它们添加到给定collection中。
int drainTo(Collection<? super E> c)
// 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection中。
int drainTo(Collection<? super E> c, int maxElements)
// 返回在此队列中的元素上按适当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回false。
boolean offer(E e)
// 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。
boolean offer(E e, long timeout, TimeUnit unit)
// 获取但不移除此队列的头;如果此队列为空,则返回null。
E peek()
// 获取并移除此队列的头,如果此队列为空,则返回null。
E poll()
// 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
E poll(long timeout, TimeUnit unit)
// 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。
void put(E e)
// 返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的其他元素数量。
int remainingCapacity()
// 从此队列中移除指定元素的单个实例(如果存在)。
boolean remove(Object o)
// 返回此队列中元素的数量。
int size()
// 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。
E take()
// 返回一个按适当顺序包含此队列中所有元素的数组。
Object[] toArray()
// 返回一个按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)
// 返回此 collection 的字符串表示形式。
String toString()

源代码分析

构造函数

ArrayBlockingQueue提供了三个构造函数。

public ArrayBlockingQueue(int capacity) {
        this(capacity, false);
    }

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
    public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

其中,第一个构造函数只需指定队列(数组)初始化大小,这正是前面提到的“有界”的边界所在。同时,它调用了第二个构造函数,默认将fair参数传值为false。

fair是“可重入的独占锁(ReentrantLock)”的类型。fair为true,表示是公平锁;fair为false,表示是非公平锁。公平性通常会降低吞吐量,但是减少了可变性和避免了“不平衡性”。

构造函数中this.items对应的代码为:

/** The queued items */
    final Object[] items;

这是存储阻塞队列数据的数组。在第三个构造函数中提供了初始化队列数组中数据的方法。

加入队列

ArrayBlockingQueue提供了4个方法将元素添加入队列。

  • add(E e) :如果立即可行且不会超过该队列的容量,将指定的元素插入到队列的尾部。成功返回true,队列已满则抛出IllegalStateException("Queue full")异常。

  • offer(E e) :如果立即可行且不会超过该队列的容量,将指定的元素插入到此队列的尾部。成功返回true,队列已满则返回false。

  • offer(E e, long timeout, TimeUnit unit) :将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间

  • put(E e) :将指定的元素插入此队列的尾部,如果队列已满则等待可用的空间。成功则返回true,等待超时则返回false。

源代码说明:

// 方法一
public boolean add(E e) {
    return super.add(e);
}
// 方法二
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
// 方法三
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();  // 一直等到获取锁
    try {
        if (count == items.length)  //假如当前容纳的元素个数已经等于数组长度,那么返回false
            return false;
        else {
            enqueue(e);		// 将元素插入到队列中,返回true
            return true;
        }
    } finally {
        lock.unlock();		//释放锁
    }
}

add方法调用了父类的add方法(方法二),通过父类的add方法可以得出,最终还是调用了offer方法(方法三)。可以看出,父类方法调用offer之后,如果offer返回false,则表示队列已满,父类方法会抛出异常。

而offer方法首先校验添加的对象是否为null,如果null则直接抛出空指针异常。然后获得锁进行队列大小(count记录了队列中元素的个数)比较,如果当前队列中的元素个数与count相等,则返回false,不进行插入。否则,将元素插入队列,并返回true。

下面再看一下offer中调用的enqueue方法:

private void enqueue(E x) {	

    //调用enqueue的方法都都已经进行过同步处理
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;		//putIndex是下一个被添加元素的坐标
    if (++putIndex == items.length)	
    //putIndex+1, 并且与数组长度进行比较,相同则从数组开头
        putIndex = 0;			//插入元素,这就是循环数组的奥秘了
    count++;				//当前元素总量+1
    notEmpty.signal();			//给等到在数组非空的线程一个信号,唤醒他们。
}

enqueue方法才是所有添加队列方法真正调用来操作添加的方法,其中putIndex是下一个被添加元素的坐标。整个方法的业务逻辑是这样的:首先将待添加的元素添加到putIndex所在的位置,并且对putIndex进行自增(指向下一个待添加的位置)。然后比较下一个待添加的位置是否和数组的长度相同,如果相同则将putIndex指向数组开头(进入此方法的前提条件是队列数组未满)。然后队列总量加1。

通过这段代码我们就可以真正了解到ArrayBlockingQueue是如何循环使用数组的。首先创建一个定长空数组,然后依次填满数组的0,1,2,……,items.length-1 位置。与此同时,队列中的0,1,2,……位置的元素也在不停的被消费掉。当数组的items.length-1也被填充了元素,次数队列依旧未满,那么新增的元素将放置在哪里?对了,就是像上面的代码一样,会从数组的0坐标重新依次开始添加新的元素。通过这种方式,ArrayBlockingQueue实现了在定长数组下FIFO的队列。

取出队列

ArrayBlockingQueue提供了以下方法支持取出队列:

  • poll() :获取并移除此队列的头,如果队列为空,则返回null。

  • poll(long timeout, TimeUnit unit) :获取并移除此队列的头部,在指定的等待时间前等待可用的元素,超时则返回null。

  • remove(Object o) :从此队列中移除指定元素的单个实例(如果存在多个则只移除第一个)。如果不存在要移除的元素则返回false。

  • take() :获取并移除此队列的头部,如果队列为空,则一直等待可用元素,也就是说必须要拿到一个元素,除非线程中断。

  • peek():获取队列中takeIndex(待获取元素索引)位置的元素,如果为null则返回空。

源代码:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}

下面重点分析一下取出队列原因调用的dequeue方法:

/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    // 取出指定元素
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    // 并将取出元素索引内容置为null
    items[takeIndex] = null;
    // 将待取出索引+1,并与队列长度做比较,如果超出数组长度则从0重新开始
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    // 如果迭代器不为null,则进行迭代处理
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

在enqueue中解了添加元素进入队列的操作之后就不难理解从队列中取出数据的过程了。首先调用dequeue的取出操作,都会先将元素取出,然后再将数组对应位置置null。然后对takeIndex的位置进行后移1位,如果takeIndex处于数组的最后一位,则重新从0开始。

实战

在学习了基础理论知识之后,我们用一个实例来练习一下。

package com.secbro2.juc;

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

/**
 * @author zzs
 */
public class ArrayBlockingQueueDemo {

	private static Queue<String> queue = new ArrayBlockingQueue<String>(20);

	public static void main(String[] args) {
		new QueueThread("QTA").start();
		new QueueThread("QTB").start();
	}

	private static class QueueThread extends Thread {
		QueueThread(String name) {
			super(name);
		}

		@Override
		public void run() {
			for (int i = 0; i < 6; i++) {
				// 线程名称+序号
				String str = Thread.currentThread().getName() + "-" + i;
				queue.add(str);
				printQueue();
			}

		}
	}

	private static void printQueue() {
		StringBuilder sb = new StringBuilder();
		for (Object aQueue : queue) {
			sb.append(aQueue).append(",");
		}

		System.out.println(sb.toString());
	}
}

打印结果如下:

QTA-0,QTB-0,
QTA-0,QTB-0,
QTA-0,QTB-0,QTA-1,
QTA-0,QTB-0,QTA-1,QTB-1,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,QTA-4,QTB-4,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,QTA-4,QTB-4,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,QTA-4,QTB-4,QTB-5,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,QTA-4,QTB-4,QTB-5,QTA-5,
原文  http://www.choupangxia.com/topic/detail/30
正文到此结束
Loading...