工作常用|java阻塞队列LinkedBlockingQueue源码分析
一、概要 阻塞队列与我们平时接触的普通队列(LinkedList 和ArrayList)的最大的不同点,在于阻塞队列支持阻塞添加和阻塞删除方法。阻塞队列常常应用于大数据框架源码和线程池源码中。
1、阻塞添加:阻塞添加指的时当组设队列元素已经满时候。队列会阻塞加入元素的线程,直到对垒元素不满时才会重新唤醒线程执行元素加入。
2、阻塞删除:是指在队列元素为空时,删除队列元素的线程被阻塞,直到队列不为空再执行删除操作。
ArrayList 和ArrayBlockingQueue一样,内部基于数组来存放元素,而linkedBlockingQueue 内部采用链表来存放元素。
下面时ArrayBlockingQueue 和linkedBlockingQueue 继承关系图。
文章图片
二、LinkedBlockingQueue 是一个基于链表实现的阻塞队列,默认情况下,该阻塞队列的大小为Integer.MAX_VALUE(这也是和arrayListQueue的区别之一),由于这个数值特别大,因此在很多地方称LinkedBlockingQueue是一个无界队列。在LinkedBlockingQueue进行初始化时,可以手动指定队列的大小,这样LinkedBlockingQueue就是一个有界队列了。
1、linkedBlockingQueue 之线程安全。
既然说LinkedBlockingQueue是线程安全的,那就要解决互斥和同步的问题,这一点我们可以通过Java中提供的锁来解决。Java中锁分两大类,一类是synchronized实现的隐式锁,另一类是并发编程大师Doug Lea基于AQS实现的锁。由于LinkedBlockingQueue是Doug Lea所编写的类,因此LinkedBlockingQueue底层使用的是AQS类型的锁(lock),即:ReentrantLock。
2、linkedBlockingQueue 之数据结构
文章图片
1)、head和last.。这两个属性的类型是Node类型,Node是LinkedBlockingQueue的一个内部类。每个Node又包含两个属性:item和next。item就是最终存元素的属性,next用来指向下一个节点,通过next属性就能在LinkedBlockingQueue内部维护一个单向链表。其中head和last分别表示链表的头部和尾部。(特别指出的是:实际存储中head的item属性始终是null,因此head不存放元素,它仅仅是表示一个链表的头结点。)
static class Node {
E item;
Node next;
Node(E x) { item = x;
}
}
2)初始化时候我们一般会指定阻塞队列的容量大小 如下
例如new LinkedBlockingQueue(1000)
3)、count。AtomicInteger类型,该属性的类型是一个原子类型,它表示的是当前队列中元素的个数。
【工作常用|java阻塞队列LinkedBlockingQueue源码分析】4)、takeLock。ReentrantLock类型,在器阻塞队列中,获取元素和添加元素采用了不同的锁。takeLock表示获取元素中使用的锁。
5)、putLock。ReentrantLock类型,putLock表示的是添加元素时使用的锁。
6)、notEmpty。非空等待队列,Condition类型,当队列为空时,不能再从队列中获取元素了,此时想从队列中获取元素的线程就需要等待,直到队列中有元素被添加进来。那么此时线程应该在哪等待呢?就是在nonEmpty这个非空等待队列中等待。notEmpty属性的值,是通过takeLock这把锁来创建的。
private final Condition notEmpty = takeLock.newCondition();
7)、notFull。非满等待队列,Condition类型,当队列已满时,不能再向队列中添加元素了,此时向队列中添加元素的线程就需要等待,直到队列不满。那么线程应该在哪儿等待呢?就是在notFull这个非满等待队列中等待。notFull属性的值,是通过putLock这把锁来创建的。
private final Condition notFull = putLock.newCondition();
注意:其中Condition类中实现了线程等待wait、唤醒等方法,具体参考其具体的代码
3、linkedBlockingQueue 之源码分析
上面我们了解了linkedBlockingQueue的内部数据结构,下面我们结合具体的源码分析下LinkedBlockingQueue的实现。将LinkedBlockingQueue的操作分为两类。存元素和取出元素。其linkedBlockingQueue存数据具体方法有:put(e)、offer(e)、offer(e,time,unit); 取出元素的方法有:take()、poll()、poll(time、unit)、peek()。等。
put(e)
当队列已满时,put(e)方法会一直阻塞线程,直到队列不满。当成功添加元素到队列中时,put(e)方法才会返回结束。该方法没有返回值。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//可中断的获取锁
putLock.lockInterruptibly();
try {
// 如果队列满了,则进行等待,等待队列是非满状态
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// 队列元素个数自增,注意,由于这里调用的是getAndIncrement()方法,
// 不是incrementAndGet()方法,所以返回的是自增之前的值。
c = count.getAndIncrement();
// 如果阻塞队列还没有满,就唤醒处于notFull等待队列中的线程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
具体执行流程为:(1)当调用put(e)时,先获取putLock锁,然后判断队列中是否已满,如果已满,就调用notFull.await()方法,让当前线程进入到notFull的等待对列中,当队列不满时,会调用notFull。signal()方法,唤醒notFull等待队列的线程。 (2)当队列不满时。调用enqueue()方法将元素存入inkedBlockingQueue内部维护的链表中。(3)、但元素入队成功之后,再判断再判断队列是否已满,如果未满,就唤醒notFull队列中的线程。 最后再判断队列在添加元素之前是否有元素,如果没有元素
那么可能有线程等待再notEmpty这个等待队列中,调用signalNotEmpty()方法就会唤醒处于notEmpty等待队列中的线程。
enqueue(node)方法的源码如下。
private void enqueue(Node node) {
last = last.next = node;
}
offer(e)
offer(e)方法不会阻塞线程,当阻塞队列已经满了时,如果再向阻塞队列中添加元素,那么offer(e)方法会直接返回false,如果元素添加成功,则会返回true。源码如下。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如果阻塞队列满了直接返回false
if (count.get() == capacity)
return false;
int c = -1;
Node node = new Node(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
//入队
enqueue(node);
c = count.getAndIncrement();
//未满通知
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
//非空通知
if (c == 0)
signalNotEmpty();
// 如果线程调用putLock.lock()没有获取到锁,那么此时c等于-1,因此会也会返回false
return c >= 0;
}
**注意:**offer(e)方法的源码与put(e)方法的源码大部分逻辑一致,不同点就在于代码中标记①的地方。offer(e)在判断队列已满时,会直接返回结束,未满时,才会进行获取putLock锁,然后进行元素添加操作。
offer(e,time,unit)
此方法支持线程超时存放元素,当阻塞队列已满时,当前线程最多等待time时间。如果这段时间依旧没有将元素存入到队列中,那么就会返回false。如果元素添加成功,就返回true。相对于再offer(e)基础上增加了等待超时时间。
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
数据提取:
take()
take() 方法和put(e)方法对应,take()方法用来从阻塞队列中取出元素。当阻塞对列中没有元素存在时候,当前线程会一直等待,直到阻塞队列不为空。最终返回阻塞对垒中存储的第一个元素。take()方法源码如下。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果阻塞队列中一直没有元素,线程就一直等待,直到队列中有元素后调用notEmpty等待队列的signal()方法
while (count.get() == 0) {
notEmpty.await();
}
// 当阻塞队列中有元素后,会跳出上面的while循环,然后出阻塞队列
x = dequeue();
c = count.getAndDecrement();
// 如果阻塞队列中还有元素,就唤醒等待在notEmpty等待队列中的线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果在元素出队列前,队列处于已满状态,那么从队列中移出一个元素后,队列就变为非满状态了
// 此时就唤醒等待在notFull等待队列中的线程
if (c == capacity)
signalNotFull();
return x;
}
(1)当调用take()方法时,先判断LinkedBlockingQueue中有没有元素,如果没有,就调用notEmpty.await()方法,让当前线程进入到notEmpty这个等待队列中等待。当LinkedBlockingQueue中有元素时,其他线程就会调用notEmpty.signal()方法,就会让当前线程醒来,继续执行后面的逻辑。(2)如果LinkedBlockingQueue中有元素,就调用dequeue()方法从队列中取出元素。取出元素后,再判断队列中是否还有元素,如果还有,则唤醒处于notEmpty这个等待队列中的元素。(3)、最后判断LinkedBlockingQueue是否已满,如果没有满,就调用signalNotFull()方法,唤醒等待在notFull等待队列中的线程。
注意:由发现知道,take 方法中含有唤醒 存入队列元素线程的方法
dequeue() 自行分析源码
poll()
poll()方法也是从LinkedBlockingQueue中取出元素,但是它不会阻塞线程,当LinkedBlockingQueue队列中没有元素时,poll()方法就会直接返回null;有元素时,就会先尝试获取锁,然后再取出元素。源码如下:
public E poll() {
final AtomicInteger count = this.count;
// 如果队列为空,就立即返回null,不会阻塞线程
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
// 取元素
x = dequeue();
c = count.getAndDecrement();
// 非空通知
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 非满通知
if (c == capacity)
signalNotFull();
return x;
}
poll(time,unit)
LinkedBlockingQueue中没有元素时,poll(time,unit)也会阻塞线程,它支持的是超时阻塞。当在time时间内,没有获取到元素时,就会返回null。(例如应用在爬虫中我们会设置1分钟超时取爬取的url,然后另一个线程向这个队列中写入爬虫的url 这样就能很好形成一个爬虫循环)
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
// 如果已经超时,直接返回null
if (nanos <= 0)
return null;
// 等待(调用的是LockSupport.parkNanos(this, nanosTimeout))
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
poll(time,unit)与take()的区别是,take是一直阻塞,poll(time,unit)是超时阻塞
peek()
peek() 方法也是从队列中获取元素,但是他只会获取队列中的第一个元素,且不会将元素从队列中移除。
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 取第一个元素
Node first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
从peek()方法的源码中可以发现,peek()仅仅只是取出队列中的第一个元素,但是并没有修改链表的指针指向,因此它不会将元素从队列中删除。
综上:
1、LinkedBlockQueue是一个线程安全的队列,它十一额底层基于链表实现的无界队列,当指定队列容量时,它是一个有界队列。
2、在LinkedBlockingQueue中,取元素和存元素使用的是两把锁。锁的类型是ReentrantLock。它通过使用Condition的等待/通知来实现生产者-消费者模型。
3、由于LinkedBlockingQueue中存元素和取元素使用的是两把锁,存和取可以同时进行,因此他的吞吐量要高于ArrayBlockingQueue。
推荐阅读
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 7.9号工作总结~司硕
- 标签、语法规范、内联框架、超链接、CSS的编写位置、CSS语法、开发工具、块和内联、常用选择器、后代元素选择器、伪类、伪元素。
- 事件代理
- 高天天工作室|溧清的剧本4
- Java|Java OpenCV图像处理之SIFT角点检测详解
- java中如何实现重建二叉树
- 如果你是一名工作乐之者,996的工作又算得了什么()
- 为什么985/211的学生能胜任工作获得老板的青睐。
- 数组常用方法一