【JUC】ArrayBlockingQueue之Condition应用

阻塞队列是一种生产者、消费者模式的应用;
ArrayBlockingQueue从源码角度来看,其本质是condition的一种应用
一、样例及原理

// == 1.队列初始化 ArrayBlockingQueue queue = new ArrayBlockingQueue(100); // == 2.入队 Thread threadA = new Thread(()->{ try { queue.put(new Object()); } catch (InterruptedException e) { e.printStackTrace(); } }); threadA.start(); // == 3.出队 Thread threadB = new Thread(()->{ try { Object object = queue.take(); } catch (InterruptedException e) { e.printStackTrace(); } }); threadB.start();

1.元素A加入队列并被消费流程 【JUC】ArrayBlockingQueue之Condition应用
文章图片

创建ArrayBlockingQueue时会构建一个数组,用来存放元素;同时会创建一个notEmpty的condition条件。
①、生产者生产元素
元素A会存放在数组中,同时会触发notEmpty这个condition的signal方法唤醒被阻塞的消费者。
②、消费者消费元素
此时另一个线程消费,本着FIFO原则元素A会被移除出数组,当数组元素的length=0时,触发await方法阻塞消费者
2.队列已满元素N被消费流程 【【JUC】ArrayBlockingQueue之Condition应用】见下图(逻辑类似,不做分析了,懒~)
【JUC】ArrayBlockingQueue之Condition应用
文章图片

二、源码分析 1.初始化
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); // 初始化数组 this.items = new Object[capacity]; // fair=false,非公平锁 lock = new ReentrantLock(fair); // 两个condition,不空、不满 notEmpty = lock.newCondition(); notFull =lock.newCondition(); }

2.入队put
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // ## 加锁,被中断则抛出异常(抛异常是doAcquireInterruptibly()与acquireQueued()的主要区别)) lock.lockInterruptibly(); try { // -- 1.队列满了,await阻塞 while (count == items.length){ notFull.await(); } // -- 2.队列未满,入队 enqueue(e); } finally { lock.unlock(); } }// -- 2.队列未满,入队逻辑 java.util.concurrent.ArrayBlockingQueue#enqueue private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; // ## 唤醒notEmpty的condition notEmpty.signal(); }

3.出队take
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 加锁 lock.lockInterruptibly(); try { // -- 1.队列空,则阻塞 while (count == 0) notEmpty.await(); // -- 2.队列不空,出队 return dequeue(); } finally { lock.unlock(); } }// -- 2.队列不空,出队 java.util.concurrent.ArrayBlockingQueue#dequeue private E dequeue() { final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); // ## 唤醒notFull条件 notFull.signal(); return x; }

    推荐阅读