源码解读AQS中锁的获取与释放

1、前言 AQS(AbstractQueuedSynchronizer)是java.util.concurrent的基础。也是Doug Lea大神为广大java开发作出的卓越贡献。J.U.C中的工具类如Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock等都极大程度依赖了AQS.

源码解读AQS中锁的获取与释放
文章图片
Paste_Image.png
【源码解读AQS中锁的获取与释放】我们就简单的看一下ReentrantLock的具体实现。

public void lock() {sync.lock(); } public void unlock() {sync.release(1); } public Condition newCondition() {return sync.newCondition(); } public boolean isLocked() {return sync.isLocked(); } public int getHoldCount() {return sync.getHoldCount(); }

对的,你没有看错,这些工具类都是这样凭借一个Sync的内部类做出的实现。而这样一个内部类继承了AQS,由此可见AQS对于J.U.S来说是基石般的存在,本文也将通过ReentrantLock带领大家深入的了解AQS。
2、AQS的简介 AQS的设计思路和原理等我高大上知识这里就不涉及了。想要了解的可以阅读Doug Lea大师的对这一部分的解读。大师原著
当然,笔者也大力推荐读者能把每个类前面的注释都能读一读,在研究类的源码之前能够对类有一个系统的视图。
如果大体的看一下AQS的话就能发现这个类有三个非常重要的属性.
private transient volatile Node head; private transient volatile Node tail; private volatile int state;

看到这里我们大胆的做一个猜想(其实我都已经知道了)
  • AQS维护了一个队列,并记录队列的头节点和尾节点
  • 队列中的节点应该是因请求资源而阻塞的线程
  • AQS同样维护了一个状态,这个状态应该是判断线程能否获取到锁的依据,如果不能,就加入到队列
接下来我们来看看这个Node具体是如何实现的.
volatile Node prev; //此节点的前一个节点。 volatile Node next; //此节点的后一个节点 volatile Thread thread; 节点绑定的线程。 volatile int waitStatus; //节点的等待状态,一个节点可能位于以下几种状态//该状态表示节点超时或被中断就会被踢出队列 static final int CANCELLED =1; //表示节点的继任节点需要成为BLOCKED状态(例如通过LockSupport.park()操作),因此一个节点一旦被释放(解锁)或者取消就需要唤醒他的继任节点. static final int SIGNAL= -1; //表明节点对应的线程因为不满足一个条件(Condition)而被阻塞。 static final int CONDITION = -2;

可以看出,队列是一个双向队列,并且队列中的节点有几种可以选择的状态值
再看的话就能看到AQS中定义的几个重要的放方法
public final void acquire(int arg); //请求获取独占式资源(锁) public final boolean release(int arg); //请求释放独占式资源(锁) public final void acquireShared(int arg); //请求获取共享式资源 public final boolean releaseShared(int arg); //请求释放共享式资源

//独占方式。尝试获取资源,成功则返回true,失败则返回false protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //独占方式。尝试释放资源,成功则返回true,失败则返回false。 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } //共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } protected int tryReleaseShared(int arg) { throw new UnsupportOperationException(); }

可以看到AQS用acquire()和release()方法提供对资源的获取和释放
但是try**()结构的方法都是只抛出了异常,很显然这类方法是需要子类去实现的.
这也因为AQS定义了两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可以同时执行,如Semaphone/CountDownLatch), AQS负责获取资源(修改state的状态),而自定义同步器负责就要实现上述方法告诉AQS获取资源的规则.
两个重要的状态 1、AQS的state
  • state可以理解有多少线程获取了资源,即有多少线程获取了锁,初始时state=0表示没有线程获取锁.
  • 独占锁时,这个值通常为1或者0
  • 如果独占锁可重入时,即一个线程可以多次获取这个锁时,每获取一次,state就加1
  • 一旦有线程想要获得锁,就可以通过对state进行CAS增量操作,即原子性的增加state的值
  • 其他线程发现state不为0,这时线程已经不能获得锁(独占锁),就会进入AQS的队列中等待.
  • 释放锁是仍然是通过CAS来减小state的值,如果减小到0就表示锁完全释放(独占锁)
2、Node 中的waitStatus
  • Node的正常状态是0
  • 对于处在队列中的节点来说,前一个节点有唤醒后一个节点的任务
  • 所以对与当前节点的前一个节点来说
  1. 如果waitStatus > 0, 则节点处于cancel状态,应踢出队列
  2. 如果waitStatus = 0, 则将waitStatus改为-1(signal)
  • 因此队列中节点的状态应该为-1,-1,-1,0
源码解读 这一块开始解读源码的实现部分, 仍然只关心上面提到的几种方法.
1、acquire(int) 此方法是AQS实现独占式资源获取的顶层方法,这个方法和reentrantLock.lock()等有着相同的语义.下面我们开始看源码
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

这个函数共调用了4个方法, 其中tryAcquire(arg)是在子类Sync中实现, 其余在均是AQS中提供.
而这个方法的流程比较简单
  • tryAcquire()尝试获取资源,如果成功, 则方法结束, 否则
  • addWaiter()方法以独占方式将线程加入队列的尾部
  • acquireQueued()方法是线程在等待队列中等待获取资源
  • selfInterrupt(), 如果线程在等待过程中被中断过,在这里相应中断. 应该知道的是,线程在等待过程中是不响应中断的, 只有获取资源后才能自我中断.
    看不懂不要紧,你已经知道了acquire()的大致过程,下面我们一一解读这其中的4个方法
流程1、tryAcquire() 此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。tryAcquire()方法前面已经说过,这个方法是在子类中是实现的. 而在ReentrantLock中,这个方法也正是tryLock()的语义.如下是ReentrantLock对tryAcquire()实现的源码:
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {//等于0表示当前锁未被其他线程获取到 if (!hasQueuedPredecessors() //检查队列中是否有线程在当前线程的前面 && compareAndSetState(0, acquires)) {//CAS操作state,锁获取成功 setExclusiveOwnerThread(current); //设置当前线程为占有锁的线程 return true; } } else if (current == getExclusiveOwnerThread()) {//非0,锁已经被获取,并且是当前线程获取.支持可重入锁 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); //更改状态位, return true; } return false; //未能获取锁 }

public final boolean hasQueuedPredecessors() { Node t = tail; Node h = head; Node s; return h != t &&((s = h.next) == null || s.thread != Thread.currentThread()); }

  • 如果head=tail, 则队列未被初始化, 返回false, 否则
  • 如果队列中没有线程正在等待, 返回true, 否则
  • 如果当前线程是队列中的第一个元素, 返回true,否则返回false
流程2、addWaiter(int) 再看acquire()的第二个流程,获取锁失败, 则将线程加入队列尾部, 返回新加入的节点
private Node addWaiter(Node mode) { //以给定的模式构建节点,节点有共享和独占两种模式 Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) {//CAS更新尾部节点 pred.next = node; //双向队列 return node; } } enq(node); //如果队列没有初始化,程序就会到这一步. return node; }

private Node enq(final Node node) { for (; ; ) {//经典的CAS配合使用方式, 一直循环知道CAS更新成功. Node t = tail; if (t == null) {//队列为空, 没有初始化,必须初始化 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { //设置尾节点,此时的head是头节点,不存放数据 t.next = node; return t; } } } }

流程3、acquireQueued() addWaiter()完成后返回新加入队列的节点, 紧接着进入下一个流程acquireQueued(), 在这个方法中, 会实现线程节点的阻塞和唤醒. 所有节点在这个方法的处理下,等待资源
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; //是否拿到资源 try { boolean interrupted = false; //等待过程中是否被中断过 for (; ; ) {//又是一个自旋配合CAS设置变量 final Node p = node.predecessor(); //当前节点的前驱节点 if (p == head && tryAcquire(arg)) { //如果前驱节点是头节点, 则当前节点已经具有资格尝试获取资源 //(可能是头节点任务完成之后唤醒的后继节点, 当然也可能是被interrup了) setHead(node); //获取资源后,设置当前节点为头节点 p.next = null; // help GC failed = false; return interrupted; } //如果不能获取资源, 就进入waiting状态 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //获取前一个节点的状态,还记得waitStatus是上面意思呒? if (ws == Node.SIGNAL) /* *如果前驱节点完成任务后能够唤醒自己,那么当前节点就可以放心的睡觉了. *记住,唤醒当前节点的任务是前驱节点完成 */ return true; if (ws > 0) { //ws大于0表示节点已经被取消,应该踢出队列. do { //节点的前驱引用指向更前面的没有被取消的节点. 所以被取消的节点没有引用之后会被GC node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //找到了合适的前驱节点后,将其状态设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

接下来是 parkAndCheckInterrupt() 方法, 真正让节点进入waiting状态的方法,实在这个方法中调用的..
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //JNI调用, 使线程进入waiting状态 return Thread.interrupted(); //检查是否被中断 }

流程4、selfInterrupt() 上面也说了, acquire()方法不是立即响应中断的. 而是在获取资源后进行自我中断处理
小结 到此,acquire()的过程已经分析完毕, 我们就知道reentrantLock.lock()的全部过程.总的来说, 就是尝试获取资源, 如果获取不到就进入等待队列变成等待状态.具体细节前面已经详细叙述过.
release(int) 讲了如何获取到资源,接下来就应该如何释放资源.这个方法会在独占的模式下释放指定的资源(减小state).这个语义也是reentrantLock.unlock();
public final boolean release(int arg) { if (tryRelease(arg)) {//尝试释放资源 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); //唤醒队列的下一个节点 return true; } return false; }

1 tryRelease() 释放指定量的资源,这个方法是在子类中实现的.我们以reentrantLock.unlock()为例解读资源释放的过程
protected final boolean tryRelease(int releases) { int c = getState() - releases; //state减去指定的量, if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) {//独占锁模式下,state为0时表示没有线程获取锁,这时才算是当前线程完全释放锁 free = true; setExclusiveOwnerThread(null); } setState(c); return free; }

2 unparkSuccessor(Node node) 用这个方法唤醒后继节点
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) {//waitStatus表示节点已经被取消,应该踢出队列 s = null; //从后想前找到最靠前的合法节点 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

这个方法也比较简单,就是用一个JNI方法unpark()唤醒队列中下一个需要处理的节点,.
非公平锁
上述介绍完ReentrantLock中的公平锁,首先回顾一下公平锁的整个流程
1、ReentrantLock.lock(),这个请求回交由内部类Sync处理。因为有公平和非公平的区分,所以Sync转而把任务交给子类NonfairSync或者FairSync处理。
2、在具体来看看FairSycn中的lock具体怎么实现
final void lock() { acquire(1); }

acquire()的具体实现上面已经介绍,接下来登场的就是非公平锁的实现。
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }

在非公平锁中,线程请求资源是会先查看锁是否被占用,如果锁空闲就直接占用锁,否则究进行acquire()。而这个函数是有AQS具体实现,所以,如果当前锁被占用,非公平锁就享受和公平锁一样的待遇,就是老老实实的进入等待队列,等前任节点唤醒自己。
总结 到这里我们已经讲述了独占锁的获取和释放. 当然没有涉及的还很多.如共享模式我们还没有涉及.以及响应中断的acquireInterruptibly()也没有涉及.
至于响应中断,实现起来与上面介绍的并无太大区别.共享式锁获取与释放待后续篇章再来研究.
水平有限,水平有限,有任何不当之处,还请指正.本文也还在打磨之中
参考 深入浅出Java Concurrency
JUC.Lock(锁机制)学习

    推荐阅读