一、AbstractQueuedSynchronizer简介
1、概述
1、AbstractQueuedSynchronizer(抽象队列同步器),来自于JDK1.5,位于JUC包下,简称AQS;AQS作为一个抽象类,是构建JUC包中的锁或者其它同步组件的底层基础框架及JUC体系的基石。 2、在每一个同步组件(2、AQS的核心思想如:ReentrantLock、CountDownLatch、Semaphore、CyclicBarrier、ReentrantReadWriteLock等
)的实现类中,都具有AQS的实现类作为内部类。 【JUC|十一、AbstractQueuedSynchronizer源码分析】
文章图片
3、锁和同步组件关系:
- 锁(如ReentrantLock)是面向锁的使用者:定义了程序员和锁交互的使用层API,隐藏了实现细节,调用即可。
- 同步组件是面向锁的实现者:如AQS简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、线程的等待与唤醒等更加底层的操作。我们如果想要自己写一个“锁”,那么就可以直接利用AQS框架实现,而不需要去关心那些更加底层的东西。
1、3、AQS类的设计AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中
。 2、CLH(Craig,Landin,and Hagersten)队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(Node),通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的效果
。
文章图片
3、它的head引用指向的头结点作为哨兵结点,不存储任何与等待线程相关的信息,或者可以看成已经获得锁的结点。第二个结点开始才是真正的等待线程构建的结点,后续的结点会加入到链表尾部。 4、将新结点添加到链表尾部的方法是compareAndSetTail(Node expect,Node update)
方法,该方法是一个CAS方法,能够保证线程安全。 5、同步队列遵循先进先出(FIFO),头结点的next结点是将要获取到锁的结点,线程在释放锁的时候将会唤醒后继结点,然后后继结点会尝试获取锁。
1、AbstractQueuedSynchronizer被设计为一个抽象类,它使用了一个volatile int类型的成员变量state来表示同步状态(或者说资源),通过一个内置的FIFO双向同步队列来完成资源获取线程的排队等待工作,通过一个或者多个ConditionObject条件队列来实现线程之间的通信(等待和唤醒)
。 2、Lock中“锁”的状态使用state变量来表示,一般来说0表示锁没被占用,大于0表示所已经被占用了
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {/**
* 共享变量,使用volatile修饰保证线程可见性
*/
private volatile int state;
/**
* 返回同步状态的当前值
*/
protected final int getState() {
return state;
}/**
* 设置同步状态的值,此操作具有volatile写的内存语义,因此每次写数据都是写回主存并导致其它缓存实效
*/
protected final void setState(int newState) {
state = newState;
}/**
* 如果当前同步状态的值等于expect(期望值),则以原子方式将同步状态设置为给定的update(更新值)
* stateOffset为state内存偏移地址,底层通过
* unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"))方法
* 获取state内存偏移地址
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
4、AQS对资源的共享方式
1、独占(Exclusive):只有一个线程能执行,如ReentrantLock,又可分为公平锁和非公平锁5、AQS底层使用了模板方法模式2、共享(Share):多个线程可同时执行,如Semaphore/CountDownLatch。Semaphore、CountDownLatCh、 CyclicBarrier、ReadWriteLock等 3、注:ReentrantReadWriteLock可以看成是组合式,因为ReentrantReadWriteLock也就是读写锁允许多个线程同时对某一资源进行读。
- 公平锁:按照线程在队列中的排队顺序,先到者先拿到锁
- 非公平锁:当线程要获取锁时,无视队列顺序直接去抢锁,谁抢到就是谁的
1、同步器的设计是基于模板方法模式的,如果需要自定义同步器需要继承AbstractQueuedSynchronizer并重写指定的方法(这些重写方法很简单,无非是对于共享资源state的获取和释放),需要重写方法如下:6、父类AbstractOwnableSynchronizer2、以上的方法在AQS中并没有提供实现,如果子类不重写就直接调用还会抛出异常。
isHeldExclusively()
:该线程是否正在独占资源,只有用到condition才需要去实现它。tryAcquire(int arg)
:尝试以独占模式获取资源,成功则返回true,失败则返回false。tryRelease(int arg)
:尝试以独占模式释放资源,成功则返回true,失败则返回false。tryAcquireShared(int arg)
:尝试以共享模式获取资源,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。tryReleaseShared(int arg)
:尝试以共享模式释放资源,成功则返回true,失败则返回false。
1、AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer抽象类,并且实现了Serializable接口,可以进行序列化。
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {/** Use serial ID even though all fields transient. */
private static final long serialVersionUID = 3737899427754241961L;
/**
* 空参构造
*/
protected AbstractOwnableSynchronizer() { }/**
* 独占模式下的线程
*/
private transient Thread exclusiveOwnerThread;
/**
* 设置独占线程
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}/**
* 获取独占线程
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
7、AQS的属性
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {private static final long serialVersionUID = 7373984972572414691L;
// 头结点
private transient volatile Node head;
// 尾结点
private transient volatile Node tail;
// 状态
private volatile int state;
// 自旋时间
static final long spinForTimeoutThreshold = 1000L;
// Unsafe类实例
private static final Unsafe unsafe = Unsafe.getUnsafe();
// state(状态)内存偏移地址
private static final long stateOffset;
// head(头结点)内存偏移地址
private static final long headOffset;
// tail(尾结点)内存偏移地址
private static final long tailOffset;
// waitStatus(等待状态在Node类中)内存偏移地址
private static final long waitStatusOffset;
// next(后继结点在Node类中)内存偏移地址
private static final long nextOffset;
// 静态初始化块
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex);
}
}
}
9、类的内部类 - Node类
1、每个被阻塞的线程都会被封装成一个Node结点,放入队列。每个结点包含了一个Thread类型的引用,并且每个结点都存在一个状态(waitStatus)
,具体状态如下:
CANCELLED
:值为1,指示当前结点(线程)需要取消等待,由于在同步队列中等待的线程发生等待超时、中断、异常,即放弃获取锁,需要从同步队列中取消等待,就会变成这个状态,如果结点进入该状态,那么不会再变成其他状态。SIGNAL
:值为-1,指示当前结点(线程)的后续结点(线程)需要取消等待(被唤醒);如果一个结点状态被设置为SIGNAL,那么后继结点的线程处于挂起或者即将挂起的状态,当前结点的线程如果释放了锁或者放弃获取锁并且结点状态为SIGNAL,那么将会尝试唤醒后继结点的线程以运行,这个状态通常是由后继结点给前驱结点设置的。一个结点的线程将被挂起时,会尝试设置前驱结点的状态为SIGNAL。CONDITION
:值为-2,线程在等待队列里面等待,waitStatus值表示线程正在等待条件,原本结点在等待队列中,结点线程等待在Condition上,当其他线程对Condition调用了signal()方法之后,该结点会从从等待队列中转移到同步队列中,进行同步状态的获取。PROPAGATE
:值为-3,释放共享资源时需要通知其他结点,waitStatus值表示下一个共享式同步状态的获取应该无条件传播下去。值为0
:表示当前结点在同步队列中,等待着获取资源
static final class Node {
/**
* AQS支持共享模式和独占模式两种类型
* 共享模式下构造的结点,用来标记该线程是获取共享资源时被阻塞挂起后放入AQS队列的
*/
static final Node SHARED = new Node();
/**
* 独占模式下构造的结点,用来标记该线程是获取独占资源时被阻塞挂起后放入AQS队列的
*/
static final Node EXCLUSIVE = null;
// 线程结点的等待状态,用来表示该线程所处的等待锁的状态/**
* 指示当前结点(线程)需要取消等待
*/
static final int CANCELLED = 1;
/**
* 指示当前结点(线程)的后续结点(线程)需要取消等待(被唤醒)
*/
static final int SIGNAL = -1;
/**
* 线程在等待队列里面等待Condition唤醒
*/
static final int CONDITION = -2;
/**
* 共享式同步状态的获取应该无条件传播下去
*/
static final int PROPAGATE = -3;
/**
* 记录当前线程等待状态值,包括上面4种的状态,还有0(表示初始化状态)
*/
volatile int waitStatus;
/**
* 前驱结点,当结点加入同步队列将会被设置前驱结点信息
*/
volatile Node prev;
/**
* 后继结点
*/
volatile Node next;
/**
* 当前结点所对应的线程
*/
volatile Thread thread;
/**
* 等待队列中的后继结点,如果当前结点是共享模式的,那么这个字段是一个SHARED常量
* 在独占锁模式下永远为null,仅仅起到一个标记作用
*/
Node nextWaiter;
/**
* 如果结点在共享模式下等待,则返回true,因为nextWaiter字段在共享模式下是一个SHARED常量
*/
final boolean isShared() {
return nextWaiter == SHARED;
}/**
* 获取前驱结点,若前驱结点为空,抛出异常
*/
final Node predecessor() throws NullPointerException {
// 保存前驱结点
Node p = prev;
// 前驱结点为空,抛出异常,否则直接返回
if (p == null)
throw new NullPointerException();
else
return p;
}/**
* 无参构造方法,用于建立初始头结点或SHARED标记
*/
Node() {
}Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
二、通过ReentrantLock分析AQS源码 1、ReentrantLock概述
1、ReentrantLock来自于JDK1.5,位于JUC包的Locks子包,独占(互斥)式可重入锁。Synchronized的功能他都有,并且具有更加强大的功能。 2、实现了Lock接口,具有通用的操作锁的方法。由于来自JUC包,内部是使用AQS队列同步器来辅助实现的,重写了AQS的独占式获取锁的方法,并实现了可重入性。 3、ReentrantLock还具有公平与非公平两个获取锁模式,这个并非AQS提供的,而是ReentrantLock基于AQS自己实现的。 4、ReentrantLock有三个内部类,2、公平与非公平Sync、NonfairSync、FairSync
。NonfairSync(非公平)与FairSync(公平)类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类,类图如下:
文章图片
1、公平锁:是指多个线程按照申请锁的顺序来获取锁,先来后到,先来先服务就是公平的,也就是队列。 2、非公平锁:是指在多个线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取到锁,在高并发的情况下,有可能造成优先级反转或者饥饿现象(也就是某个线程一直得不到锁即为饥饿)。3、构造方法对于synchronized而言,也是一种非公平锁
。 3、ReentrantLock既可以是公平锁又可以是非公平锁
。当此类的构造方法ReentrantLock(boolean fair) 接收true作为参数时,ReentrantLock就是公平锁,线程依次排队获取公平锁,即锁将被等待最长时间的线程占有。默认是非公平锁。 4、与使用非公平锁相比,使用公平锁的程序在多线程环境下效率比较低。而且即使是公平锁也不一定能保证线程调度的公平性,后来的线程调用tryLock方法同样可以不经过排队而获得该锁。
1、public ReentrantLock()
:创建一个ReentrantLock实例。等同于使用ReentrantLock(false)。即默认无参构造器,是非公平模式。 2、public ReentrantLock(boolean fair)
:创建一个具有给定公平策略的ReentrantLock。false表示不公平,true表示公平。
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
// 同步队列
private final Sync sync;
/**
* 创建一个非公平的ReentrantLock实例,默认非公平模式
*/
public ReentrantLock() {
//内部初始化了一个NonfairSync对象
sync = new NonfairSync();
}/**
* 创建一个具有给定公平策略的 ReentrantLock
* @param fair false表示不公平,true表示公平。
*/
public ReentrantLock(boolean fair) {
// 根据参数选择初始化不同的AQS对象
sync = fair ? new FairSync() : new NonfairSync();
}
}
4、内部类 - Sync类
1、ReentrantLock类的sync非常重要,对ReentrantLock类的操作大部分都直接转化为对Sync和AbstractQueuedSynchronizer类的操作。 2、类中方法及作用:
方法名 作用 lock
锁定,并没有具体的实现,留给子类实现 nonfairTryAcquire(int acquires)
非公平方式获取锁,acquires获取锁参数,锁状态每次加1 tryRelease(int releases)
尝试释放锁,releases释放参数,锁状态每次减1 isHeldExclusively()
判断资源是否被当前线程占有 newCondition()
新生一个条件 getOwner()
返回资源的占用线程 getHoldCount()
如果当前线程占有资源,就返回当前状态,否则返回0 isLocked()
资源是否被占用(是否已经获取锁,true表示被占用) readObject
自定义序列化逻辑
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* 获取锁
*/
abstract void lock();
/**
* 非公平方式获取锁
*/
final boolean nonfairTryAcquire(int acquires) {
// 当前线程
final Thread current = Thread.currentThread();
// 当前线程的同步(锁)状态
int c = getState();
// 表示没有线程正在竞争资源或锁,0表示锁没被占用,大于0表示所已经被占用了
if (c == 0) {
// 如果当前同步状态的值等于0(期望值),则以原子方式将同步状态设置为给定的acquires(更新值)
if (compareAndSetState(0, acquires)) {
// 设置当前线程独占
setExclusiveOwnerThread(current);
// 成功
return true;
}
}
else if (current == getExclusiveOwnerThread()) {// 当前线程拥有该锁
// 增加重入次数(可重入锁)
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置同步状态
setState(nextc);
// 成功
return true;
}
// 失败
return false;
}/**
* 尝试释放锁
* 如果当前线程是这个锁的持有者,那么持有计数就会递减。如果保持计数现在为零,则释放锁。
* 如果当前线程不是该锁的持有者,则抛出IllegalMonitorStateException
* @param releases:释放参数,每次减1
*/
protected final boolean tryRelease(int releases) {
// 获取当前线程的同步状态
int c = getState() - releases;
// 如果当前线程不为独占线程,则抛出错误
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 释放标识false
boolean free = false;
// 如果同步状态为0(即锁没有被占用,也就是释放)
if (c == 0) {
// 释放标识为true
free = true;
// 已经释放,清空独占
setExclusiveOwnerThread(null);
}
// 设置同步状态
setState(c);
// 返回释放标识
return free;
}/**
* 判断资源是否被当前线程占有
*/
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}// 新生一个条件
final ConditionObject newCondition() {
return new ConditionObject();
}// Methods relayed from outer class
// 返回资源的占用线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}// 如果当前线程占有资源,就返回当前状态,否则返回0
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}// 资源是否被占用(是否已经获取锁,true表示被占用)
final boolean isLocked() {
return getState() != 0;
}/**
* 自定义序列化逻辑
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0);
// reset to unlocked state
}
}
5、内部类 - NonfairSync类
1、NonfairSync类继承了Sync类,表示采用非公平策略获取锁,其实现了Sync类中抽象的lock方法
2、从lock方法的源码可知,每一次都尝试获取锁,而并不会按照公平等待的原则进行等待,让等待时间最久的线程获得锁。
static final class NonfairSync extends Sync {
// 版本号
private static final long serialVersionUID = 7316153563782823691L;
/**
* 获得锁
*/
final void lock() {
// 比较并设置状态成功,状态0表示锁没有被占用
if (compareAndSetState(0, 1))
// 将当前线程设置独占了锁
setExclusiveOwnerThread(Thread.currentThread());
else // 抢占锁失败,前面线程还没有释放锁,后面线程进入等待队列
// 以独占模式获取对象,忽略中断
acquire(1);
}/**
* 尝试以独占模式非公平获取锁
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
5、内部类 - FairSync类
1、FairSync类继承了Sync类,表示采用公平策略获取锁,其实现了Sync类中的抽象lock方法
2、从lock方法的源码可知,不会先尝试获取资源。这也是和NonfairSync最大的区别,NonfairSync每一次都会尝试去获取资源,如果此时该资源恰好被释放,则会被当前线程获取,这就造成了不公平的现象,当获取不成功,再加入队列尾部。
static final class FairSync extends Sync {
// 版本号
private static final long serialVersionUID = -3000897897090466540L;
// 获得锁
final void lock() {
acquire(1);
}/**
* 尝试公平获取锁
*/
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 当前线程的同步(锁)状态
int c = getState();
// 表示没有线程正在竞争资源或锁,0表示锁没被占用,大于0表示所已经被占用了
if (c == 0) {
// 查询是否有任何线程等待获取的时间超过当前线程
if (!hasQueuedPredecessors() &&
// 如果当前同步状态的值等于0(期望值),则以原子方式将同步状态设置为给定的acquires(更新值)
compareAndSetState(0, acquires)) {
// 设置当前线程独占
setExclusiveOwnerThread(current);
// 成功
return true;
}
}
// 状态不为0,即资源已经被线程占据,并判断当前线程是否独占锁
else if (current == getExclusiveOwnerThread()) {
// (可重入锁)
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 设置同步状态
setState(nextc);
// 成功
return true;
}
// 失败
return false;
}
}
三、ReentrantLock非公平模式加锁原理及源码分析 1、lock方法加锁
JUC中只要是实现Lock的锁,那么加锁的方法,一般都统一调用开放给外部的lock()方法。
/**
* @Date: 2022/9/4
* ReentrantLock加锁流程
*/
public class ReentrantLockTest {
public static void main(String[] args) {
// 默认非公平锁
ReentrantLock lock = new ReentrantLock();
new Thread(() -> {
lock.lock();
try {
Thread.sleep( 20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("A获取到锁");
lock.unlock();
}, "A").start();
new Thread(() -> {
lock.lock();
System.out.println("B获取到锁");
lock.unlock();
}, "B").start();
new Thread(() -> {
lock.lock();
System.out.println("C获取到锁");
lock.unlock();
}, "C").start();
}
}
当有多个线程调用lock时候,跟踪lock()方法的源码,NonfairSync类的lock的方法调用主流程如下:2、NonfairSync类的lock方法
lock.lock()
→ReentrantLock类的lock()
→Sync类的lock()
→NonfairSync类的lock()
→AQS类的acquire()
→AQS类的tryAcquire()
→NonfairSync类的tryAcquire()
→Sync类的nonfairTryAcquire
- 其中多个线程的时候,如果某个线程占锁时间较长,那么
Sync类的nonfairTryAcquire
返回false。- 此时
AQS类的tryAcquire()
返回值也是false,AQS类的acquire()
方法中!tryAcquire(arg)
就为true,也就执行后面一段逻辑acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
。- 调用
addWaiter(Node.EXCLUSIVE), arg)
为当前线程创建排队结点,进入队列中等待唤醒。
1、线程A调用3、AQS类的acquire方法lock.lock()
方法,由于默认是非公平模式,所以进入到NonfairSync类的lock()
方法。 2、lock方法流程:
- 首先尝试获取锁,即CAS将state的值从0更新为1,表示此时没有线程获取到锁,然后线程A尝试获取锁,如果CAS成功,那么说明该锁没有被任何线程获取,此时获取锁成功,将当前线程标记为持有锁的线程(即线程A),加锁结束。
- 若获取锁失败,表示该锁此前已经被某条线程获取到了,或者被别的线程抢先CAS成功,那么执行acquire方法继续尝试获取锁。B、C线程再次进入的时候,获取尝试获取锁失败,那么就执行acquire方法。
文章图片
1、B、C线程尝试获取锁失败,进入到acquire方法。 2、acquire方法就是AQS提供的模版方法,用于独占式获取锁,该方法不会响应中断,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。 3、在acquire方法中会继续尝试获取锁,直到成功才能返回。最终调用的是NonfairSync类的tryAcquire方法,而它又是调用Sync类的nonfairTryAcquire方法。 4、nonfairTryAcquire方法流程:5、B、C线程进入此方法后,发现锁已经被其他线程获取,tryAcquire方法最终执行结果为false,
- 再次判断state,如果state=0,表示当前锁未被任何线程持有,那么尝试CAS获取锁,若CAS获取成功则设置当前线程,返回true,方法结束。
- 否则,表示此前已经有线程获取锁了,那么判断是否就是当前线程获取到了锁,如果是,执行重入逻辑,将state加上1再写回,表示重入次数+1,返回true,方法结束。
- 若CAS失败(有其他线程已经获取了锁),或者若当前锁已经被其他线程持有,则直接返回false,方法结束。
!tryAcquire(arg)
的值为true,那么执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
方法
/**
* AQS类中的acquire
* 独占式的尝试获取锁,获取不成功就进入同步队列等待
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}/**
* AQS类中的模板方法:tryAcquire,在AQS中的实现是抛出异常,需要子类自己重写
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}/**
* NonfairSync类重写tryAcquire方法,底层调用Sync类的nonfairTryAcquire方法实现
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}/**
* Sync类的nonfairTryAcquire方法实现,非公平方式获取锁
*/
final boolean nonfairTryAcquire(int acquires) {
// 当前线程
final Thread current = Thread.currentThread();
// 当前线程的同步(锁)状态
int c = getState();
// 表示没有线程正在竞争资源或锁,0表示锁没被占用,大于0表示所已经被占用了
if (c == 0) {
// 如果当前同步状态的值等于0(期望值),则以原子方式将同步状态设置为给定的acquires(更新值)
if (compareAndSetState(0, acquires)) {
// 设置当前线程独占
setExclusiveOwnerThread(current);
// 成功
return true;
}
}
else if (current == getExclusiveOwnerThread()) {// 当前线程拥有该锁
// 增加重入次数(可重入锁)
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置同步状态
setState(nextc);
// 成功
return true;
}
// 失败
return false;
}
4、AQS类的addWaiter方法
1、addWaiter方法是AQS提供的,不需要我们重写,可以说是锁的通用方法。通过addWaiter方法将线程按照独占模式Node.EXCLUSIVE
构造同步结点,并添加到同步队列的尾部。 2、addWaiter方法流程:
- 按照给定模式,构建新结点。
- 如果同步队列不为null,则尝试将新结点添加到队列尾部(只尝试一次),如果添加成功则返回新结点,方法返回。
- 如果队列为null或者添加失败,则调用enq方法循环尝试添加,直到成功,返回新结点,方法结束。
/**
* 将获取锁失败的线程按给定模式构建成结点加入到同步队列的尾部
* @param mode 模式,独占模式传入的是一个Node.EXCLUSIVE,即null;
*共享模式传入的是一个Node.SHARED,即一个静态结点对象(共享的、同一个)
*/
private Node addWaiter(Node mode) {
// 按给定模型创建结点
Node node = new Node(Thread.currentThread(), mode);
// 获取同步器的尾结点,使用pred来保存
Node pred = tail;
// 如果pred不为null,也就是队列不为null,那么使用CAS方式将当前结点设为尾结点
if (pred != null) {
// 将当前结点的前驱结点指向结点pred
node.prev = pred;
// 通过CAS方式将当前结点设为尾结点
if (compareAndSetTail(pred, node)) {
// 将结点pred的后继结点指向当前结点
pred.next = node;
// 返回结点
return node;
}
}
/**
* 如果pred为null,也就是队列为null,也可能是compareAndSetTail(pred, node)失败
* 调用enq方法,采用自旋方式保证构造的新结点成功添加到同步队列中
*/
enq(node);
return node;
}/**
* addWaiter方法中使用到的Node构造器
* @param thread 当前线程
* @param mode模式
*/
Node(Thread thread, AbstractQueuedSynchronizer.Node mode) {
//等待队列中的后继结点 就等于该结点的模式
//共享模式该值为Node.SHARED结点常量,独占模式该值为null
this.nextWaiter = mode;
//当前线程
this.thread = thread;
}
5、AQS类的enq方法
1、enq方法用在同步队列为null或者一次CAS操作添加失败的时候,enq方法要保证结点最终必定添加成功。 2、enq方法流程:3、此时,B、C线程的结点就保存到队列中,等待唤醒,队列结构如下图:
- 开启一个死循环,保证结点一定添加进去。
- 如果队列为空,那么初始化队列,添加一个哨兵结点,结束本次循环,继续下一次循环。
- 如果队列不为空,则尝试将新结点添加到队列尾部,如果添加成功则返回新结点的前驱,循环结束;如果不成功,结束本次循环,继续下一次循环,直到添加成功。
文章图片
/**
* 将节点插入队列,必要时进行初始化
* @param node 要插入的结点
* @return 节点的前驱结点
*/
private Node enq(final Node node) {
// 死循环直到添加成功
for (;
;
) {
// 直接获取尾结点
Node t = tail;
// 如果结点为null,则初始化同步队列
if (t == null) { // Must initialize
/*
* 调用compareAndSetHead方法,初始化同步队列
* 注意:这里是新建了一个空白结点,也就是所谓的哨兵结点
* CAS成功之后,head将指向该哨兵结点,返回true
*/
if (compareAndSetHead(new Node()))
// 尾结点指向头结点(哨兵结点),方法并没有结束,继续循环,再次执行时t不为null
tail = head;
} else {// 队列不为null,调用compareAndSetTail方法,新建的新结点插入到同步队列尾部
// 将当前结点的前驱结点指向t
node.prev = t;
// 通过CAS方式将当前结点设为尾结点
if (compareAndSetTail(t, node)) {
// 将结点t的后继结点指向当前结点
t.next = node;
// 返回前驱结点
return t;
}
}
}
}/**
* CAS添加头结点. 仅仅在enq方法中用到
* @param update 头结点
* @return true 成功;false 失败
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}/**
* CAS添加尾结点. 仅仅在enq方法中用到
* @param expect 预期原尾结点
* @param update 新尾结点
* @return true 成功;false 失败
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
6、AQS类的acquireQueued方法
1、执行到acquireQueued方法,说明tryAcquire和addWaiter
两个方法都执行成功,表示该线程结点获取锁失败,并且被放入同步队列尾部了。 2、acquireQueued方法表示结点进入同步队列之后的动作,实际上就进入了一个自旋的过程,自旋过程中,当条件满足,获取到了锁,就可以从这个自旋中退出并返回,否则可能会阻塞该结点的线程,后续即使阻塞被唤醒,还是会自旋尝试获取锁,直到成功或者而抛出异常。 3、acquireQueued方法流程:
- 开启一个死循环,如果当前结点的前驱是头结点,尝试获取锁,如果获取锁成功,那么当前结点设置为头结点head,将当前结点线程引用与前驱结点设置为空,表示当前线程已经获取到了锁,然后返回是否被中断标志,结束循环,进入finally。
- 如果当前结点的前驱不是头结点或者尝试获取锁失败,那么判断当前线程是否应该被挂起,如果返回true,那么调用parkAndCheckInterrupt挂起当前结点的线程(LockSupport.park方法挂起线程,线程处于WAITING状态),此时不再执行后续的步骤、代码。
- 如果当前线程不应该被挂起,即返回false,那本次循环结束,继续下一次循环。
- 如果线程被其他线程唤醒,那么判断是否是因为中断而被唤醒并修改标志位,同时继续循环,直到在步骤1获得锁,才能跳出循环
- 最终,线程获得了锁跳出循环,或者发生异常跳出循环,那么会执行finally语句块,finally中判断线程是否是因为发生异常而跳出循环,如果是,那么执行cancelAcquire方法取消该结点获取锁的请求;如果不是,即因为获得锁跳出循环,则finally中什么也不做。
/**
* 以独占不间断模式获取已在队列中的线程
* @param node 新创建的结点
* @param arg 获取参数
* @return 如果在等待时中断,则返回true
*/
final boolean acquireQueued(final Node node, int arg) {
// 获取锁是否失败的标记
boolean failed = true;
try {
// 是否被中断的标记
boolean interrupted = false;
// 死循环
for (;
;
) {
// 获取当前结点的前驱结点,位于Node类中的方法
final Node p = node.predecessor();
// 判断前驱结点是否为头结点,只有前驱结点是头结点的时候才能尝试获取锁,调用tryAcquire方法获取锁
if (p == head && tryAcquire(arg)) {
// 获取到锁之后,将当前接结点设置为头结点(哨兵结点),线程出队列
setHead(node);
// 断开p结点(原哨兵结点)的后继结点指向,由JVM回收
p.next = null;
// help GC
// 获取锁是否失败改成false,表示成功获取到了锁
failed = false;
// 返回线程是否被中断
return interrupted;
}
/**
* 前驱结点不是头结点,或者是tryAcquire尝试获取锁失败的情况下,执行此逻辑
* shouldParkAfterFailedAcquire检测线程是否应该被挂起
* 如果返回true,则调用parkAndCheckInterrupt用于将线程挂起,否则重新开始循环
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
/*
* 到这里,说明是当前结点(线程)因为被中断而唤醒,那就改变自己的中断标志位状态信息为true
* 又从新开始循环,直到获取到锁,才能返回
* */
interrupted = true;
}
} finally { // 线程获取到锁或者发生异常之后都会执行的finally语句块
/**
* 如果failed为true,表示获取锁失败,即对应发生异常的情况
* 这里发生异常的情况只有在tryAcquire方法和predecessor方法中可能会抛出异常,此时还没有获得锁,failed=true
* 那么执行cancelAcquire方法,该方法用于取消该线程获取锁的请求,将该结点的线程状态改为CANCELLED,并尝试移除
* 结点(如果是尾结点);
* 如果failed为false,表示获取到了锁,那么该方法直接结束,继续往下执行
*/
if (failed)
// 取消获取锁请求,将当前结点从队列中移除
cancelAcquire(node);
}
}/**
* head指向node新结点,该方法是在tryAcquire获取锁之后调用,不会产生线程安全问题
*/
private void setHead(Node node) {
// 头结点指向新结点
head = node;
/**
* 将新结点的thread和prev属性置空,成为新的哨兵结点
* 虽然线程引用置空了,但是一般在tryAcquire方法中记录了获取到锁的线程,因此不担心找不到是哪个线程获取到了锁,
* 因此哨兵结点也可以叫做"获取到锁的结点"
*/
node.thread = null;
node.prev = null;
}
7、AQS的shouldParkAfterFailedAcquire方法
1、shouldParkAfterFailedAcquire方法在没有获取到锁之后调用,用于判断当前结点是否需要被挂起。 2、shouldParkAfterFailedAcquire方法流程:3、
- 获取前驱结点的等待状态,如果前驱结点已经是
SIGNAL(-1)状态
,即表示当前结点可以挂起,返回true,方法结束。- 如果前驱结点状态大于0(
Node.CANCELLED(1)状态
),表示前驱结点放弃了锁的等待,那么由该前驱向前查找,直到找到一个状态小于等于0的结点,再将当前结点排在该结点(状态小于0的)后面,返回false,方法结束。- 如果前驱结点的状态既不是SIGNAL(-1),也不是CANCELLED(1),尝试CAS设置前驱结点的状态为SIGNAL(-1),返回false,方法结束。
注:只有前驱结点状态为SIGNAL时,当前结点才能安心挂起,否则一直自旋
。 4、结论:一个结点SIGNAL状态的设置,一般都是由它的后继结点设置的,但是这个状态却是表示后继结点的状态,表示的意思就是前驱结点如果释放了锁,那么就有义务唤醒后继结点
。
/**
* 检测当前结点(线程)是否应该被挂起
* @param pred 当前结点的前驱
* @param node 当前结点
* @return 如果前驱结点已经是SIGNAL状态,当前结点才能挂起,返回true;否则,
*可能会查找新的前驱结点或者尝试将前驱结点设置为SIGNAL状态,返回false
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱结点的waitStatus(等待状态)
// 最初创建结点时候,并没有给waitStatus赋值,因此每一个结点最开始的时候waitStatus的值都为0
int ws = pred.waitStatus;
// 判断前驱结点的等待状态是否为-1,如果前驱结点已经是SIGNAL状态,即表示当前结点可以挂起
if (ws == Node.SIGNAL)
// 返回true
return true;
// 如果前驱结点状态大于0,即Node.CANCELLED表示前驱结点放弃了锁的等待
if (ws > 0) {
/**
* 由该前驱向前查找,直到找到一个状态小于等于0的结点(即没有被取消的结点),当前结点成为该结点的
* 后继,这一步很重要,可能会清理一段被取消了的结点,并且如果该前驱释放了锁,还会唤醒它的后继
*/
do {
// 当前结点的前驱结点 = 前驱结点的前驱结点;前驱结点 = 前驱结点的前驱结点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 直到前驱结点的等待状态小于0,结束循环
// 找到一个状态小于0的结点,并将它的后继结点指向当前结点,重新连接队列
pred.next = node;
// 否则,前驱结点的状态既不是SIGNAL(-1),也不是CANCELLED(1)
} else {
// 通过CAS操作将当前结点的前驱结点的状态设置为SIGNAL(-1),用于后续唤醒操作;
// 可能会失败,但没关系,因为失败之后acquireQueued方法会一直循环
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 返回false
return false;
}
8、AQS的parkAndCheckInterrupt方法
1、shouldParkAfterFailedAcquire方法返回true之后,将会调用parkAndCheckInterrupt方法挂起线程并且后续判断中断状态。线程中断方法参考《LockSupport与线程中断》篇 2、parkAndCheckInterrupt方法流程:3、此时,由于A线程获取锁到释放锁的间隔时间较长,B、C线程在队列中等待,对应的哨兵结点和线程B结点的状态都为-1,并且进入阻塞状态,等待A释放锁,B再执行,队列结构如下图:
- 使用LockSupport.park(this)挂起该线程,不再执行后续的步骤、代码。直到该线程被中断或者被唤醒(unpark)。
- 如果该线程被中断或者唤醒,那么返回Thread.interrupted()方法的返回值,该方法用于判断前线程的中断状态,并且清除该中断状态。如果该线程因为被中断而唤醒,则中断状态为true,将中断状态重置为false,并返回true。如果该线程不是因为中断被唤醒,则中断状态为false,并返回false。
文章图片
/**
* 挂起线程
*/
private final boolean parkAndCheckInterrupt() {
// 使用LockSupport.park(this)挂起该线程
LockSupport.park(this);
// 判断线程是否被中断,并清除当前中断标识位
return Thread.interrupted();
}
四、ReentrantLock释放锁原理及源码分析 1、unlock方法释放锁
1、JUC中只要是实现Lock的锁,那么解锁的方法,一般都统一调用开放给外部的unlock()方法。 2、2、AQS类的release方法ReentrantLock在公平和非公平模式下释放锁的逻辑都是一样的,该实现在Sync类中
。 3、每个lock.lock()操作必然对应一个lock.unlock()操作
。方法调用流程如下:
lock.unlock()
→ReentrantLock类的unlock()
→AQS类的release()
→AQS类的tryRelease()
→Sync类的tryRelease()
1、当前线程获取到锁并执行了相应逻辑之后,就需要释放锁,使得后续结点能够继续获取锁。通过调用AQS的release(int arg)模版方法可以独占式的释放锁。 2、release方法流程:3、此时A线程逻辑执行完毕,调用释放锁逻辑。
- 尝试使用tryRelease(arg)释放锁,该方法在
Sync类中
,是自己实现的方法,将state值为0或者减少、清除当前获得锁的线程,如果符合自己的逻辑,锁释放成功则返回true,否则返回false。- 如果tryRelease释放成功返回true,如果头结点head不为null并且head的等待状态不为0,那么尝试调用unparkSuccessor方法唤醒头结点之后的一个非取消状态(非CANCELLED状态)的后继结点,让其可以进行锁获取。返回true,方法结束。
- 如果tryRelease释放失败,那么返回false,方法结束。
/**
* 独占式的释放同步状态
* @param arg 释放参数
* @return 释放成功返回true, 否则返回false
*/
public final boolean release(int arg) {
// 调用ReentrantLock重写的tryRelease方法(位于内部类Sync中),释放同步状态,释放成功将返回true,否则返回false
if (tryRelease(arg)) {
// 获取头结点
Node h = head;
// 如果头结点不为null,并且状态不等于0
if (h != null && h.waitStatus != 0)
// 唤醒头结点的一个处于等待锁状态的后继结点
unparkSuccessor(h);
return true;
}
return false;
}
3、AQS的unparkSuccessor方法
1、unparkSuccessor方法用于唤醒参数结点的某个非取消的后继结点,该方法在很多地方法都被调用。 2、unparkSuccessor方法流程如下:3、此时线程B被唤醒了,重新执行
- 如果当前结点的状态小于0,那么CAS设置为0,表示后继结点可以继续尝试获取锁。
- 如果当前结点的后继s为null或者状态为取消CANCELLED(1),则将s先指向null;然后从尾结点开始到node之间倒序向前查找,找到离尾结点最远的非取消结点赋给s。需要从后向前遍历,因为同步队列只保证结点前驱关系的正确性。
- 如果s不为null,那么状态肯定不是取消CANCELLED(1),则直接唤醒s的线程,调用LockSupport.unpark方法唤醒,
被唤醒的结点将从被park的位置继续执行
。acquireQueued
方法中的逻辑,线程B仍然需要尝试获取锁,此时有两种情况:
线程B尝试获取锁失败
:因为是非公平的实现方式,有可能线程A刚执行完,线程D刚进入且顺利拿到锁,那么线程B仍然需要继续等待。线程B尝试获取锁成功
:执行acquireQueued
方法中的如下逻辑
文章图片
4、此时队列结构如下:
文章图片
/**
* 唤醒指定结点的后继结点
* @param node 指定结点
*/
private void unparkSuccessor(Node node) {
// 获取指定结点的等待状态
int ws = node.waitStatus;
// 判断等待状态是否小于0,如果小于0,那么CAS设置为0,表示后继结点线程可以先尝试获锁,而不是直接挂起
if (ws < 0)
// 通过CAS操作将指定结点的等待状态更新为0
compareAndSetWaitStatus(node, ws, 0);
// 获取指定结点的后继结点
Node s = node.next;
/**
* 如果后继结点为null,或者状态为取消CANCELLED(1),则从尾结点开始到指定结点之间倒序向前查找,
* 找到离尾结点最远的非取消结点赋给s
*/
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail;
t != null && t != node;
t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果后继结点不为null,则唤醒s结点的线程,调用LockSupport.unpark方法唤醒,被唤醒的结点将从被park的位置向后执行
if (s != null)
LockSupport.unpark(s.thread);
}
4、AQS的finally代码块中的cancelAcquire方法
1、在acquireQueued方法中有一段finally代码块的逻辑为:2、
- 如果failed为true,表示获取锁失败,即对应发生异常的情况,这里发生异常的情况只有在tryAcquire方法和predecessor方法中可能会抛出异常,此时还没有获得锁,failed=true
- 如果failed为false,表示已经获取到了锁,那么finally中代码都不会执行,acquireQueued方法结束。
cancelAcquire方法用于取消结点获取锁的请求,参数为需要取消的结点node
3、cancelAcquire方法流程:
- 将node结点记录的线程thread设置为null。
- 由node结点向前查找,直到找到一个状态小于等于0的结点pred (即找一个没有取消的结点),更新node.prev为找到的pred。
- 将node结点的等待状态waitStatus置为CANCELLED,即取消请求锁。
- 如果node结点是尾结点,那么尝试CAS更新头结点指向pred,成功之后继续CAS设置pred.next为null。
- 否则,说明node结点不是尾结点或者CAS失败可能存在对尾结点的并发操作
- 如果node结点不是头结点,并且 (pred的状态为SIGNAL或者将pred的waitStatus置为SIGNAL成功),并且pred记录的线程不为null。那么设置pred.next指向node.next,最后node.next指向node自己。
- 否则,说明node结点是头结点或者pred状态设置失败或者pred记录的线程为null。那么调用unparkSuccessor唤醒node的一个没取消的后继结点。最后node.next指向node自己。
/**
* 取消指定结点获取锁的请求
* @param node 指定结点
*/
private void cancelAcquire(Node node) {
// 如果指定结点为null,则直接返回
if (node == null)
return;
// 将指定结点node记录的线程thread设置为null
node.thread = null;
// 获取指定结点的前驱结点
Node pred = node.prev;
// 循环判断前驱结点的等待状态是否大于0(即取消:CANCELLED(1))
// 由指定结点node向前查找,直到找到一个状态小于等于0的结点(即没有被取消的结点),作为前驱
// 类似于shouldParkAfterFailedAcquire方法中查找有效前驱的代码
while (pred.waitStatus > 0)
// 指定结点的前驱结点 = 前驱结点的前驱结点;前驱结点 = 前驱结点的前驱结点,向前排查
node.prev = pred = pred.prev;
// 获取到一个非取消状态的前驱结点pred,并获取它的后继结点
Node predNext = pred.next;
// 指定结点node的等待状态设置为CANCELLED,即取消请求锁
node.waitStatus = Node.CANCELLED;
// 如果指定结点是尾结点,那么尝试CAS更新尾结点指向pred,成功之后继续CAS设置后继结点pred.next为null
if (node == tail && compareAndSetTail(node, pred)) {
// 新尾结点pred的next结点设置为null,即使失败了也没关系,说明有其它新入队线程或者其它取消线程更新了
compareAndSetNext(pred, predNext, null);
// else说明node不是尾结点或者CAS失败(可能存在对尾结点的并发操作),这种情况就要把pred和node的后继非取消结点拼起来
} else {
int ws;
// 判断是否为头结点
if (pred != head &&
// 并且pred的状态为SIGNAL或者将pred的waitStatus置为SIGNAL成功
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// 并且pred记录的线程不为null
pred.thread != null) {
// 指定结点的后继结点
Node next = node.next;
// 指定结点的后继结点不为null,并且node的后继结点的等待状态不为取消状态(CANCELLED)
if (next != null && next.waitStatus <= 0)
// 通过CAS操作将指定结点前的一个非取消状态的前驱结点pred的后继结点指向next
compareAndSetNext(pred, predNext, next);
// 上面判断存在失败的情况,此时需要调用unparkSuccessor方法尝试唤醒node结点的后继结点
} else {
unparkSuccessor(node);
}
// 指定结点的后继结点设置为自己,方便后续GC时直接销毁无效结点
node.next = node;
// help GC
}
}
5、总结
1、acquire流程图
文章图片
2、release流程图
文章图片
3、cancelAcquire流程图
推荐阅读
- Java基础|高并发线程/锁/内存处理模型
- JUC并发编程|JUC并发编程——线程池介绍和底层实现
- #yyds干货盘点# JUC锁: ReentrantLock详解
- java|Java中volatile关键字详解
- java学习|JUC并发编程汇总彻底搞懂JUC
- Java|(JUC 下典型的类)Java 并发包中线程同步器