J.U.C|J.U.C 阻塞队列(三) - LinkedBlockingQueue

1 概述 LinkedBlockingQueue 是一个用单项链表实现的的线程安全的有界的阻塞队列。队列按照先进先出(FIFO)的原则对元素进行排序。
通过以下关键词分析我们来更深入理解ArrayBlockingQueue
1.1 如何理解“队列” 队列这个概念非常好理解。你可以把它想象成排队买票,先来的先买,后来的人只能站末尾,不允许插队。先进者先出,这就是典型的“队列”。
相对于栈只支持两个基本操作:入栈 push()和出栈 pop(),对于也只支持两个操作入队 enqueue(),放一个数据到队列尾部;出队 dequeue(),从队列头部取一个元素,因此队列跟栈一样,也是一种操作受限的线性表数据结构
J.U.C|J.U.C 阻塞队列(三) - LinkedBlockingQueue
文章图片
image 1.2 如何理解“线程安全的” 在多线程情况下,会有多个线程同时操作队列,这个时候就会存在线程安全问题,线程安全的队列我们叫作并发队列.
那如何实现一个线程安全的队列呢?
方式一
最简单直接的实现方式是直接在 enqueue()、dequeue() 方法上加锁,但是锁粒度大并发度会比较低,同一时刻仅允许一个存或者取操作。
方式二
利用 CAS 原子操作,可以实现非常高效的并发队列。
1.3 阻塞队列 阻塞队列其实就是在队列基础上增加了阻塞操作。简单来说,就是在队列为空的时候,从队头取数据会被阻塞。因为此时还没有数据可取,直到队列中有了数据才能返回;如果队列已经满了,那么插入数据的操作就会被阻塞,直到队列中有空闲位置后再插入数据,然后再返回。
1.4 有界队列 有界队列表示队列中存储数据是有限,如果队列满后在次向队列中添加数据会失败或阻塞。
2 实现一个"队列" 我们知道了,队列跟栈一样,也是一种抽象的逻辑存储结构。它具有先进先出的特性,支持在队尾插入元素,在队头删除元素。如果要想实现一个队列可以用数组来实现,也可以用链表来实现,用数组实现的队列叫作顺序队列,用链表实现的队列叫作链式队列。
2.1 链式队列 【J.U.C|J.U.C 阻塞队列(三) - LinkedBlockingQueue】基于链表的实现,我们同样需要两个指针:head 指针和 tail 指针。它们分别指向链表的第一个结点和最后一个结点。
入队
tail->next= new_node, tail = tail->next;
出队
head = head->next
J.U.C|J.U.C 阻塞队列(三) - LinkedBlockingQueue
文章图片
image 3 LinkedBlockingQueue源码解析 3.1 类结构 J.U.C|J.U.C 阻塞队列(三) - LinkedBlockingQueue
文章图片
image 3.2 实现原理 1 LinkedBlockingQueue时基于链表实现的队列
2 相对于ArrayBlockingQueue,链表实现的队列出队入队完全可以多线程执行,并不相互依赖。因而按时锁的原则来说锁的粒度越小越好,因而LinkedBlockingQueue使用了2把锁,分别针对出队入队各自分配一把锁(takeLock,putLock)来保证线程安全。
3 LinkedBlockingQueue内部存在着二个可重入的锁(takeLock,putLock),同时分别生成二个等待队列Condition(notFull,notEmpty) -- 生产者和消费者模式

  • 当入队时首先获取入队的锁,并查看队列是否已经满了,如果满了则阻塞当前线程。如果没有满则入队成功,同时查看入队前的队列是否为空队列,如果是则释放出队等待队列中线程
  • 当出队时首先获取出队的锁,并查看队列是否已经为空,如果为空则阻塞当前线程。如果不位空则出队成功,同时查看出队前的队列是否为满队列,如果是则释放入队等待队列中线程
3.3 核心属性
static class Node { //队列节点中保存元素 E item; //当前节点的下一个节点 Node next; Node(E x) { item = x; } }/** 队列容量 */ private final int capacity; /** 队列保存的元素个数 */ private final AtomicInteger count = new AtomicInteger(); /** * 头部节点 */ transient Node head; /** * 尾部节点 */ private transient Node last; /** 出队锁 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 出队等待队列 */ private final Condition notEmpty = takeLock.newCondition(); /** 入队锁c */ private final ReentrantLock putLock = new ReentrantLock(); /** 入队等待队列 */ private final Condition notFull = putLock.newCondition();

3.4 构造函数
//创建一个带有默认的(Integer.MAX_VALUE)容量队列 public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }//创建一个带有给定的(capacity)容量队列 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node(null); }//创建一个带有默认的(Integer.MAX_VALUE)容量,它最初包含给定 collection 的元素,并以 collection 迭代器的遍历顺序添加元素。 public LinkedBlockingQueue(Collection c) { this(Integer.MAX_VALUE); //将指定容器中元素依此添加到队列中 final ReentrantLock putLock = this.putLock; //获取putLock独占锁,成功返回,失败则阻塞 putLock.lock(); try { int n = 0; for (E e : c) { //检查添加数据不能为null,NullPointerException if (e == null) throw new NullPointerException(); //添加collection超出,IllegalStateException if (n == capacity) throw new IllegalStateException("Queue full"); //添加到队列尾部 enqueue(new Node(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }

3.5 入队操作 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出 IllegalStateException。
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }

将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。
public boolean offer(E e) { //添加元素为null,抛出异常 if (e == null) throw new NullPointerException(); // 如果“队列已满”,则返回false,表示插入失败。 final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node node = new Node(e); final ReentrantLock putLock = this.putLock; //获取putLock独占锁,成功返回,失败则阻塞 putLock.lock(); try { if (count.get() < capacity) { //将当前添加节点添加队列尾部 enqueue(node); // 将“当前节点数量”+1,并返回“原始的数量” c = count.getAndIncrement(); // 如果在插入元素之后,队列仍然未满,则唤醒notFull等待队列上等待线程。 if (c + 1 < capacity) notFull.signal(); } } finally { // 释放“插入锁putLock” putLock.unlock(); } // 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty等待队列上等待线程 if (c == 0) signalNotEmpty(); return c >= 0; }//将当前添加节点添加队列尾部 private void enqueue(Node node) { last = last.next = node; }//释放notEmpty等待队列中线程 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; //获取takeLock独占锁 takeLock.lock(); try { //释放notEmpty等待队列中线程 notEmpty.signal(); } finally { //释放takeLock独占锁 takeLock.unlock(); } }

将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间,在成功时返回 true。
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { //添加元素为null,抛出异常 if (e == null) throw new NullPointerException(); //获取等待的时间 long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //获取putLock独占锁,成功返回,失败则阻塞 putLock.lockInterruptibly(); try { //如果队列已满,等待时间大于0。将当前线程添加notFull等待队列,并限时阻塞当前线程 while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } //将当前添加节点添加队列尾部 enqueue(new Node(e)); //将“当前节点数量”+1,并返回“原始的数量” c = count.getAndIncrement(); // 如果在插入元素之后,队列仍然未满,则唤醒notFull等待队列上的等待线程。 if (c + 1 < capacity) notFull.signal(); } finally { // 释放“插入锁putLock” putLock.unlock(); } // 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty等待队列上的等待线程 if (c == 0) signalNotEmpty(); return true; }

put,该操作成功返回 true,失败则进入阻塞。
public void put(E e) throws InterruptedException { //添加元素为null,抛出异常 if (e == null) throw new NullPointerException(); int c = -1; Node node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //获取putLock独占锁,成功返回,失败则阻塞,响应中断 putLock.lockInterruptibly(); try { //如果队列已满。将当前线程添加notFull等待队列,并阻塞当前线程 while (count.get() == capacity) { notFull.await(); } //将当前添加节点添加队列尾部 enqueue(node); //将“当前节点数量”+1,并返回“原始的数量” c = count.getAndIncrement(); // 如果在插入元素之后,队列仍然未满,则唤醒notFull等待队列上等待线程。 if (c + 1 < capacity) notFull.signal(); } finally { // 释放“插入锁putLock” putLock.unlock(); } // 如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty等待队列上等待线程 if (c == 0) signalNotEmpty(); }

3.6 出队操作
  • 获取并移除此队列的头,如果此队列为空,则返回 null。
public E poll() { final AtomicInteger count = this.count; //如果队列元素为空,返回null if (count.get() == 0) return null; E x = null; int c = -1; final ReentrantLock takeLock = this.takeLock; //获取putLock独占锁,失败则阻塞 takeLock.lock(); try { //队列中存在元素 if (count.get() > 0) { //获取队列首部元素 x = dequeue(); // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。 c = count.getAndDecrement(); // 取出元素之后,队列仍不为空,则唤醒notFull等待队列上的等待线程 if (c > 1) notEmpty.signal(); } } finally { //释放takeLock独占锁 takeLock.unlock(); } //如果出队操作前队列时满的,则释放notFull等待队列中头部节点线程阻塞状态 if (c == capacity) signalNotFull(); return x; }

  • 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。
public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; //获取等待的时间 long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //如果队列为空,将当前线程添加到notEmpty等待队列,并限时阻塞当前线程 while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } //获取队列首部元素 x = dequeue(); // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。 c = count.getAndDecrement(); // 取出元素之后,队列仍不为空,则唤醒notFull等待队列上的等待线程 if (c > 1) notEmpty.signal(); } finally { //释放takeLock独占锁 takeLock.unlock(); } //如果出队操作前队列时满的,则释放notFull等待队列中头部节点线程阻塞状态 if (c == capacity) signalNotFull(); return x; }

  • 获取并移除此队列的头部,在元素变得可用之前一直等待
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //如果队列为空,将当前线程添加notEmpty等待队列,并阻塞当前线程 while (count.get() == 0) { notEmpty.await(); } //获取队列首部元素 x = dequeue(); // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。 c = count.getAndDecrement(); // 取出元素之后,队列仍不为空,则唤醒notFull等待队列上的等待线程 if (c > 1) notEmpty.signal(); } finally { //释放takeLock独占锁 takeLock.unlock(); } //如果出队操作前队列时满的,则释放notFull等待队列中头部节点线程阻塞状态 if (c == capacity) signalNotFull(); return x; }

3.7 指定元素出队
  • 查找我们需要遍历链表找到需要删除节点和前置节点
  • 通过链表操作将删除节点出队

    J.U.C|J.U.C 阻塞队列(三) - LinkedBlockingQueue
    文章图片
    image
public boolean remove(Object o) { if (o == null) return false; //同时获取putLock,takeLock独占锁 fullyLock(); try { //从头部节点开始向后遍历查找删除节点以及其前置前置节点 for (Node trail = head, p = trail.next; p != null; trail = p, p = p.next) { if (o.equals(p.item)) { //将节点从链表中删除 //p表示将要删除的节点 //trail表示要删除的节点前置系欸但 unlink(p, trail); return true; } } return false; } finally { //同时释放putLock,takeLock独占锁 fullyUnlock(); } }//将节点从链表中删除 //p表示将要删除的节点 //trail表示要删除的节点前置节点 void unlink(Node p, Node trail) { //将节点p从链表中删除 p.item = null; trail.next = p.next; //如果节点p是链表尾部,last指向trail if (last == p) last = trail; //如果出队操作前队列时满的,则释放notFull等待队列中头部节点线程阻塞状态 if (count.getAndDecrement() == capacity) notFull.signal(); }

3.8 查询操作 获取队列中元素数量
public int size() { return count.get(); }

查看队列中剩余的位置
public int remainingCapacity() { return capacity - count.get(); }

4 LinkedBlockingQueue使用 4.1 函数列表
// 创建一个容量为 Integer.MAX_VALUE 的 LinkedBlockingQueue。 LinkedBlockingQueue() // 创建一个容量是 Integer.MAX_VALUE 的 LinkedBlockingQueue,最初包含给定 collection 的元素,元素按该 collection 迭代器的遍历顺序添加。 LinkedBlockingQueue(Collection c) // 创建一个具有给定(固定)容量的 LinkedBlockingQueue。 LinkedBlockingQueue(int capacity) // 从队列彻底移除所有元素。 void clear() // 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。 int drainTo(Collection c) // 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。 int drainTo(Collection c, int maxElements) // 返回在队列中的元素上按适当顺序进行迭代的迭代器。 Iterator iterator() // 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。 boolean offer(E e) // 将指定元素插入到此队列的尾部,如有必要,则等待指定的时间以使空间变得可用。 boolean offer(E e, long timeout, TimeUnit unit) // 获取但不移除此队列的头;如果此队列为空,则返回 null。 E peek() // 获取并移除此队列的头,如果此队列为空,则返回 null。 E poll() // 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。 E poll(long timeout, TimeUnit unit) // 将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。 void put(E e) // 返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。 int remainingCapacity() // 从此队列移除指定元素的单个实例(如果存在)。 boolean remove(Object o) // 返回队列中的元素个数。 int size() // 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。 E take() // 返回按适当顺序包含此队列中所有元素的数组。 Object[] toArray() // 返回按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。 T[] toArray(T[] a) // 返回此 collection 的字符串表示形式。 String toString()

示例
import java.util.*; import java.util.concurrent.*; /* *LinkedBlockingQueue是“线程安全”的队列,而LinkedList是非线程安全的。 * *下面是“多个线程同时操作并且遍历queue”的示例 *(01) 当queue是LinkedBlockingQueue对象时,程序能正常运行。 *(02) 当queue是LinkedList对象时,程序会产生ConcurrentModificationException异常。 * * @author skywang */ public class LinkedBlockingQueueDemo1 {// TODO: queue是LinkedList对象时,程序会出错。 //private static Queue queue = new LinkedList(); private static Queue queue = new LinkedBlockingQueue(); public static void main(String[] args) {// 同时启动两个线程对queue进行操作! new MyThread("ta").start(); new MyThread("tb").start(); }private static void printAll() { String value; Iterator iter = queue.iterator(); while(iter.hasNext()) { value = https://www.it610.com/article/(String)iter.next(); System.out.print(value+", "); } System.out.println(); }private static class MyThread extends Thread { MyThread(String name) { super(name); } @Override public void run() { int i = 0; while (i++ < 6) { // “线程名” + "-" + "序号" String val = Thread.currentThread().getName()+i; queue.add(val); // 通过“Iterator”遍历queue。 printAll(); } } } }

    推荐阅读