LinkedBlockingQueue分析

单链表特点
【LinkedBlockingQueue分析】通过沿着一个外部的头链接(head link)来访问第一个项。然后用户通过从第一个项产生的,串联起来的单个链条,来访问其它项。
最后一项是没有指向下一项的链接。头结点不带数据。
单链表的节点包含: 一个数据项;到结构中的下一个节点的一个链接。
LinkedBlockingQueue特点

  1. 按先进先出的原则对元素进行排序;
  2. 队列头部是队列中时间最长的元素,尾部是队列时间最短的元素。
  3. 元素插入在尾部, 元素取出在头部。
  4. 链接队列通常具有比基于数组实现的队列更高的吞吐量,原因是添加和删除个一把锁,互不影响,但是在大多数并发应用程序中性能较差,原因是会产生额外node对象,处理大批量数据对GC有影响,
  5. 内部 添加线程和消费线程各自通过各自的ReenterLock锁对并发进行控制,添加和消费不是互斥的,各自管理好自己的线程。
  6. 在添加线程里,队列不满 唤醒添加notfull上的添加线程,线需要判断添加前元素前的队列大小,因为消费线程唤醒后会一直运行,直到队列为空的时候才进入等待。
    如果是0,说明之前的消费线程已经停止,notempty条件对象上可能存在等待的消费线程,现在需要唤醒消费线程消费。
  7. 如果是消费线程,队列不为空需要唤醒消费线程。线需要判断添加前元素前的队列大小,如果是满了,需要唤醒添加线程,在notfull条件上可能存在等待线程
  8. 默认的大小Integer.MAX_VALUE,在添加速度大于移动速度时候,可能会导致内存溢出的问题,建议指定合适的大小
LinkedBlockingQueue源码分析
public class LinkedBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable {//单链表节点 static class Node{E item; Node next; Node(E x){ item = x; }}//阻塞队列元素的最大值 private final int capacity; //当前阻塞队列的元素个数 private final AtomicInteger count = new AtomicInteger(); // head of link listhead.item == null; transient Node head; //tail of link list head.next == null private transient Node last; private ReentrantLock takeLock = new ReentrantLock(); private Condition notEmpty = takeLock.newCondition(); private ReentrantLock putLock = new ReentrantLock(); private Condition notFull = new Condition(); public LinkedBlockingQueue(){ this(Integer.MAX_VALUE); }public LinkedBlockingQueue(int capacity){ if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node(null); }public LinkedBlockingQueue(Collection c){this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); try{int n = 0; for(E e : c){ if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node(e)); ++n; } count.set(n); }finally{ putLock.unlock(); } }private void enqueue(Node node){last = last.next = node; }public void put(E e){ if (e == null) throw new NullPointerException(); int c = -1; Node node = new Node; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try{while(count.get() == capacity){ notFull.await(); } enqueue(e); //以原子方式将当前值加 1。返回旧值 c = count.getAndIncreate(); if(c+1 < capacity) //队列未满 唤醒添加的线程 notFull.signal(); }finally{ lock.unlock(); } //如果是0,说明之前的消费线程已经停止,notempty条件对象上可能存在等待的消费线程,现在需要唤醒消费线程消费 if(c == 0){ signalNotEmpty(); }}public boolean offer(E e, long timeout , TimeUnit util) throws InterruptedException{if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { //剩余的等待时间, 超时后直接返回false; if (nanos <= 0L) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }//从队列中返回元素并移除 public E take() throws InterruptedException{E x; int c = -1; final AtomicInteger count = this.count; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try{ while(count.get() == 0){ notEmpty.await(); } x = dequeue(); //以原子方式将当前值减 1。返回旧值 c = count.getAndDecrement(); if(c > 1){ //队列不为空, 通知等待的消费线程 notEmpty.signal; } }finally{takeLock.unlock(); } //如果队列是满了,添加线程是停止了,处于等待状态,需要唤醒添加线程,在notfull条件上可能存在等待线程 if(c = capacity){ signalNotFull(); }}//检索元素 public E peek(){if(count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try{return (count.get()>0)? head.next.item : null; }finally{ takeLock.unlock(); } }//移除元素时, 需要获取到消费线程和添加线程的锁 public boolean remove(Object o){ if (o == null) return false; fullyLock(); try{ //p==null的时候, 说明处在了队列的尾节点了 for(Node trail = head, p = trail.next; p != null; trail = p, p = p.next){ if(o.equals(p.item)){ unlink(p,tail); return true; } } return false; }finally{ fullyUnlock(); } }//移除节点p void unlink(Node p, Node trail){p.item = null; trail.next = p.next; if(last == p){ last = trail; } //如果队列是满了,添加线程是停止了,处于等待状态,需要唤醒添加线程,在notfull条件上可能存在等待线程 if(count.getAndDecrement() == capacity){ notFull.signal(); } }void fullyLock() { putLock.lock(); takeLock.lock(); } }}

    推荐阅读