

BlockingQueue的实现类ArrayBlockingQueue,见名思意,是基于数组实现的BlockingQueue。在《BlockingQueue初识》中,对BlockingQueue进行了介绍,BlockingQueue定义和约束了相关方法的行为。学习ArrayBlockingQueue,就是观察其是如何借助数组去实现BlockingQueue中定义和约束的方法的。
ArrayBlockingQueue借助了ReentrantLock与condition来保证线程安全以及实现阻塞等待的功能,关于ReentrantLock的介绍请戳《JUC之ReentrantLock》
█ 字段
在ArrayBlockingQueue中定义了如何几个字段。
final Object[] items;
int takeIndex;
int putIndex;
int count;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient Itrs itrs = null;- items
数组items,用于存在BlockingQueue中的元素。ArrayBlockingQueue借助于数组,就是借助items数组来存放元素。往ArrayBlockingQueue中添加和移出元素就是向items数组中存放和取出元素数据。items数组的大小在初始化的时候便确定了长度,无法扩容和缩容改变,即items数组是一个定长数组。
- takeIndex
一个items数组下标,用于标记下一次该被取出(take, poll, peek 或remove)的元素的位置。
- putIndex
一个items数组下标,用于标记下一次添加(put, offer或 add)的元素该存放的位置。
- count
ArrayBlockingQueue中元素的个数。(注意这个count是items数组的长度,count的大小小于或等于items数组的长度)当count等于0时,表示items数组中还没有任何元素,当count等于items数组的长度则表示数组已满。
- lock、notEmpty、notFull
BlockingQueue的具有阻塞等待的Queue,ArrayBlockingQueue实现阻塞等待的功能借助了ReentrantLock与Condition实现线程间的通信。
- itrs
迭代器,用于遍历ArrayBlockingQueue。
补充:
关于定义takeIndex与putIndex来标记数组中取元素和添加元素的位置,是因为将items数组看成一个环形。关于往往定长数组中存放和取出元素,有两种实现方式。
(1)取出元素始终取0号下标的元素,此时只需要标记下次元素可以存放的位置putIndex即可。

①如上图始终取数组0号下标的元素,第一次取出0号下标的元素aa,则数组变成了如下图这样。此时0号下标的位置就没有元素了。

②要想下次继续从0号位置取到元素,那么数组将数组下标1、2、3的位置的元素都像前移动,则变成:

可见此种方式,每次取出元素之后,都要移动元素位置,进行数组拷贝。但是其有个方便的地方,就是当putIndex等于数组长度-1的时候,即表示数组已经满了。
(2)增加一个takeIndex来标记下次取出元素的位置。
记录下一次取出元素的位置,而不是每次都从0号下标的位置来取元素,这样可以避免数组的移动,即数组拷贝的消耗。

①取出元素aa

②添加元素ee

此方式不用数组频繁的进行数据的拷贝。但是判断数组满的条件就负责了,不能仅仅依靠putIndex等于数组的长度-1条件来判断了。如上图,putIndex=5即等于了数组的长度-1了,但此时数组下标为0的地方还没有存放元素,数组还没有满。关于这种方式如何判断数组为满的条件,在ArrayBlockingQueue中维护一个count字段来记录元素的个数,当count等于items数组的长度时,即表明数组已经满了无法继续添加元素。对于上图的这种情况,当再添加元素的时候,0号下标可以继续存放元素,此时putIndex=0了。想想这样的过程是不是将数组看成了环形。
ArrayBlockingQueue为了考虑效率,避免频繁的进行数组元素的移动和拷贝,所以其使用了第二种方式,定义了takeIndex和putIndex。
█ 方法
介绍ArrayBlockingQueue中的方法,分为Queue接口中定义和约束的方法,和
BlockingQueue中定义和约束的方法。
(1)Queue中定义的方法,不具有阻塞等待的行为。
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();- offer
往ArrayBlockingQueue中添加元素。当items数组满无法继续添加元素时,方法返回false,添加元素成功返回true。
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}使用ReentrantLock互斥锁保证线程安全。当count等于items数组的长度时,表示数组已经满了,无法继续添加元素了,方法返回false。count小于数组的长度,则表示数组没满,可以添加元素。enqueue向items数组中添加元素。
enqueue:
private void enqueue(E x) {
final Object[] items = this.items;
// putIndex标记了可以存放元素的数组下标
items[putIndex] = x;
// 将putIndex自增1。
// 判断条件即putIndex+1=items.length
if (++putIndex == items.length)
// putIndex自增1之前已经位于数组的最后一个下标了,因为是定长数组,所以
// 下一次可以存放元素的位置回到了数组的开头,即下标为0的位置。
putIndex = 0;
// 将元素的个数加1
count++;
// 唤醒此时在等待取元素的线程
notEmpty.signal();
}- add
向ArrayBlockingQueue中添加元素,如果items数组已经满了,无法继续添加元素,则抛出IllegalStateException异常。
public boolean add(E e) {
return super.add(e);
}super.add即调用了AbstractQueue的add方法:
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}add方法又调用了offer方法。offer当数组满了无法继续添加元素的时候,会返回false,对于add方法则会抛出IllegalStateException("Queue full")异常。
- poll
从队列中取出元素,当队列中没有元素可以取出的时候,返回null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}count == 0成立时,则表示items数组中还可以元素,返回null。否则取出数组中的第一个元素。
dequeue:
private E dequeue() {
final Object[] items = this.items;
// takeIndex标记了可以被取出的元素的位置
E x = (E) items[takeIndex];
items[takeIndex] = null;
// 超过了数组的尾部,要回到数组的开头
if (++takeIndex == items.length)
takeIndex = 0;
// 数量减1
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒等待添加元素的线程
notFull.signal();
return x;
}- remove
取元素,当items数组中没有元素可以取出的时候,抛出NoSuchElementException异常。ArrayBlockingQueue没有重写remove方法,而是直接使用了AbstractQueue中的remove方法:
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}- peek
取出数组中第一个元素,即takeInde下标位置的元素。当items数组中没有元素的时候,返回null。
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}itemAt:
final E itemAt(int i) {
return (E) items[i];
}- element
取出数组中第一个元素,当数组为空取出到元素的时候,抛出NoSuchElementException异常。element也是在AbstractQueue实现的方法:
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}(2)BlockingQueue中定义的方法,具有阻塞等待的行为。
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException; - put
向items数组中添加元素,如果数组满了无法继续添加,则会一直阻塞等待,直到数组有了空闲位置插入元素成功。
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}count == items.length成立,则表明数组已满,无法继续添加元素,则调用notFull.await(),当前线程阻塞等待,直到有其他线程调用了notFull.signal方法唤醒该线程。前面的介绍中,在dequeue中成功取出一个元素之后,会调用notFull.signal。添加元素仍然是调用enqueue。
- offer
与put不同,offer方法提供了超时等待。在一定时间内线程会被唤醒,继续尝试添加元素。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}借助了Condition的awaitNanos来实现超时等待的功能,并使用了while自旋。
- take
取元素,当items数组中没有元素可以取出时,会一直阻塞等待,直到数组中有元素可以被取出。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}count == 0成立,则表示items数组为空,没有任何元素。notEmpty.await使当前线程进入阻塞等待,直到notEmpty.signal被调用。在enqueue方法中,当成功添加了一个元素后,会调用notEmpty.signal。
- poll
具有超时等待的取元素。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}补充:
在ArrayBlockingQueue的一些方法中使用的是同一个ReentrantLock对象,因此在ReentrantLock的作用下,ArrayBlockingQueue所有的这些方法同时只能被一个线程访问。即一个线程访问了take方法,并执行到了lock.lockInterruptibly获取锁成功,在其还没有调用lock.unlock()释放锁之前,当其他线程在同时去访问poll方法的时候,执行到lock.lockInterruptibly()会阻塞等待,因为此时无法获取到锁。
█ 构造器
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}ArrayBlockingQueue提供了三个构造器来创建对象。在构造器中初始化items数组的大小,来设置ReentrantLock锁的性质,是公平锁还是非公平锁。
本文介绍了ArrayBlockingQueue中的主要几个方法,其他方法可以查看JDK源码学习。