Java|Java JUC ConcurrentLinkedQueue解析
ConcurrentLinkedQueue 原理探究
介绍
ConcurrentLinkedQueue 是线程安全的无界非阻塞队列,底层使用单向链表实现,对于入队和出队操作使用 CAS 实现线程安全。
文章图片
ConcurrentLinkedQueue 内部的队列使用单向链表方式实现,其中有两个volatile类型的Node节点
分别用来存放队列的头、尾节点。
下面无参构造函数中可以知道,默认头尾节点都是指向 item 为 null 的哨兵节点,新元素插入队尾,获取元素从头部出队。
private transient volatile Node head;
private transient volatile Node tail;
public ConcurrentLinkedQueue() {
head = tail = new Node(null);
}
在 Node 节点中维护一个使用 volatile 修饰的变量 item,用来存放节点的值;next 存放链表的下一个节点;其内部则使用 UNSafe 工具类提供的 CAS 算法来保证出入队时操作链表的原子性。
offer 操作 offer 操作是在队尾增加一个元素,如果传递的参数是 null 则抛出异常,否则由于 ConcurrentLinkedQueue 是无界队列,该方法会一直返回 true,另外,由于使用的是 CAS 算法,所以该方法不会被阻塞挂起调用方线程。下面我们看源码:
public boolean offer(E e) {
//(1)为null则抛出异常
checkNotNull(e);
//(2)构造Node节点,内部调用unsafe.putObject
final Node newNode = new Node(e);
//(3)从尾节点插入
for (Node t = tail, p = t;
;
) {
Node q = p.next;
//(4)如果q == null则说明p是尾节点,执行插入
if (q == null) {
//(5)使用CAS设置p节点的next节点
if (p.casNext(null, newNode)) {
//(6)如果p != t,则将入队节点设置成tail节点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点
if (p != t) // hop two nodes at a time
casTail(t, newNode);
// Failure is OK.
return true;
}
}
else if (p == q)
//(7)多线程操作时,由于poll操作移除元素后可能会把head变成自引用,也就是head的next变成了head,这里需要重新找新的head
p = (t != (t = tail)) ? t : head;
else
//(8)寻找尾节点
p = (p != t && t != (t = tail)) ? t : q;
}
}
我们解析一下这个方法的执行流程,首先当调用该方法时,代码(1)对传参进行检查,为 null 则抛出异常,否则执行代码(2)并使用 item 作为构造函数参数创建一个新的节点,然后代码(3)从队列尾部节点开始循环,打算从尾部添加元素,到代码(4)时队列状态如下图。
文章图片
这时候节点 p、t、head、tail 都指向 item 为 null 的哨兵节点,由于哨兵节点 next 节点为 null,所以 q 也是 null。
代码(4)发现 q == null 则执行代码(5),使用 CAS 判断 p 节点的 next 是否为 null,为 null 则使用 newNode 替换 p 的 next 节点,然后执行代码(6),但这时候 p == t 所以没有设置尾部节点,然后退出 offer 方法。目前队列状态如下图:
文章图片
刚才我们讲解的是一个线程调用 offer 方法的情况,但是如果多个线程同时调用,就会存在多个线程同时执行到代码(5)的情况。
假设线程 1 调用了 offer(item1),线程 2 调用了 offer(item2),都同时执行到了代码(5),p.casNext(null,newNode)。由于 CAS 是原子性的,我们假设线程 1 先执行 CAS 操作,发现 p.next 为 null,则更新为 item1,这时候线程 2 也会判断 p.next 是否为 null,发现不为 null,则跳转到代码(3),然后执行代码(4)。这时队列分布如下图:
文章图片
随后线程 2 发现不满足代码(4),则跳转执行代码(8),然后把 q 赋值给了 p。队列状态如下:
文章图片
然后线程 2 再次跳转到代码(3)执行,当执行到代码(4)时队列状态如下图:
文章图片
这时候
q == null
,所以线程 2 会执行代码(5),通过 CAS 操作判断 p.next 节点是否为 null,是 null 则使用 item2 替换,不是则继续循环尝试。假设 CAS 成功,那么执行代码(6),由于p != t
,所以设置 tail 节点为 item2,然后退出 offer 方法。这时候队列分布如下:文章图片
到现在我们就差代码(7)没有讲过,其实在这一步主要是在执行 poll 操作后才会执行。在执行 poll 可能会发生如下情况:
文章图片
我们分析一下如果是这种状态时调用 offer 添加元素,在执行到代码(4)时状态图如下:
文章图片
这里由于 q 节点不为空并且
p == q
所以执行到代码(7)由于t == tail
所以 p 被赋值为 head,然后重新循环,循环后执行到代码(4),队列状态如下:文章图片
这时候由于
q == null
,所以执行代码(5)进行 CAS 操作,如果当前没有其他线程执行 offer 操作,则 CAS 操作会成功,p.next 节点被设置为新增节点。然后执行代码(6),由于p != t
所以设置新节点为队列的尾部节点,现在队列状态如下图:文章图片
需要注意,这里自引用节点会被垃圾回收掉总结:offer 操作的关键步骤是代码(5),通过 CAS 操作来控制同一时刻只有一个线程可以追加元素到队列末尾,而失败的线程会进行循环尝试 CAS 操作,直到 CAS 成功。
add 操作 add 操作是在链表末尾添加一个元素,内部还是调用的 offer 方法。
public boolean add(E e) {
return offer(e);
}
poll 操作 poll 操作是在队列头部获取并移除一个元素,如果队列为空则返回 null。下面看看该方法的实现原理。
public E poll() {
//(1)goto标记
restartFromHead:
//(2)无限循环
for (;
;
) {
for (Node h = head, p = h, q;
;
) {
//(3)保存当前节点
E item = p.item;
//(4)当前节点有值切CAS变为null
if (item != null && p.casItem(item, null)) {
//(5)CAS成功后标记当前节点并移除
if (p != h) // hop two nodes at a time
//更新头节点
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
//(6)当前队列为空则返回null
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
//(7)如果当前节点被自引用,则重新寻找头节点
else if (p == q)
continue restartFromHead;
else//(8)如果下一个元素不为空,则将头节点的下一个节点设置成头节点
p = q;
}
}
}
final void updateHead(Node h, Node p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
由于 poll 方法是从头部获取一个元素并移除,所以代码(2)内层循环是从 head 节点开始,代码(3)获取当前队列头节点,队列一开始为空时队列状态如下:
文章图片
由于 head 节点指向的 item 为 null 的哨兵节点,所以会执行到代码(6),假设这个过程中没有线程调用 offer 方法,则此时 q 等于 null,这时队列状态如下:
文章图片
随后执行 updateHead 方法,由于
h == p
,所以没有设置头节点,然后返回 null。假设执行到代码(6)时,已经有其它线程调用了 offer 方法并成功添加了一个元素到队列,这时候 q 指向的是新增加的元素节点,如下图:
文章图片
然后不满足代码(6)
(q = p.next) == null
,执行代码(7)的时候 p != q
则执行代码(8),然后将 p 指向了节点 q,队列状态如下:文章图片
然后程序又去执行代码(3),p 现在指向的不为 null,则执行
p.casItem(item, null)
通过 CAS 操作尝试设置 p 的 item 值为 null,如果此时 CAS 成功,执行代码(5),此时p != h
则设置头节点为 p,并且设置 h 的 next 节点为自己,poll 然后返回被移除的节点 item。队列图如下:文章图片
目前的队列状态就是我们刚才讲 offer 操作时,执行到代码(7)的状态。
现在我们还有代码(7)没执行过,我们看一下什么时候执行。假设线程 1 执行 poll 操作时,队列状态如下:
文章图片
那么执行
p.casItem(item, null)
通过 CAS 操作尝试设置 p 的 item 值为 null,假设 CAS 设置成功则标记该节点并从队列中将其移除。队列状态如下:文章图片
然后,由于
p != h
,所以会执行 updateHead 方法,假如线程 1 执行 updateHead 前另外一个线程 2 开始 poll 操作,这时候线程 2 的 p 指向 head 节点,但是还没有执行到代码(6),这时候队列状态如下:文章图片
然后线程 1 执行 updateHead 操作,执行完毕后线程 1 退出,这时候队列状态如下:
文章图片
然后线程 2 继续执行代码(6),
q = p.next
,由于该节点是自引用节点,所以p == q
,所以会执行代码(7)跳到外层循环 restartFromHead,获取当前队列头 head,现在的状态如下:文章图片
总结:poll 方法在移除一个元素时,只是简单地使用 CAS 操作把当前节点的 item 值设置为 null,然后通过重新设置头节点将该元素从队列里面移除,被移除的节点就成了孤立节点,这个节点会在垃圾回收时被回收掉。另外,如果在执行分支中发现头节点被修改了,要跳到外层循环重新获取新的头节点。
peek 操作 peek 操作是获取队列头部一个元素(只获取不移除),如果队列为空则返回 null。下面看下其实现原理。
public E peek() {
//1.
restartFromHead:
for (;
;
) {
for (Node h = head, p = h, q;
;
) {
//2.
E item = p.item;
//3.
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
//4.
else if (p == q)
continue restartFromHead;
//5.
else
p = q;
}
}
}
peek 操作的代码结构与 poll 操作类似,不同之处在于代码(3)中少了 castItem 操作。其实这很正常,因为 peek 只是获取队列头元素值,并不清空其值。根据前面的介绍我们知道第一次执行 offer 后 head 指向的是哨兵节点(也就是 item 为 null 的节点),那么第一次执行 peek 时在代码(3)中会发现
item == null
,然后执行q = p.next
,这时候 q 节点指向的才是队列里面第一个真正的元素,或者如果队列为 null 则 q 指向 null。当队列为空时,队列如下:
文章图片
这时候执行 updateHead,由于 h 节点等于 p 节点,所以不进行任何操作,然后 peek 操作会返回 null。
当队列中至少有一个元素时(假设只有一个),队列状态如下:
文章图片
这时候执行代码(5), p 指向了 q 节点,然后执行代码(3),此时队列状态如下:
文章图片
执行代码(3)时发现 item 不为 null,所以执行 updateHead 方法,由于
h != p
,所以设置头节点,设置后队列状态如下:文章图片
最后剔除哨兵节点。
总结:peek 操作的代码与 poll 操作类似,只是前者只获取队列头元素但是并不从队列里将它删除,而后者获取后需要从队列里面将它删除。另外,在第一次调用 peek 操作时,会删除哨兵节点,并让队列的 head 节点指向队列里面第一个元素或者为 null。
size 操作 计算当前队列元素个数,在并发环境下不是很有用,因为 CAS 没有加锁,所以从调用 size 方法到返回结果期间有可能增删元素,导致统计的元素个数不精确。
public int size() {
int count = 0;
for (Node p = first();
p != null;
p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
//获取第一个队列元素,(剔除哨兵节点),没有则为null
Node first() {
restartFromHead:
for (;
;
) {
for (Node h = head, p = h, q;
;
) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
remove 操作 如果队列里面存在该元素则删除该元素,如果存在多个则删除第一个,并返回 true,否则返回 false。
public boolean remove(Object o) {
if (o != null) {
Node next, pred = null;
for (Node p = first();
p != null;
pred = p, p = next) {
boolean removed = false;
E item = p.item;
if (item != null) {
// 若不匹配,则获取next节点继续匹配
if (!o.equals(item)) {
next = succ(p);
continue;
}
//相等则使用CAS设置为null
removed = p.casItem(item, null);
}
// 获取删除节点的后继节点
next = succ(p);
//如果有前驱节点,并且next节点不为null,则链接前驱节点到next节点
if (pred != null && next != null) // unlink
// 将被删除的节点通过CAS移除队列
pred.casNext(p, next);
if (removed)
return true;
}
}
return false;
}
contains 操作 判断队列里面是否含有指定对象,由于是遍历整个队列,所以像 size 操作一样结果也不是那么精确,有可能调用该方法时元素还在队列里面,但是遍历过程中其他线程才把该元素删除了,那么就会返回 false。
public boolean contains(Object o) {
if (o == null) return false;
for (Node p = first();
p != null;
p = succ(p)) {
E item = p.item;
if (item != null && o.equals(item))
return true;
}
return false;
}
总结 【Java|Java JUC ConcurrentLinkedQueue解析】ConcurrentLinkedQueue 的底层使用单向链表数据结构来保存队列元素,每个元素被包装成一个 Node 节点。队列是靠头、尾节点来维护的,创建队列时头、尾节点指向一个 item 为 null 的哨兵节点。第一次执行 peek 或者 first 操作时会把 head 指向第一个真正的队列元素。由于使用非阻塞 CAS 算法,没有加锁,所以在计算 size 时有可能进行了 offer、poll 或者 remove 操作,导致计算的元素个数不精确,所以在并发情况下 size 方法不是很有用。
推荐阅读
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 事件代理
- Java|Java OpenCV图像处理之SIFT角点检测详解
- java中如何实现重建二叉树
- 数组常用方法一
- 【Hadoop踩雷】Mac下安装Hadoop3以及Java版本问题
- Java|Java基础——数组
- RxJava|RxJava 在Android项目中的使用(一)
- java之static、static|java之static、static final、final的区别与应用
- Java基础-高级特性-枚举实现状态机