原子变量 【Java高级|原子变量、无锁算法和非阻塞机制】锁使用起来比较方法,但是它存在一些问题:
- 性能问题。尽管现代JVM对于锁进行了很多优化,但是在多线程下,线程上下文切换仍需要os的支持,这部分开销始终是无法避免的。在计算任务比较断的时候,线程上下文切换将占据极大的部分,发生激烈的竞争,影响性能。换句话来说,锁太重了。
- 活跃性问题。一个线程失败,可能引起所有线程阻塞。
有没有一种粒度更细,类似于volatile,又支持原子更新操作的机制呢?
原子变量运而生。它是对于常规变量的一种封装,利用底层native接口,提供了变量上的原子一系列复合操作,例如i++,CAS等。具体参考CAS和原子类。原子变量本质上是用硬件实现的“测试和更新”复合原子操作,并且,在大量精巧的非阻塞并发程序中,核心就是将一致性问题一步步缩小范围到某一组原子变量上面。
非阻塞算法设计
如果在算法的每个步骤中都存在某个线程能够执行下去,这种算法就被称为无锁(Lock-Free)算法。不管是非阻塞算法还是无锁算法,都是针对锁的活跃性问题衍生出来的概念。这种算法通常比锁更难设计,核心就是将一致性问题一步步缩小范围到某一组原子变量上面。
那么,如何利用原理变量设计更好的非阻塞机制呢?我们来看一些案例。
非阻塞计数器:
@ThreadSafe
public class CasCounter {
private SimulatedCAS value;
public int getValue() {
return value.get();
}public int increment() {
int v;
do{
v = value.get();
// CAS更新,避免更新丢失
} while (v != value.compareAndSwap(v, v + 1));
return v + 1;
}
}
维持复合不变性条件(涉及多个变量):
- 代码中必须保证lower
- 由于涉及到两个变量,两个变量必须同时更新
- 防止丢失修改,采用CAS
public class CasNumberRange {
// 1. 保证复合不变性条件,lower和upper必须作为一个整体更新
@Immutable
private static class IntPair {
final int lower;
// Invariant: lower <= upper
final int upper;
}private final AtomicReference values = new AtomicReference(new IntPair(0, 0));
public int getLower() {
return values.get().lower;
}public int getUpper() {
return values.get().upper;
}public void setLower(int i) {
// 2. CAS防止更新丢失
while (true) {
IntPair oldv = values.get();
if (i > oldv.upper)
throw new IllegalArgumentException(
"Can't set lower to " + i + " > upper");
IntPair newv = new IntPair(i, oldv.upper);
if (values.compareAndSet(oldv, newv))
return;
// similarly for setUpper
}
}
}
随机数生成算法
- 随机数生成依赖上一个状态
@ThreadSafe
public class ReentrantLockPseudoRandom extends PseudoRandom {
private final Lock lock = new ReentrantLock(false);
private int seed;
ReentrantLockPseudoRandom(int seed) {
this.seed = seed;
}public int nextInt(int n) {
lock.lock();
try {
int s = seed;
seed = calculateNext(s);
int remainder = s % n;
return remainder > 0 ? remainder : remainder + n;
} finally {
lock.unlock();
}
}
}@ThreadSafe
public class AtomicPseudoRandom extends PseudoRandom {
private AtomicInteger seed;
AtomicPseudoRandom(int seed) {
this.seed = new AtomicInteger(seed);
}public int nextInt(int n) {
while (true) {
int s = seed.get();
int nextSeed = calculateNext(s);
if (seed.compareAndSet(s, nextSeed)) {
int remainder = s % n;
return remainder > 0 ? remainder : remainder + n;
}
}
}
}
这边我们可以研究一下CAS方式的可伸缩性,讨论一下CAS和Lock怎么选用。
文章图片
可以看到,在竞争非常激烈的时候,CAS(Atomic)的方式实际上比锁还慢,这是因为竞争激烈的时候产生了大量的自旋。但是实际情况不会是这样——不会有程序除了竞争锁啥也不干,因此CAS的可伸缩性要更接近第二个图的情况,通常要好得多。
但讨论到这里,还没说完全,CAS和Lock的选用,主要还是要考虑自旋的开销:**自旋的时间和竞争的激烈程度,和获得锁的时间都有关系(例如io设备的锁,可能会导致长时间的自旋)。**所以,如果持有锁进行耗时操作,那么使用lock也没什么不好,lock的也更容易保证正确性,即使会有性能损耗。
非阻塞的栈 栈基于头插法,头插法的非阻塞算法比较好实现,可以对比下边链表的尾插法的实现。
@ThreadSafe
public class ConcurrentStack {
AtomicReference> top = new AtomicReference>();
public void push(E item) {
Node newHead = new Node(item);
Node oldHead;
// 自旋CAS保证push不丢失
do {
oldHead = top.get();
newHead.next = oldHead;
} while (!top.compareAndSet(oldHead, newHead));
}public E pop() {
Node oldHead;
Node newHead;
// 自旋CAS保证pop不丢失
do {
oldHead = top.get();
if (oldHead == null)
return null;
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}private static class Node {
public final E item;
public Node next;
public Node(E item) {
this.item = item;
}
}
}
非阻塞的链表 链表的操作更加复杂,例如中间插入,那么需要保证两个指针更新操作的原子性;又或者,链表的尾部插入也需要更新两个指针(连接新节点,移动尾部指针)。这些用CAS来设计都相对比较复杂。
@ThreadSafe
public class LinkedQueue {
private static class Node {
final E item;
final AtomicReference> next;
public Node(E item, Node next) {
this.item = item;
this.next = new AtomicReference>(next);
}
}private final Node dummy = new Node(null, null);
private final AtomicReference> head = new AtomicReference>(dummy);
private final AtomicReference> tail = new AtomicReference>(dummy);
public boolean put(E item) {
Node newNode = new Node(item, null);
// 自旋CAS防止更新丢失
while (true) {
Node curTail = tail.get();
Node tailNext = curTail.next.get();
if (curTail == tail.get()) {
if (tailNext != null) {
// 帮助推进尾部指针
// Queue in intermediate state, advance tail
tail.compareAndSet(curTail, tailNext);
} else {
// 插入新节点
// In quiescent state, try inserting new node
if (curTail.next.compareAndSet(null, newNode)) {
// 推进尾部指针
// Insertion succeeded, try advancing tail
tail.compareAndSet(curTail, newNode);
return true;
}
}
}
}
}
}
上述是一个尾插法的一个设计实例,中间插入等都是类似的。上述实现看起来比较难懂,实际上它本质上的操作如下:
while(true) {
oldNode = tail;
// 如果有其他线程在这里插入新的尾部节点,则这里会更新失败,重试
if (tail.next.CAS(null, newNode))
tail.CAS(oldNode, newNode)
return
}
上述代码的区别主要是进行了一个小优化:如果当前线程发现取到的tail.next非空,说明有人插入了新的尾节点,这时候它会帮助对方推进尾部指针,加快自己插入的效率,不需要等待对方推进完成才能插入。
推荐阅读
- 数据结构|七大常用排序
- Java工程师高薪训练营爱分享
- 拉钩Java工程师就业急训营
- 疯了!Spring 再爆惊天大漏洞。。
- 十大排序|十大排序--计数排序
- ADS2 Lab
- CS 120 Modul
- 数据库|从 SQL Server 到 MySQL (一)(异构数据库迁移)
- mysql|7、mysql的redo log、bin log日志