#|【JUC系列】并发容器之ConcurrentLinkedQueue(JDK1.8版)

ConcurrentLinkedQueue
文章目录

  • ConcurrentLinkedQueue
    • 类图
    • 组成
      • 数据结构
      • 内部类
      • 类的属性
      • 类的构造函数
      • 核心方法
        • 入队列 boolean offer(E e)
        • 出队列 E poll()
        • 移除操作remove(Object o)
        • first()
        • isEmpty()
        • size()
    • 状态说明

类图 #|【JUC系列】并发容器之ConcurrentLinkedQueue(JDK1.8版)
文章图片

组成 数据结构
【#|【JUC系列】并发容器之ConcurrentLinkedQueue(JDK1.8版)】#|【JUC系列】并发容器之ConcurrentLinkedQueue(JDK1.8版)
文章图片

一个头结点和一个尾结点
每个结点Node包含item和下一个结点next引用,默认情况下,tail等于head,元素为null。
内部类
private static class Node { // 元素 volatile E item; // 下一个结点引用 volatile Node next; /** * Constructs a new node.Uses relaxed write because item can * only be seen after publication via casNext. */ // 构造函数 Node(E item) { // 设置item的值 UNSAFE.putObject(this, itemOffset, item); }// 设置item的值 boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); }void lazySetNext(Node val) { // 设置next域的值,并不会保证修改对其他线程立即可见 UNSAFE.putOrderedObject(this, nextOffset, val); }// 比较并替换next域的值 boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); }// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE; // item域的偏移量 private static final long itemOffset; // next域的偏移量 private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class k = Node.class; itemOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } }

类的属性
// 链表首结点 private transient volatile Node head; // 链表尾结点 private transient volatile Node tail;

类的构造函数
默认无参构造函数,创建一个空的ConcurrentLinkedQueue。
public ConcurrentLinkedQueue() { // 初始化头结点与尾结点 head = tail = new Node(null); }

将给的集合c转换成ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。
public ConcurrentLinkedQueue(Collection c) { Node h = null, t = null; for (E e : c) { //遍历c集合 // 检查元素不为null checkNotNull(e); // 新建node结点 Node newNode = new Node(e); // 首结点为空 if (h == null) h = t = newNode; else { // 尾插法 // 直接头结点的next域 t.lazySetNext(newNode); // 重新赋值头结点 t = newNode; } } // 头结点为null if (h == null) // 新结头结点与尾结点 h = t = new Node(null); head = h; tail = t; }

核心方法
方法名 描述
boolean add(E e) 在此队列的尾部插入指定元素。 由于队列是无界的,因此该方法永远不会抛出IllegalStateException或返回false
void updateHead(Node h, Node p) 尝试 CAS 前往 p。 如果成功,将旧头重新指向自身作为succ()的哨兵,如下所示。
Node succ(Node p) 如果p.next已链接到self,则返回p的后继结点或头结点,这仅在使用现在不在列表中的过时指针遍历时才为真。
boolean offer(E e) 在此队列的尾部插入指定元素。 由于队列是无界的,因此此方法永远不会返回false
E poll() 此方法移除并返回此ConcurrentLinkedQueue的头部;如果此队列为空,则返回null。
E peek() 此方法返回此ConcurrentLinkedQueue的头部,而不删除它
Node first() 返回列表中的第一个活动(未删除)结点,如果没有,则返回 null。 这是 poll/peek 的另一种变体; 这里返回第一个结点,而不是元素。 我们可以让 peek() 成为 first() 的包装器,但这会花费额外的 volatile 读取项,并且需要添加一个重试循环来处理输给并发 poll() 的竞争的可能性。
int size() 返回此队列中的元素数。 如果此队列包含多个 Integer.MAX_VALUE 元素,则返回 Integer.MAX_VALUE。

请注意,与大多数集合不同,此方法不是恒定时间操作。 由于这些队列的异步特性,确定当前元素的数量需要 O(n) 遍历。 此外,如果在此方法执行期间添加或删除元素,则返回的结果可能不准确。 因此,这种方法在并发应用程序中通常不是很有用。
boolean remove(Object o) 从此队列中移除指定元素的单个实例(如果存在)。 更正式地说,如果该队列包含一个或多个这样的元素,则删除一个元素 e 使得 o.equals(e)。 如果此队列包含指定的元素(或等效地,如果此队列因调用而更改),则返回 true。
入队列 boolean offer(E e)
public boolean offer(E e) { // 元素不为null checkNotNull(e); // 新建一个结点 final Node newNode = new Node(e); // 无限循环遍历链表 // 开启一个从tail指针开始遍历的循环,p指向当前疑似尾结点; for (Node t = tail, p = t; ; ) { // 初始化q=p的下一个结点; Node q = p.next; // 如果q==null,说明此刻p确实是尾结点,新结点应该插到后面; if (q == null) { // p is last node if (p.casNext(null, newNode)) {// CAS将新结点链接到p结点上 //首次添加时,p 等于t,不进行尾结点更新,所以所尾结点存在滞后性 //并发环境,可能存添加/删除,tail就更难保证正确指向最后结点。 if (p != t) // 和poll方法一样,如果插入的地方不是在tail处,才往后移,这个行为可以保证,tail后面最多还有一个结点; // p不等t,CAS替换新结点为尾结点 casTail(t, newNode); // cas操作安全移动tail指针。 return true; } // Lost CAS race to another thread; re-read next } else if (p == q) //p==q,也即p的next字段指向自身,说明p是一个脱离了链表的结点,需要找一个结点重新开始遍历。 // 原来的尾结点与现在的尾结点是否相等,若相等,则p为head,否则,赋值为现在的尾结点 // 当tail不执行最后结点时,如果执行出列操作,很有可能将tail也给移除了 // 此时需要对tail结点进行复位,复位到head结点 // 从head重新开始肯定正确,但是如果tail指针有更新过,那么从tail开始大概率可能效率更高。 p = (t != (t = tail)) ? t : head; else // Check for tail updates after two hops.按道理直接p=q跳到下一个结点就Ok了,但是代码里面做了这个判断(p != t && t != (t = tail)),如果p已经正常往后移动了一次,且tail发生了变化,那么从新的tail重新开始。为什么要加个(p!=t)的前置判断呢?我认为是为了提高效率,因为tail是valotile变量,读取有一定代价,当p==t的时候,作者认为再往后跳一下成功的概率挺高。 p = (p != t && t != (t = tail)) ? t : q; } }

入队列主要考虑两件事:
  1. 新增新结点,设置为当前未结点的next
  2. 是否更新tail结点,若tail的next不为空,则将入队列的结点变为新的tail结点;若tail的next为空,则不更新tail结点。
    通过减少更新tail的次数,提高入队效率
    #|【JUC系列】并发容器之ConcurrentLinkedQueue(JDK1.8版)
    文章图片

出队列 E poll()
public E poll() { restartFromHead: for (; ; ) { // 开始一个类似遍历的for循环,初始化h,p为head结点,p是当前检查的结点,q是下一个结点。 for (Node h = head, p = h, q; ; ) { E item = p.item; // 如果p结点的数据不为空,那么p肯定就是真正的第一个结点,只要能将结点的数据字段(item)置为null,出队列就算成功);p.casItem(item, null)操作保证了并发poll的安全,如果有其他线程抢先一步执行成功,那么casItem操作会失败。 if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. // p == h,说明刚出队列的p就是遍历开始的第一个结点,有2种情况 // 1)没有并发poll,head指向了空结点,队列里面只有且仅有一个空结点,此时没有必要做head移动; // 2)有并发poll,p==h说明本线程在竞争中领先,那么交由那个落后的线程来做head移动。 if (p != h) // hop two nodes at a time // 如果p.next==null,说明p是尾结点,链表空了,head也指向p;否则head指向p.next。两种情况都是正确的。 updateHead(h, ((q = p.next) != null) ? q : p); return item; } // 无论p的item字段为null,还是在并发中竞争失败,我们都要往后遍历; 于是q=p.next,如果q==null,说明链表空了,此时将head置为尾结点;当然,如果h==p是不需要执行的,updateHead里面有这个判断。 else if ((q = p.next) == null) { updateHead(h, p); return null; } // p==q实际上就是p==p.next,也即p的next指向自身,这说明p结点已经和链表断开了(参考第一结总结的结点属性),这种情况下,只能重新开始poll。 else if (p == q) continue restartFromHead; else // 正常迭代下一个结点。 p = q; } } }

将首结点的指针从h转移到p,就是将节点h出队列,一旦出队列成功,就将h的next字段指向自身,防止那些出队列的节点仍然互相链接,妨碍垃圾收集。
updateHead(Node h, Node p)
final void updateHead(Node h, Node p) { if (h != p && casHead(h, p)) // h的next指向自己,从链表中断开 h.lazySetNext(h); }

h.lazySetNext(h)操作没有volatile的语义,有可能对其他线程暂时不可见。 假设poll方法执行到((q = p.next) == null)时,p节点已经被另外一个线程出队列了,但是本线程不知道,那么(p == q)的判断也可能失败,最终执行到 p = q,这种情况下程序仍然是正确的,不过会多遍历一些节点。(只能认为这种情况造成的性能损失低于volatile造成的性能损失)。
#|【JUC系列】并发容器之ConcurrentLinkedQueue(JDK1.8版)
文章图片

移除操作remove(Object o)
public boolean remove(Object o) { if (o != null) { Node next, pred = null; for (Node p = first(); p != null; pred = p, p = next) { boolean removed = false; E item = p.item; // 判断当前是不是空结点。 if (item != null) { // 如果结点数据与输入参数相等,说明这是一个需要移除的结点,否则应该跳到下一个结点; if (!o.equals(item)) { // 结点数据不相等,取出当前结点p的下一个结点,然后重新开始循环; next = succ(p); continue; } // 结点数据相等,尝试清空数据;如果别的线程并发执行remove或poll,这一步操作可能失败; removed = p.casItem(item, null); } // 执行到这一行说明,p是空结点,(本来就是空,或在L5被清空);取出next结点; next = succ(p); // 由于p是空结点,尝试将p的前继和后继相连; if (pred != null && next != null) // unlink pred.casNext(p, next); if (removed) return true; } } return false; }

first()
Node first() { restartFromHead: // 自旋 for (; ; ) { // 开始一个类似遍历的for循环,初始化h,p为head结点,p是当前检查的结点,q是下一个结点。 for (Node h = head, p = h, q; ; ) { // p中是否存在元素 boolean hasItem = (p.item != null); // 若有元素或next为null则更新首结点 if (hasItem || (q = p.next) == null) { updateHead(h, p); // 若结点中有元素则返回此结点,链表中存在包含元素的结点 // 若结点中没有元素则返回null,链表中没有实际数据。 return hasItem ? p : null; } // p==q实际上就是p==p.next,也即p的next指向自身,这说明p结点已经和链表断开了,这种情况下,只能重新开始first。 else if (p == q) continue restartFromHead; else // 正常迭代下一个结点。 p = q; } } }

isEmpty()
public boolean isEmpty() { // 通过first的返回值判断链表是否存在实际数据 return first() == null; }

size()
public int size() { int count = 0; // 从实际的首结点开始遍历链表,计算链表中实际包含数据的结点数 for (Node p = first(); p != null; p = succ(p)) if (p.item != null) // Collection.size() spec says to max out // 链表最大值是Integer.MAX_VALUE if (++count == Integer.MAX_VALUE) break; return count; }

如果 p.next 已链接到 self,则返回首结点。否则返回下一个结点。
final Node succ(Node p) { Node next = p.next; return (p == next) ? head : next; }

状态说明 在整个生命周期内,算法保持以下属性:
  1. 链表里面至少会有一个节点,数据字段为null的节点是空节点;
  2. 顺着HEAD指针肯定能找到的真正的头节点,并能访问到所有节点;
  3. TAIL指针不一定指向有效节点,更不能保证指向真正的尾节点,但是它大部分情况下指向尾节点或接近尾节点,因此可以提高效率;
  4. 和队列断开的节点,next字段指向自身;
  5. 对于入了队列的节点(哪怕又出了队列),只有尾节点的next字段才等于null。
#|【JUC系列】并发容器之ConcurrentLinkedQueue(JDK1.8版)
文章图片
#|【JUC系列】并发容器之ConcurrentLinkedQueue(JDK1.8版)
文章图片

#|【JUC系列】并发容器之ConcurrentLinkedQueue(JDK1.8版)
文章图片

#|【JUC系列】并发容器之ConcurrentLinkedQueue(JDK1.8版)
文章图片

参考
作者:longhuihu 链接:https://www.jianshu.com/p/ce6108e4b2c4

    推荐阅读