阻塞队列(BlockingQueue)
什么是阻塞队列?
阻塞队列 是一种数据结构,它是一个队列,可以存放0到N个元素。我们可以对这个队列执行插入或弹出元素操作,弹出元素操作就是获取队列中的第一个元素,并且将其从队列中移除;而插入操作就是将元素添加到队列的末尾。当队列中没有元素时,对这个队列的弹出操作将会被阻塞,直到有元素被插入时才会被唤醒;当队列已满时,对这个队列的插入操作就会被阻塞,直到有元素被弹出后才会被唤醒。
在线程池中,往往就会用阻塞队列来保存那些暂时没有空闲线程可以直接执行的任务,等到线程空闲之后再从阻塞队列中弹出任务来执行。一旦队列为空,那么线程就会被阻塞,直到有新任务被插入为止。
在多线程中,阻塞的意思是,在某些情况下会挂起线程,一旦条件成熟,被阻塞的线程就会被自动唤醒。
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
在阻塞队列不可用时,这两个附加操作提供了4种处理方式

**抛出异常:**是指当阻塞队列满时候,再往队列里插入元素,会抛出 IllegalStateException(“Queue full”) 异常。当队列为空时,从队列里获取元素时会抛出 NoSuchElementException 异常 。
**返回特殊值:**插入方法会返回是否成功,成功则返回 true。移除方法,则是从队列里拿出一个元素,如果没有则返回 null
**一直阻塞:**当阻塞队列满时,如果生产者线程往队列里 put 元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里 take 元素,队列也会阻塞消费者线程,直到队列可用。
**超时退出:**当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永远不会被阻塞,而且使用offer方法时,该方法永远返回true
JDK7 提供了 7 个阻塞队列
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列(默认值是Integer.MAX_VALUE)。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
DelayQueue:一个使用优先级队列实现的无界阻塞队列。
SynchronousQueue:一个不存储元素的阻塞队列 , 单个元素的队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
ArrayBlockingQueue 与LinkedBlockingQueue
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
LinkedBlockingQueuefairQueue = new LinkedBlockingQueue(1000,true);
ArrayBlockingQueue 和LinkedBlockingQueue区别:
底层实现不同
ArrayBlockingQueue 底层使用数组来维护队列,
LinkedBlockingQueue 底层使用链表来维护队列,在添加和删除队列中的元素的时候,会创建和销毁节点对象,在高并发和大量数据的时候,GC压力很大。
锁的方式不同
ArrayBlockingQueue 获取数据和添加数据都是使用同一个锁对象,这样添加和获取就不是一个并发的过程,不过,在ArrayBlockingQueue 中使用Condition的等待/通知机制,这样使得ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜
LinkedBlockingQueue 获取数据和添加数据使用不同的锁对象。
SynchronousQueue
SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。使用以下构造方法可以创建公平性访问的SynchronousQueue,如果设置为true,则等待的线程会采用先进先出的顺序访问队列
SynchronousQueue<E> queue = new SynchronousQueue<E>(true);
**SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。**队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
为什么使用堵塞队列?
在多线程领域,所谓堵塞就是在某些情况下挂起线程(堵塞),一旦条件满足,被挂起的线程又会自动被唤醒。
为什么使用BlockingQueue
好处是我们不需要关系什么时候需要堵塞线程,什么时候需要唤醒线程,因为这一切都没BlockingQueue包办了,在JUC包发布之前,在多线程环境下,我们需要手动去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度。
堵塞队列版生产消费模式:
public class ProdAndCons {
public static void main(String[] args) {
//设置长度
MyResource myResource = new MyResource(new ArrayBlockingQueue<String>(10));
new Thread(() ->{
try {
myResource.prod();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"prod").start();
new Thread(() ->{
try {
myResource.cons();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"cons").start();
//5s后主线程叫停任务
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("5s 停止任务");
myResource.stop();
}
}
//资源类
class MyResource{
//设置标志位,volatile保证其可见性,可以随时关停生产消费模式,默认为true
private volatile boolean FLAG = true;
//多线程中i++,++i是不安全的,因此使用原子类进行操作
private AtomicInteger atomicInteger = new AtomicInteger();
//设置堵塞队列,产品进入堵塞队列,参数为接口,即支持7种堵塞队列
BlockingQueue <String> blockingQueue = null;
public MyResource(BlockingQueue <String> blockingQueue){
this.blockingQueue = blockingQueue;
}
//生产者模式 , 生产一个休眠1s 使之生产一个消费一个
public void prod() throws InterruptedException {
//生产的数据
String data;
//判断是否插入成功
boolean result ;
//首先判断是否为启动状态
while (FLAG){
//设置生产的数据
data = atomicInteger.getAndIncrement() + "号数据";
result = blockingQueue.offer(data);
if(result){
System.out.println("添加成功!数据为: "+data);
}else{
System.out.println("已满");
return;
}
// 休眠1s 生产一个 消费一个
Thread.sleep(1000);
}
System.out.println("生产模式手动关闭");
}
//生产者模式 ,2s后收不到信息后,结束
public void cons() throws InterruptedException {
//生产的数据
String data ;
//判断是否插入成功
boolean result ;
//首先判断是否为启动状态
while (FLAG){
//取数据,设置超时时间为2L
data = blockingQueue.poll(2, TimeUnit.SECONDS);
if(data == null ){
System.out.println("2S后没有消费数据,退出");
//退出指 生产消费全部停止
FLAG = false;
return;
}else{
System.out.println("数据消费完成:!"+data);
}
}
System.out.println("2s内没有获取到数据,消费模式关闭");
}
//停止方法
public void stop(){
FLAG = false;
}
手撕一个堵塞队列,基于LinkedList实现:
底层维护一个Linked
private LinkedList<E> queue = new LinkedList<E>();Linked提供了addLast ,remoceFirst方法。如果阻塞队列内部的容器元素大于或者等于maxSize,此时等待消费者将元素。
如果内部容器是空的,等待消费者生产元素。
//手撕堵塞队列(维护一个ArrayList)
class MyBlockingQueue<E> {
//设置队列长度
private int size ;
//设置默认长度
private static final int MAX_SIZE = 10;
//LinkedList
private LinkedList<E> queue = new LinkedList<E>();
//使用锁
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
public MyBlockingQueue(int size){
this.size = size;
}
public MyBlockingQueue(){
this(MAX_SIZE);
}
/**
* 如果阻塞队列内部的容器元素大于或者等于maxSize,此时等待消费者将元素
* 取出
* @param e 放入阻塞队列中的元素
*/
public boolean offer(E e) throws InterruptedException {
lock.lock();
try {
//队列满,使用wait方法堵塞
while (queue.size() >= 10) {
condition1.await();
return false;
}
queue.addLast(e);
//通知消费者进行消费
condition2.signal();
} finally {
lock.unlock();
}
return true;
}
/**
* 如果内部容器是空的,等待消费者生产元素。
* 无论是添加还是获取元素的操作,在哪之后都需要
* 主动调用notifyAll,让等待取消
* @return
*/
public E take() throws InterruptedException {
lock.lock();
E data;
try {
//队列满,使用wait方法堵塞
while (queue.isEmpty()) {
condition2.await();
}
data = queue.removeFirst();
//通知消费者进行消费
condition1.signal();
} finally {
lock.unlock();
}
return data;
}
}