数据结构|数据结构 - PriorityBlockingQueue 优先级阻塞队列

数据结构|数据结构 - PriorityBlockingQueue 优先级阻塞队列
文章图片
简介 PriorityBlockingQueue 是优先级阻塞队列,虽然我们称它为无界,实际上它也是有界的。它跟PriorityQueue 最大的区别在于他是线程安全的,在入队出队时使用同一把锁,在扩容时先解锁,再使用cas原子操作,再重新获取锁。
PriorityBlockingQueue 类

public class PriorityBlockingQueue extends AbstractQueue

只继承AbstractQueue抽象类
PriorityBlockingQueue 属性
// 默认初始长度 private static final int DEFAULT_INITIAL_CAPACITY = 11; // 最大长度 private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; // 元素数组 private transient Object[] queue; // 元素个数 private transient int size; // 比较器 private transient Comparator comparator; // 锁 private final ReentrantLock lock; // 空限制条件 private final Condition notEmpty; // 自旋扩容锁 private transient volatile int allocationSpinLock; // 序列化 private PriorityQueue q;

PriorityBlockingQueue 构造函数 无参构造函数
public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); }

带有初始长度构造函数
public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); }

有初始长度和比较器的构造函数
public PriorityBlockingQueue(int initialCapacity, Comparator comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); // 初始化非公平锁 this.lock = new ReentrantLock(); // 初始化空监控条件 this.notEmpty = lock.newCondition(); // 比较器 this.comparator = comparator; // 初始化元素数组 this.queue = new Object[initialCapacity]; }

线性集合的构造函数
public PriorityBlockingQueue(Collection c) { // 初始化非公平锁 this.lock = new ReentrantLock(); // 初始化空监控条件 this.notEmpty = lock.newCondition(); boolean heapify = true; // true if not known to be in heap order boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet) { // 找线性集合比较器 SortedSet ss = (SortedSet) c; this.comparator = (Comparator) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue) { // 原来就是阻塞优先级队列,找原比较器 PriorityBlockingQueue pq = (PriorityBlockingQueue) c; this.comparator = (Comparator) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } // 线性集合转数组 Object[] a = c.toArray(); int n = a.length; // 兼容老版本BUG if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); // 查找是否有空元素,有空元素抛异常 if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } // 初始化数组和元素个数 this.queue = a; this.size = n; // 数组中建堆 if (heapify) heapify(); }

PriorityBlockingQueue 添加 优先级队列属于无界队列,忽略满时抛异常
public boolean add(E e) { return offer(e); }

添加元素
public boolean offer(E e) { // 空元素抛异常 if (e == null) throw new NullPointerException(); // 初始化锁 final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; // 达到当前数组长度尝试扩容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator cmp = comparator; if (cmp == null) // 使用自然排序 siftUpComparable(n, e, array); else // 使用比较器排序 siftUpUsingComparator(n, e, array, cmp); // 长度增加 size = n + 1; // 放开空限制条件 notEmpty.signal(); } finally { // 解锁 lock.unlock(); } return true; }

优先级队列属于无界队列,忽略等待
public void put(E e) { offer(e); // never need to block }

优先级队列属于无界队列,忽略超时
public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); }

PriorityBlockingQueue 扩容
private void tryGrow(Object[] array, int oldCap) { // 先解锁(后面解释) lock.unlock(); Object[] newArray = null; // 使用原则锁CAS if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 数组扩容逻辑跟PriorityQueue一样, // 数组长度小于64时,新长度=原长度*2+2 // 数组长度大于等于64时,新长度=原长度*1.5 int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); // 新长度大于默认长度最大值 if (newCap - MAX_ARRAY_SIZE > 0) { // 使用原长度加1策略 int minCap = oldCap + 1; // 是否越界,越界抛异常 if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); // 不越界就使用数组最大值作为新长度 newCap = MAX_ARRAY_SIZE; } // queue == array 必须没有其他线程操作过 if (newCap > oldCap && queue == array) // 创建新数组 newArray = new Object[newCap]; } finally { // 重置锁标志 allocationSpinLock = 0; } } // 未拿到cas锁 if (newArray == null) // 让出当前CPU Thread.yield(); // 重新获取锁 lock.lock(); // 拷贝数组(queue == array限制没有操作过) if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }

为什么要解锁呢?我们先来看看不解锁会怎么样,其他写的线程和读的线程全部都在等待获取锁,然后这边在慢慢拷贝数组。往细了说,扩容应该只影响写,如果有大量的读,其实扩容是可以取消的,有人取过数据(queue == array)将不再成立,这是可以不用扩容了(有空位了)。这里先解锁,在用cas加锁计算新长度创建新数组,然后重新获得锁,完全就是为了性能考虑。
PriorityBlockingQueue 出队 出队
public E poll() { // 获取锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 取元素 return dequeue(); } finally { // 解锁 lock.unlock(); } }

出队,为空就等待
public E take() throws InterruptedException { // 获取锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { // 为空等待 while ( (result = dequeue()) == null) notEmpty.await(); } finally { // 解锁 lock.unlock(); } return result; }

出队,为空等待,超时返回false
public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 等待时间 long nanos = unit.toNanos(timeout); // 获取锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { // 为空时等待一段时间,超时在读 while ( (result = dequeue()) == null && nanos > 0) nanos = notEmpty.awaitNanos(nanos); } finally { // 解锁 lock.unlock(); } return result; }

【数据结构|数据结构 - PriorityBlockingQueue 优先级阻塞队列】真正出队
private E dequeue() { // 元素个数减1 int n = size - 1; // 为空返回null if (n < 0) return null; else { // 获取数组 Object[] array = queue; // 取头 E result = (E) array[0]; // 取最后一个 E x = (E) array[n]; // 最后一个置空 array[n] = null; Comparator cmp = comparator; // 用尾替换头并重新平衡 if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } }

PriorityBlockingQueue 查询方法
public E peek() { // 先获取锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 数组为空返回null return (size == 0) ? null : (E) queue[0]; } finally { // 解锁 lock.unlock(); } }

PriorityBlockingQueue 获取长度
public int size() { // 先获取锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 返回长度 return size; } finally { // 解锁 lock.unlock(); } }

数据结构|数据结构 - PriorityBlockingQueue 优先级阻塞队列
文章图片

    推荐阅读