108|108 AbstractQueuedSynchronizer 与 JMM

一、AbstractQueuedSynchronizer

队列同步器AbstractQueuedSynchronizer(以下简称同步器或AQS),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。并发包的大师(Doug Lea)期望它能够成为实现大部分同步需求的基础
AQS使用方式
AQS的主要使用方式是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态,在AQS里由一个int型的state来代表这个状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的
  • 相关方法源码
/** * The synchronization state. */ private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual *value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }

  • 在实现上,子类推荐被定义为自定义同步组件的静态内部类,AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)
  • 同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器。可以这样理解二者之间的关系:
  • 锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;
  • 同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
  • 实现者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
/** * *类说明:实现我们自己独占锁,不可重入 */ public class SelfLock implements Lock { // 静态内部类,自定义同步器 private static class Sync extends AbstractQueuedSynchronizer {/*判断处于占用状态*/ @Override protected boolean isHeldExclusively() { return getState()==1; }/*获得锁*/ @Override protected boolean tryAcquire(int arg) { if(compareAndSetState(0,1)){ setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }/*释放锁*/ @Override protected boolean tryRelease(int arg) { if(getState()==0){ throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); //compareAndSetState(1,0); return true; }// 返回一个Condition,每个condition都包含了一个condition队列 Condition newCondition() { return new ConditionObject(); } }// 仅需要将操作代理到Sync上即可 private final Sync sync = new Sync(); // 获取锁 public void lock() { System.out.println(Thread.currentThread().getName()+" ready get lock"); sync.acquire(1); System.out.println(Thread.currentThread().getName()+" already got lock"); } // 仅在调用时锁为空闲状态才获取该锁,可以响应中断 public boolean tryLock() { return sync.tryAcquire(1); }// 释放锁 public void unlock() { System.out.println(Thread.currentThread().getName()+" ready release lock"); sync.release(1); System.out.println(Thread.currentThread().getName()+" already released lock"); } // 返回绑定到此 Lock 实例的新 Condition 实例 public Condition newCondition() { return sync.newCondition(); }public boolean isLocked() { return sync.isHeldExclusively(); }public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } // 如果当前线程未被中断,则获取锁,可以响应中断 public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }// 如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁 public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } }

AQS采用了模板方法设计模式
  • 实现自定义同步组件时,将会调用同步器提供的模板方法
108|108 AbstractQueuedSynchronizer 与 JMM
文章图片
image.png
  • 可重写的方法

    108|108 AbstractQueuedSynchronizer 与 JMM
    文章图片
    image.png
  • 访问或修改同步状态的方法
    重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。
    ?getState():获取当前同步状态。
    ?setState(int newState):设置当前同步状态。
    ?compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。
CLH队列锁
CLH队列锁即Craig, Landin, and Hagersten (CLH) locks。
CLH队列锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。
  • 当一个线程需要获取锁时
  1. 创建一个的QNode,将其中的locked设置为true表示需要获取锁,myPred表示对其前驱结点的引用

    108|108 AbstractQueuedSynchronizer 与 JMM
    文章图片
    image.png
  2. 线程A对tail域调用getAndSet方法,使自己成为队列的尾部,同时获取一个指向其前驱结点的引用myPred

    108|108 AbstractQueuedSynchronizer 与 JMM
    文章图片
    image.png
    线程B需要获得锁,同样的流程再来一遍
  3. 线程就在前驱结点的locked字段上旋转,直到前驱结点释放锁(前驱节点的锁值 locked == false)
  4. 当一个线程需要释放锁时,将当前结点的locked域设置为false,同时回收前驱结点

    108|108 AbstractQueuedSynchronizer 与 JMM
    文章图片
    image.png
    如上图所示,前驱结点释放锁,线程A的myPred所指向的前驱结点的locked字段变为false,线程A就可以获取到锁。
    CLH队列锁的优点是空间复杂度低(如果有n个线程,L个锁,每个线程每次只获取一个锁,那么需要的存储空间是O(L+n),n个线程有n个myNode,L个锁有L个tail)。CLH队列锁常用在SMP体系结构下。
    Java中的AQS是CLH队列锁的一种变体实现。
ReentrantLock的实现
  • 锁的可重入
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题。
1)线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
2)锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。
3)nonfairTryAcquire方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。同步状态表示锁被一个线程重复获取的次数。
4)如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。
  • 公平和非公平锁
    1)ReentrantLock的构造函数中,默认的无参构造函数将会把Sync对象创建为NonfairSync对象,这是一个“非公平锁”;而另一个构造函数ReentrantLock(boolean fair)传入参数为true时将会把Sync对象创建为“公平锁”FairSync。
    2)nonfairTryAcquire(int acquires)方法,对于非公平锁,只要CAS设置同步状态成功,则表示当前线程获取了锁,而公平锁则不同。tryAcquire方法,该方法与nonfairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock.Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * Sync object for fair locks */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); }/** * Fair version of tryAcquire.Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }

二、JMM
  • 根据《Jeff Dean在Google全体工程大会的报告》我们可以看到

    108|108 AbstractQueuedSynchronizer 与 JMM
    文章图片
    image.png
绝大多数的运算任务都不可能只靠处理器“计算”就能完成,处理器至少要与内存交互,如读取运算数据、存储运算结果等,这个I/O操作是基本上是无法消除的(无法仅靠寄存器来完成所有运算任务)。早期计算机中cpu和内存的速度是差不多的,但在现代计算机中,cpu的指令速度远超内存的存取速度,由于计算机的存储设备与处理器的运算速度有几个数量级的差距,所以现代计算机系统都不得不加入一层读写速度尽可能接近处理器运算速度的高速缓存(Cache)来作为内存与处理器之间的缓冲:将运算需要使用到的数据复制到缓存中,让运算能快速进行,当运算结束后再从缓存同步回内存之中,这样处理器就无须等待缓慢的内存读写了。
108|108 AbstractQueuedSynchronizer 与 JMM
文章图片
image.png
在计算机系统中,寄存器划是L0级缓存,接着依次是L1,L2,L3(接下来是内存,本地磁盘,远程存储)。越往上的缓存存储空间越小,速度越快,成本也更高;越往下的存储空间越大,速度更慢,成本也更低。从上至下,每一层都可以看做是更下一层的缓存,即:L0寄存器是L1一级缓存的缓存,L1是L2的缓存,依次类推;每一层的数据都是来至它的下一层,所以每一层的数据是下一层的数据的子集。
108|108 AbstractQueuedSynchronizer 与 JMM
文章图片
image.png
在现代CPU上,一般来说L0, L1,L2,L3都集成在CPU内部,而L1还分为一级数据缓存(Data Cache,D-Cache,L1d)和一级指令缓存(Instruction Cache,I-Cache,L1i),分别用于存放数据和执行数据的指令解码。每个核心拥有独立的运算处理单元、控制器、寄存器、L1、L2缓存,然后一个CPU的多个核心共享最后一层CPU缓存L3
Java内存模型(JMM)
【108|108 AbstractQueuedSynchronizer 与 JMM】从抽象的角度来看,JMM定义了线程和主内存之间的抽象关系:线程之间的共享变量存储在主内存(Main Memory)中,每个线程都有一个私有的本地内存(Local Memory),本地内存中存储了该线程以读/写共享变量的副本。本地内存是JMM的一个抽象概念,并不真实存在。它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化。

108|108 AbstractQueuedSynchronizer 与 JMM
文章图片
image.png
108|108 AbstractQueuedSynchronizer 与 JMM
文章图片
image.png
可见性
可见性是指当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。
由于线程对变量的所有操作都必须在工作内存中进行,而不能直接读写主内存中的变量,那么对于共享变量V,它们首先是在自己的工作内存,之后再同步到主内存。可是并不会及时的刷到主存中,而是会有一定时间差。很明显,这个时候线程 A 对变量 V 的操作对于线程 B 而言就不具备可见性了 。
要解决共享对象可见性这个问题,我们可以使用volatile关键字或者是加锁。
原子性
原子性:即一个操作或者多个操作 要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。
我们都知道CPU资源的分配都是以线程为单位的,并且是分时调用,操作系统允许某个进程执行一小段时间,例如 50 毫秒,过了 50 毫秒操作系统就会重新选择一个进程来执行(我们称为“任务切换”),这个 50 毫秒称为“时间片”。而任务的切换大多数是在时间片段结束以后,
那么线程切换为什么会带来bug呢?因为操作系统做任务切换,可以发生在任何一条CPU 指令执行完!注意,是 CPU 指令,CPU 指令,CPU 指令,而不是高级语言里的一条语句。比如count++,在java里就是一句话,但高级语言里一条语句往往需要多条 CPU 指令完成。其实count++包含了三个CPU指令!
三、volatile详解 volatile特性
可以把对volatile变量的单个读/写,看成是使用同一个锁对这些单个读/写操作做了同步

108|108 AbstractQueuedSynchronizer 与 JMM
文章图片
image.png 可以看成

108|108 AbstractQueuedSynchronizer 与 JMM
文章图片
image.png
所以volatile变量自身具有下列特性:
  • 可见性。对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入。
  • 原子性:对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性。
    volatile虽然能保证执行完及时把变量刷到主内存中,但对于count++这种非原子性、多指令的情况,由于线程切换,线程A刚把count=0加载到工作内存,线程B就可以开始工作了,这样就会导致线程A和B执行完的结果都是1,都写到主内存中,主内存的值还是1不是2
volatile的实现原理
通过对OpenJDK中的unsafe.cpp源码的分析,会发现被volatile关键字修饰的变量会存在一个“lock:”的前缀
Lock前缀,Lock不是一种内存屏障,但是它能完成类似内存屏障的功能。Lock会对CPU总线和高速缓存加锁,可以理解为CPU指令级的一种锁。
同时该指令会将当前处理器缓存行的数据直接写会到系统内存中,且这个写回内存的操作会使在其他CPU里缓存了该地址的数据无效。

    推荐阅读