Java高级|原子变量、无锁算法和非阻塞机制

原子变量 【Java高级|原子变量、无锁算法和非阻塞机制】锁使用起来比较方法,但是它存在一些问题:

  • 性能问题。尽管现代JVM对于锁进行了很多优化,但是在多线程下,线程上下文切换仍需要os的支持,这部分开销始终是无法避免的。在计算任务比较断的时候,线程上下文切换将占据极大的部分,发生激烈的竞争,影响性能。换句话来说,锁太重了。
  • 活跃性问题。一个线程失败,可能引起所有线程阻塞。
volatile显然是一种非常轻量的同步操作,它不会引起上下文切换,但它无法支持复合操作,例如i++,看起来是一条指令,实际上是三条字节码指令。所以它也无法支持“测试并更新”这种在并发世界里极为重要但复合操作。
有没有一种粒度更细,类似于volatile,又支持原子更新操作的机制呢?
原子变量运而生。它是对于常规变量的一种封装,利用底层native接口,提供了变量上的原子一系列复合操作,例如i++,CAS等。具体参考CAS和原子类。原子变量本质上是用硬件实现的“测试和更新”复合原子操作,并且,在大量精巧的非阻塞并发程序中,核心就是将一致性问题一步步缩小范围到某一组原子变量上面。
非阻塞算法设计
如果在算法的每个步骤中都存在某个线程能够执行下去,这种算法就被称为无锁(Lock-Free)算法。
不管是非阻塞算法还是无锁算法,都是针对锁的活跃性问题衍生出来的概念。这种算法通常比锁更难设计,核心就是将一致性问题一步步缩小范围到某一组原子变量上面。
那么,如何利用原理变量设计更好的非阻塞机制呢?我们来看一些案例。
非阻塞计数器:
@ThreadSafe public class CasCounter { private SimulatedCAS value; public int getValue() { return value.get(); }public int increment() { int v; do{ v = value.get(); // CAS更新,避免更新丢失 } while (v != value.compareAndSwap(v, v + 1)); return v + 1; } }

维持复合不变性条件(涉及多个变量):
  • 代码中必须保证lower
  • 由于涉及到两个变量,两个变量必须同时更新
  • 防止丢失修改,采用CAS
public class CasNumberRange { // 1. 保证复合不变性条件,lower和upper必须作为一个整体更新 @Immutable private static class IntPair { final int lower; // Invariant: lower <= upper final int upper; }private final AtomicReference values = new AtomicReference(new IntPair(0, 0)); public int getLower() { return values.get().lower; }public int getUpper() { return values.get().upper; }public void setLower(int i) { // 2. CAS防止更新丢失 while (true) { IntPair oldv = values.get(); if (i > oldv.upper) throw new IllegalArgumentException( "Can't set lower to " + i + " > upper"); IntPair newv = new IntPair(i, oldv.upper); if (values.compareAndSet(oldv, newv)) return; // similarly for setUpper } } }

随机数生成算法
  • 随机数生成依赖上一个状态
@ThreadSafe public class ReentrantLockPseudoRandom extends PseudoRandom { private final Lock lock = new ReentrantLock(false); private int seed; ReentrantLockPseudoRandom(int seed) { this.seed = seed; }public int nextInt(int n) { lock.lock(); try { int s = seed; seed = calculateNext(s); int remainder = s % n; return remainder > 0 ? remainder : remainder + n; } finally { lock.unlock(); } } }@ThreadSafe public class AtomicPseudoRandom extends PseudoRandom { private AtomicInteger seed; AtomicPseudoRandom(int seed) { this.seed = new AtomicInteger(seed); }public int nextInt(int n) { while (true) { int s = seed.get(); int nextSeed = calculateNext(s); if (seed.compareAndSet(s, nextSeed)) { int remainder = s % n; return remainder > 0 ? remainder : remainder + n; } } } }

这边我们可以研究一下CAS方式的可伸缩性,讨论一下CAS和Lock怎么选用。
Java高级|原子变量、无锁算法和非阻塞机制
文章图片

可以看到,在竞争非常激烈的时候,CAS(Atomic)的方式实际上比锁还慢,这是因为竞争激烈的时候产生了大量的自旋。但是实际情况不会是这样——不会有程序除了竞争锁啥也不干,因此CAS的可伸缩性要更接近第二个图的情况,通常要好得多。
但讨论到这里,还没说完全,CAS和Lock的选用,主要还是要考虑自旋的开销:**自旋的时间和竞争的激烈程度,和获得锁的时间都有关系(例如io设备的锁,可能会导致长时间的自旋)。**所以,如果持有锁进行耗时操作,那么使用lock也没什么不好,lock的也更容易保证正确性,即使会有性能损耗。
非阻塞的栈 栈基于头插法,头插法的非阻塞算法比较好实现,可以对比下边链表的尾插法的实现。
@ThreadSafe public class ConcurrentStack { AtomicReference> top = new AtomicReference>(); public void push(E item) { Node newHead = new Node(item); Node oldHead; // 自旋CAS保证push不丢失 do { oldHead = top.get(); newHead.next = oldHead; } while (!top.compareAndSet(oldHead, newHead)); }public E pop() { Node oldHead; Node newHead; // 自旋CAS保证pop不丢失 do { oldHead = top.get(); if (oldHead == null) return null; newHead = oldHead.next; } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item; }private static class Node { public final E item; public Node next; public Node(E item) { this.item = item; } } }

非阻塞的链表 链表的操作更加复杂,例如中间插入,那么需要保证两个指针更新操作的原子性;又或者,链表的尾部插入也需要更新两个指针(连接新节点,移动尾部指针)。这些用CAS来设计都相对比较复杂。
@ThreadSafe public class LinkedQueue { private static class Node { final E item; final AtomicReference> next; public Node(E item, Node next) { this.item = item; this.next = new AtomicReference>(next); } }private final Node dummy = new Node(null, null); private final AtomicReference> head = new AtomicReference>(dummy); private final AtomicReference> tail = new AtomicReference>(dummy); public boolean put(E item) { Node newNode = new Node(item, null); // 自旋CAS防止更新丢失 while (true) { Node curTail = tail.get(); Node tailNext = curTail.next.get(); if (curTail == tail.get()) { if (tailNext != null) { // 帮助推进尾部指针 // Queue in intermediate state, advance tail tail.compareAndSet(curTail, tailNext); } else { // 插入新节点 // In quiescent state, try inserting new node if (curTail.next.compareAndSet(null, newNode)) { // 推进尾部指针 // Insertion succeeded, try advancing tail tail.compareAndSet(curTail, newNode); return true; } } } } } }

上述是一个尾插法的一个设计实例,中间插入等都是类似的。上述实现看起来比较难懂,实际上它本质上的操作如下:
while(true) { oldNode = tail; // 如果有其他线程在这里插入新的尾部节点,则这里会更新失败,重试 if (tail.next.CAS(null, newNode)) tail.CAS(oldNode, newNode) return }

上述代码的区别主要是进行了一个小优化:如果当前线程发现取到的tail.next非空,说明有人插入了新的尾节点,这时候它会帮助对方推进尾部指针,加快自己插入的效率,不需要等待对方推进完成才能插入。

    推荐阅读