阻塞队列|阻塞队列 BlockingQueue
1、简介
1-1、简介
阻塞队列是JDK的 concurrent 包下面提供的一组线程安全的队列。
其最大的特点为阻塞。
即当获取队列元素但是队列为空时,会阻塞当前线程,等待队列中有元素再返回;
当添加元素但是队列已满时,会阻塞当前线程,等待队列可以放入新元素时再放入。
1-2、应用场景
阻塞队列主要是设计用来实现生产者-消费者队列的。
典型的一个应用是用来实现线程池的等待队列(workQueue)。可以参看这里第二章的说明:java线程池的拒绝策略
1-3、主要操作
concurrent 包下面的 BlockingQueue.java 是JDK提供的接口,约定了所有阻塞队列的基本操作。
Throws exception | Special value | Blocks | Times out | |
---|---|---|---|---|
Insert | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove | remove() | poll() | take() | poll(time, unit) |
Examine | element() | peek() | not applicable | not applicable |
对于队列的操作有三类:添加(Insert)、取出(Remove)、读(Examine)。
每一类操作在不同的场景下有不同的methods供调用。
- Throws exception:这一组操作在遇到异常时(add时队列已满;remove时队列已空)会抛出异常
- Special value:这一组操作在遇到异常时会返回特殊值(null 或 true/false),比如offer时队列已满返回false;poll时队列已空会返回null。
- Blocks:put在队列已满时阻塞;take在队列已空时阻塞
- Times out:阻塞等待,直到成功或者超时(阻塞时间满time),超时后直接返回false。
- BlockingQueue 不接受 null 值的插入。在“Special value”这种场景下, null 值作为poll()方法的特殊值使用,如果插入的是null,就无法区分究竟是获取失败,还是获取到的就是null。
- BlockingQueue 的实现都是线程安全的。但是一些实现里面提供的批量操作,如 addAll, containsAll, retainAll 和 removeAll 不一定是原子操作
文章图片
BlockingQueue的UML图 可以看到,阻塞队列的绝大多数实现,都是继承 AbstractQueue 抽象类,实现 BlockingQueue 接口。
3、各种具体实现 下面逐个说明每一种阻塞队列的特点以及其具体实现。
3-1、ArrayBlockingQueue ArrayBlockingQueue 是 BlockingQueue 接口的有界队列实现,先进先出(FIFO Queue)。
底层采用数组来实现队列功能。
ArrayBlockingQueue 在并发控制上采用可重入锁(ReentrantLock),添加和取出操作都在同一把锁的控制下进行。
与另一种阻塞队列的经典实现LinkedBlockingQueue相比,ArrayBlockingQueue的构造函数中支持指定公平锁策略。
如果指定了“公平锁” = true 的话,那么所有等待添加/取出的线程也会遵守FIFO原则,先到的先操作。公平锁可以保证等待时间最长的线程优先获取锁,防止“饿死”,但是其效率稍低,需要根据实际场景取舍。
以下为相关的源码分析,说明以注释方式写入了代码中。
关键成员变量:
/** 实现队列的数组 */
final Object[] items;
/** 由于是使用数组模拟队列,因此这个变量相当于一个游标,指示队列取出时的具体位置(数组的下标) */
int takeIndex;
/** 指示队列添加时的具体位置(数组的下标) */
int putIndex;
/** 队列(数组)中当前存储的元素数量 */
int count;
/** 用于保证线程安全的可重入锁 */
final ReentrantLock lock;
/** 配合可重入锁的Condition,控制取出线程 */
private final Condition notEmpty;
/** 配合可重入锁的Condition,控制添加线程 */
private final Condition notFull;
以put(e)为例,看一下具体的操作细节:
// 添加元素
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 添加操作获取锁
lock.lockInterruptibly();
try {
// 元素数量==数组长度,说明队列中元素已满,调用notFull.await()阻塞添加线程,将控制权返还
while (count == items.length)
notFull.await();
// 正常获取锁,并且可以添加的情况下进行操作
enqueue(e);
} finally {
// 解锁
lock.unlock();
}
}// 添加操作
private void enqueue(E x) {
final Object[] items = this.items;
// 根据游标添加新元素
items[putIndex] = x;
// 游标++;如果游标已经到达了数组最后,则返回数组的开头(利用数组模拟队列)
if (++putIndex == items.length)
putIndex = 0;
// 队列中元素数量++
count++;
// 唤醒等待中的取出线程
notEmpty.signal();
}
3-2、LinkedBlockingQueue LinkedBlockingQueue 的特性与 ArrayedBlockingQueue 差不多。都是BlockingQueue最基础的实现。
绝大多数场景下 LinkedBlockingQueue 的存取效率优于 ArrayedBlockingQueue。
主要差别在于:
- 底层的数据结构是一个单向列表。
- 不提供公平锁策略
- 可以用于实现有界队列,也可以实现无界队列,取决于使用的构造函数。设定为无界队列队列时,其元素最大数量为 Integer.MAX_VALUE。注意:无界危险,可能造成OOM,慎用!
(其实ABQ也完全可以改造成双锁的,进行诸如把count的类型改造成AtomicInteger之类的改造是完全可行的)
关键成员变量:
/**
* 节点
*/
static class Node {
E item;
Node next;
Node(E x) { item = x;
}
}/** 队列长度,默认为 Integer.MAX_VALUE */
private final int capacity;
/** 队列中的元素数量,注意使用了原子类型 AtomicInteger */
private final AtomicInteger count = new AtomicInteger();
/**
* 头结点
*/
transient Node head;
/**
* 尾节点
*/
private transient Node last;
/** take, poll 等取出操作使用的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 配合takeLock锁使用的Conditon */
private final Condition notEmpty = takeLock.newCondition();
/** put, offer 等添加操作使用的锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 配合putLock锁使用的Conditon */
private final Condition notFull = putLock.newCondition();
仍然以put(e)为例,看一下具体的操作细节:
// 添加元素。由于使用了两把锁,所以与ABQ相比一些细节有变化
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 使用int类型的本地变量记录元素数量
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 获取添加锁
putLock.lockInterruptibly();
try {
// 元素数量=队列长度,说明队列已满,阻塞当前线程
while (count.get() == capacity) {
notFull.await();
}
// 添加元素的方法(里面直接一句 last = last.next = node;
解决问题)
enqueue(node);
// 原子操作,队列元素数量+1,【同时返回+1前的值给c】←注意正确理解这个操作
c = count.getAndIncrement();
// 元素数量<队列长度,说明还可以继续添加,唤醒其他添加线程
if (c + 1 < capacity)
notFull.signal();
} finally {
// 释放添加锁
putLock.unlock();
}
// c=0,意味着这次添加元素之前,队列是空的,那么就有可能有取出线程在阻塞,所以此处有唤醒操作
if (c == 0)
signalNotEmpty();
}
LBQ使用两把锁,造成它的remove()等方法略有不同,因为remove操作要移动的位置不固定,所以必须将两把锁都加锁,才能保证线程安全。
public boolean remove(Object o) {
if (o == null) return false;
// 这里,两把锁都要抓,两把锁都要硬
fullyLock();
try {
for (Node trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}void fullyLock() {
putLock.lock();
takeLock.lock();
}
3-plus、为什么ABQ和LBQ的锁机制不一样? 这个问题,网上去搜一下就能发现是没有定论的(包括OverStackFlow上)。
主要的讨论点肯定都是集中在性能上面,但是很难有一个明确的结论。
所以,只能去找原作者问了……
3-3、SynchronousQueue 同步队列。
这是一种比较特殊的阻塞队列。
它的同步指的是取出线程和添加线程需要同步,一个取出线程匹配一个添加线程。
也就是说一个线程往队列中添加一个元素时,添加操作不会立即返回,需要等待另一个线程来将这个元素取出;
一个取出线程操作的时候,同样需要一个相匹配的添加线程。
SynchronousQueue 队列是“虚”的,不提供任何空间(一个都没有)来存储元素。Capacity=0。它只是提供两个线程进行信息交换的场所。
【阻塞队列|阻塞队列 BlockingQueue】数据必须从某个添加线程交给某个取出线程,而不是写到某个队列中等待被消费。
不能在 SynchronousQueue 中使用 peek 方法,peek 的语义是只读取不移除,这个方法的语义不符合 SynchronousQueue 的特征。
所以peek()直接返回null
public E peek() {
return null;
}
SynchronousQueue 也不能被迭代,因为根本就没有元素可以做迭代。
SynchronousQueue的一个典型应用,是使用newCachedThreadPool()方法构建缓存线程池。
newCachedThreadPool()构建线程池时,使用的构造函数是这样的:
ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue())
也就是说线程池核心线程数量是0,即不保有固定数量的线程,随需创建;可创建的线程数量上限为Integer.MAX_VALUE;空闲的线程最多存活60秒;未消费的任务在SynchronousQueue中等待。
简单看一下SynchronousQueue的实现。
SynchronousQueue定义了一个核心的抽象类和抽象方法:
abstract static class Transferer {
// 这个方法用于转移元素,从生产者手上转到消费者手上
// 也可以被动地,消费者调用这个方法来从生产者手上取元素
// 第一个参数 e 如果不是 null,代表场景为:将元素从生产者转移给消费者
//如果是 null,代表消费者等待生产者提供元素,然后返回值就是相应的生产者提供的元素
// 第二个参数代表是否设置超时,如果设置超时,超时时间是第三个参数的值
// 返回值如果是 null,代表超时,或者中断。具体是哪个通过检测中断状态得到
abstract E transfer(E e, boolean timed, long nanos);
}
SynchronousQueue是可以指定公平锁策略的,公平锁模式下使用TransferQueue实现Transferer,而非公平锁模式下使用TransferStack实现Transferer。
static final class TransferQueue extends Transferer {
……
}static final class TransferStack extends Transferer {
……
}
之后的具体实现比较复杂,贴上来也意义不大,主要说一下TransferQueue中transfer方法的思路。
先看一下TransferQueue中的节点定义:
static final class QNode {
volatile QNode next;
// next node in queue
volatile Object item;
// CAS'ed to or from null
volatile Thread waiter;
// 节点中会保存线程对象的引用,用于阻塞/唤醒
final boolean isData;
// isData = https://www.it610.com/article/true 是写入线程,否则为取出线程
然后就是transfer方法:
- 调用transfer法时,如果队列为空,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素isData = https://www.it610.com/article/true)。这种情况下,将当前线程加入到等待队列即可。
- 如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是取出操作线程,当前线程是添加操作线程)。这种情况下,匹配等待队列的队头,出队,返回相应数据。
3-4、LinkedTransferQueue(TransferQueue) TransferQueue 是一个继承 BlockingQueue 的接口,它目前的唯一实现是 LinkedTransferQueue。
TransferQueue 是一种重要的队列,它提供了一个场所,生产者线程使用transfer方法传入一些对象并阻塞,直至这些对象被消费者线程全部取出。上面的同步队列SynchronousQueue就像是一个容量为0的TransferQueue。
从表现出来的行为上来看,TransferQueue是ConcurrentLinkedQueue, 公平锁模式的SynchronousQueue, 无界的LinkedBlockingQueue等队列的超集。
TransferQueue的实现思路其实跟上面SynchronousQueue的思路很像,这里同样不贴代码了,主要列一下其用法。
- transfer(E e),若当前存在一个正在等待获取的消费者线程,即立刻移交;否则会将元素e插入到队列尾部,并进入阻塞状态,直到有消费者线程取走该元素。
- tryTransfer(E e),若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),即立刻移交; 否则返回false,并且不进入队列,这是一个非阻塞的操作。
- tryTransfer(E e, long timeout, TimeUnit unit) 若当前存在一个正在等待获取的消费者线程,即立刻移交;否则会将元素e插入到队列尾部,并且等待被消费者线程获取消费掉,若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。
- hasWaitingConsumer() 判断是否存在消费者线程。
- getWaitingConsumerCount() 获取所有等待获取元素的消费线程数量。
public class TransferQueueExam {
public static void main(String[] args) {
TransferQueue queue = new LinkedTransferQueue<>();
ExecutorService service = Executors.newCachedThreadPool();
service.submit(new Producer("Producer1", queue));
service.submit(new Consumer("Consumer1", queue));
service.submit(new Consumer("Consumer2", queue));
service.shutdown();
}static class Producer implements Runnable {
private final String name;
private final TransferQueue queue;
Producer(String name, TransferQueue queue) {
this.name = name;
this.queue = queue;
}
@Override
public void run() {
System.out.println("begin transfer objects");
try {
for (int i = 0;
i < 10;
i++) {
queue.transfer("ABCD_" + i);
System.out.println(name + " transfer "+"ABCD_"+i);
}
System.out.println("after transformation");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name + " is over");
}
}static class Consumer implements Runnable {
private final String name;
private final TransferQueue queue;
private static Random rand = new Random(System.currentTimeMillis());
Consumer(String name, TransferQueue queue) {
this.name = name;
this.queue = queue;
}@Override
public void run() {
try {
for (int i = 0;
i < 5;
i++) {
String str = queue.take();
System.out.println(name + " take " + str);
TimeUnit.SECONDS.sleep(rand.nextInt(5));
}
System.out.println(name + " is over");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
3-5、PriorityBlockingQueue 优先级阻塞队列。是一种带排序的 BlockingQueue 实现。是 PriorityQueue 的线程安全版本。
PriorityBlockingQueue具有以下特点:
- 队列为无界队列。只能指定初始的队列大小,插入元素时,如果空间不够就会自动扩容。
- 插入队列的对象必须是可比较大小的(comparable)。
- 队列中的元素总是按照“自然顺序”进行排序,或者根据构造函数中给定的Comparator进行排序。即“权重队列”。
- 因为它是无界队列,所以插入操作 put 方法不会阻塞(take 方法在队列为空的时候会阻塞)。
- 取出时根据优先级(权重)出队,而不是FIFO。
下面结合源码看一下其具体实现,主要关注点是扩容。
关键成员变量:
/**
* 队列的默认初始大小。至于为什么是11????不理解
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;
/**
* 队列最大长度。由于要存储一些头部信息,所以有 "- 8"
*/
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* 基于数组的小顶堆。
* queue[n]的左右孩子为queue[2*n+1] and queue[2*(n+1)],父节点为 queue[(n-1)/2]
* 代码中给出的父节点index公式是这样的 int parent = (n - 1) >>> 1
*/
private transient Object[] queue;
/**
* 元素数量
*/
private transient int size;
/**
* 用于排序的comparator,如果为null则使用元素的“自然顺序”
*/
private transient Comparator super E> comparator;
/**
* 添加和取出操作公用的可重入锁
*/
private final ReentrantLock lock;
/**
* 配合锁使用的取出Condition(添加是无条件动作,不用阻塞,因此不用Condition)
*/
private final Condition notEmpty;
/**
* 扩容时,CAS操作使用的自旋锁
*/
private transient volatile int allocationSpinLock;
put操作无阻塞,直接调用了offer方法。
public void put(E e) {
offer(e);
// never need to block
}
其实add和带超时的offer方法都同样直接调用了这个offer方法。
对于offer方法,最重要的是理解其中扩容时对锁的操作。
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
// 获得锁
lock.lock();
int n, cap;
Object[] array;
// size大于等于数组的长度时扩容
while ((n = size) >= (cap = (array = queue).length))// Doug Lea 你写这样的代码真的不会被打死么……
// ☆ 扩容。注意这里传入的是quque的复制品array,这一点结合tryGrow方法来看
tryGrow(array, cap);
try {
Comparator super E> cmp = comparator;
if (cmp == null)
// 默认排序方法。这里面主要是堆的操作,此处不展开
siftUpComparable(n, e, array);
else
// 自定义排序方法
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
// 唤醒被阻塞的取出线程
notEmpty.signal();
} finally {
// 释放锁
lock.unlock();
}
return true;
}
private void tryGrow(Object[] array, int oldCap) {
// 首先释放了刚才已经获得的独占锁,使得扩容操作和取出操作可以同时进行,提高吞吐量。
lock.unlock();
// 创建扩容后的新数组
Object[] newArray = null;
// 用 CAS 操作将 allocationSpinLock 由 0 变为 1,相当于获取扩容使用的自旋锁
// 也就是说,允许其他线程执行添加操作,但是扩容只能由一个线程来操作
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// 根据规则计算扩容后的数组容量 newCap
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) {// possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
// 注意这里的条件 queue == array 。
// 如果 queue != array,说明有其他线程也在做扩容,并且已经分配完内存空间,quque被指向了新空间
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
// 重置为0,相当于释放自旋锁
allocationSpinLock = 0;
}
}
// newArray == null 说明上面没有执行 newArray = new Object[newCap],即有其他线程正在进行扩容操作
if (newArray == null)
// 扩容失败时,让出CPU控制权,线程由运行中状态变为就绪状态
Thread.yield();
// 重新获取最开始释放掉的独占锁
lock.lock();
// 扩容成功后调用native方法做数组复制
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
// 本线程扩容失败后,会退到外层方法offer()中,如果此时扩容仍未完成,
// 会继续在 while ((n = size) >= (cap = (array = queue).length)) 中循环,直到扩容完成,添加元素成功
}
3-6、DelayQueue 延时优先级阻塞队列。是一种支持延时获取元素的无界阻塞队列。
DelayQueue队列中只能存入实现了Delayed接口的对象。
public class DelayQueue extends AbstractQueue
Delayed接口的定义如下:
public interface Delayed extends Comparable {
long getDelay(TimeUnit unit);
}
其继承的Comparable接口定义如下:
public interface Comparable {
public int compareTo(T o);
}
因此DelayQueue中存入的对象要同时实现getDelay和compareTo两个方法。
getDelay方法是用来检测队列中的元素是否到期;compareTo方法是用来给队列中的元素进行排序。
DelayQueue实现的思路大致如下:
持有一个PriorityQueue,每个Delayed对象实际上都放入了这个队列,队列中的对象按照优先级(按照compareTo)进行了排序,队列头部是最先超时的对象。
当队列中对象的getDelay方法返回的值小于等于0(即对象已经超时)时,才可以将对象从队列中取出。若使用take方法,则方法会一直阻塞,直到队列头部的对象超时被取出;若使用poll方法,则当没有超时对象时,直接返回null。
另外因为DelayQueue是无界队列,所以put操作是非阻塞的,直接调用offer方法。
下面是一个使用例子:
public class DelayQueueExam {private static Random rand = new Random(System.currentTimeMillis());
public static void main(String[] args) throws InterruptedException {
DelayQueue queue = new DelayQueue<>();
for (int i = 0;
i < 10;
i++) {
int rndDelayTime = rand.nextInt(1000 * (i + 1));
queue.put(new DelayElement(rndDelayTime, "DelayElement_" + i + "_" + rndDelayTime));
}
while (!queue.isEmpty()) {
DelayElement delayElement = queue.take();
System.out.println(delayElement.getName());
}
}static class DelayElement implements Delayed {
private long expired;
private final String name;
DelayElement(int delay, String name) {
this.name = name;
expired = System.currentTimeMillis() + delay;
}public String getName() {
return name;
}@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expired - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}@Override
public int compareTo(Delayed o) {
long d = (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
}
}
可以看到take的时候,是按照rndDelayTime的升序输出的,而不是添加元素时候的index。
DelayElement_6_62
DelayElement_0_293
DelayElement_5_328
DelayElement_1_497
DelayElement_2_1261
DelayElement_4_1945
DelayElement_3_2563
DelayElement_8_2778
DelayElement_9_3049
DelayElement_7_7060Process finished with exit code 0
推荐阅读
- 《数据结构与算法之美》——队列
- Redis——发布订阅/消息队列
- MQ(消息队列)功能介绍
- Java深入了解数据结构之栈与队列的详解
- 阻塞与非阻塞,同步与非同步,你真的懂吗()
- 并发,并行,阻塞,非阻塞,异步,同步
- 基于rabbitmq实现的延时队列(golang版)
- (转)蚂蚁金服(消息队列事务型消息原理浅析)
- JUC--CLH同步队列
- Code|Code Forces-681C(模拟题,优先队列,设计STL)