LinkedBlockingQueue分析
单链表特点
【LinkedBlockingQueue分析】通过沿着一个外部的头链接(head link)来访问第一个项。然后用户通过从第一个项产生的,串联起来的单个链条,来访问其它项。
最后一项是没有指向下一项的链接。头结点不带数据。
单链表的节点包含: 一个数据项;到结构中的下一个节点的一个链接。
LinkedBlockingQueue特点
- 按先进先出的原则对元素进行排序;
- 队列头部是队列中时间最长的元素,尾部是队列时间最短的元素。
- 元素插入在尾部, 元素取出在头部。
- 链接队列通常具有比基于数组实现的队列更高的吞吐量,原因是添加和删除个一把锁,互不影响,但是在大多数并发应用程序中性能较差,原因是会产生额外node对象,处理大批量数据对GC有影响,
- 内部 添加线程和消费线程各自通过各自的ReenterLock锁对并发进行控制,添加和消费不是互斥的,各自管理好自己的线程。
- 在添加线程里,队列不满 唤醒添加notfull上的添加线程,线需要判断添加前元素前的队列大小,因为消费线程唤醒后会一直运行,直到队列为空的时候才进入等待。
如果是0,说明之前的消费线程已经停止,notempty条件对象上可能存在等待的消费线程,现在需要唤醒消费线程消费。 - 如果是消费线程,队列不为空需要唤醒消费线程。线需要判断添加前元素前的队列大小,如果是满了,需要唤醒添加线程,在notfull条件上可能存在等待线程
- 默认的大小Integer.MAX_VALUE,在添加速度大于移动速度时候,可能会导致内存溢出的问题,建议指定合适的大小
public class LinkedBlockingQueue extends AbstractQueue
implements BlockingQueue, java.io.Serializable {//单链表节点
static class Node{E item;
Node next;
Node(E x){
item = x;
}}//阻塞队列元素的最大值
private final int capacity;
//当前阻塞队列的元素个数
private final AtomicInteger count = new AtomicInteger();
// head of link listhead.item == null;
transient Node head;
//tail of link list head.next == null
private transient Node last;
private ReentrantLock takeLock = new ReentrantLock();
private Condition notEmpty = takeLock.newCondition();
private ReentrantLock putLock = new ReentrantLock();
private Condition notFull = new Condition();
public LinkedBlockingQueue(){
this(Integer.MAX_VALUE);
}public LinkedBlockingQueue(int capacity){
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node(null);
}public LinkedBlockingQueue(Collection extends E> c){this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try{int n = 0;
for(E e : c){
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node(e));
++n;
}
count.set(n);
}finally{
putLock.unlock();
}
}private void enqueue(Node node){last = last.next = node;
}public void put(E e){
if (e == null) throw new NullPointerException();
int c = -1;
Node node = new Node;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try{while(count.get() == capacity){
notFull.await();
}
enqueue(e);
//以原子方式将当前值加 1。返回旧值
c = count.getAndIncreate();
if(c+1 < capacity)
//队列未满 唤醒添加的线程
notFull.signal();
}finally{
lock.unlock();
}
//如果是0,说明之前的消费线程已经停止,notempty条件对象上可能存在等待的消费线程,现在需要唤醒消费线程消费
if(c == 0){
signalNotEmpty();
}}public boolean offer(E e, long timeout , TimeUnit util) throws InterruptedException{if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
//剩余的等待时间, 超时后直接返回false;
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}//从队列中返回元素并移除
public E take() throws InterruptedException{E x;
int c = -1;
final AtomicInteger count = this.count;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try{
while(count.get() == 0){
notEmpty.await();
}
x = dequeue();
//以原子方式将当前值减 1。返回旧值
c = count.getAndDecrement();
if(c > 1){
//队列不为空, 通知等待的消费线程
notEmpty.signal;
}
}finally{takeLock.unlock();
}
//如果队列是满了,添加线程是停止了,处于等待状态,需要唤醒添加线程,在notfull条件上可能存在等待线程
if(c = capacity){
signalNotFull();
}}//检索元素
public E peek(){if(count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try{return (count.get()>0)? head.next.item : null;
}finally{
takeLock.unlock();
}
}//移除元素时, 需要获取到消费线程和添加线程的锁
public boolean remove(Object o){
if (o == null) return false;
fullyLock();
try{
//p==null的时候, 说明处在了队列的尾节点了
for(Node trail = head, p = trail.next;
p != null;
trail = p, p = p.next){
if(o.equals(p.item)){
unlink(p,tail);
return true;
}
}
return false;
}finally{
fullyUnlock();
}
}//移除节点p
void unlink(Node p, Node trail){p.item = null;
trail.next = p.next;
if(last == p){
last = trail;
}
//如果队列是满了,添加线程是停止了,处于等待状态,需要唤醒添加线程,在notfull条件上可能存在等待线程
if(count.getAndDecrement() == capacity){
notFull.signal();
}
}void fullyLock() {
putLock.lock();
takeLock.lock();
}
}}
推荐阅读
- 如何寻找情感问答App的分析切入点
- D13|D13 张贇 Banner分析
- 自媒体形势分析
- 2020-12(完成事项)
- Android事件传递源码分析
- Python数据分析(一)(Matplotlib使用)
- 泽宇读书会——如何阅读一本书笔记
- Java内存泄漏分析系列之二(jstack生成的Thread|Java内存泄漏分析系列之二:jstack生成的Thread Dump日志结构解析)
- ffmpeg源码分析01(结构体)
- 关于两种潜能生的性格分析