1.AQS是什么
2.能干什么
3.AQS为什么是JUC内容中最重要的基石
4.AQS内部体系架构
5.从我们的ReentrantLock开始解读AQS
6.总结
1.AQS是什么
AQS————全称AbstractQueuedSynchronizer,抽象队列同步器。
我们可以看一下源码中的解释:
文章图片
意思就是说,它是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量表示持有锁的状态。
简而言之
1)这是一个基石框架(模板方法设计模式)
2)就是将获取资源的线程排了个队,让它有序获取资源。
2.能干什么
假设我们在使用ReentrantLock的时候,必然会有线程阻塞,有线程阻塞那就必须排队,就得需要队列。
抢占资源失败的线程继续去等待,但是等候线程任然保留获取锁的可能且获取锁的流程还在继续。
这就是AQS,题我们实现了构建锁和队列,以及让线程有序获取资源。
3.AQS为什么是JUC内容中最重要的基石
我们先来看看实现AQS的类具体有哪些:
文章图片
有ReentrantLock,CountDownLatch,ReentrantReadWriteLock,Semaphore等等。
此时我们要理清一个关系:
锁:面向的都是锁的使用者,定义了程序员和锁交互的应用层api,隐藏了实现细节,开发者调用即可。
同步器:面向锁的实现者,比如并发大神DougLee,提出一套规范,并简化了锁的实现,屏蔽了同步状态的管理,阻塞线程排队和通知,唤醒机制等等。
4.AQS内部体系架构
让我们点到源码里面看一下:
【JAVA并发编程——AbstractQueuedSynchronizer之AQS概念和源码阅读】我们搜一下下面这个变量
/**
* The synchronization state.
*/
private volatile int state;
AQS就是使用上面这一个volatile的int类型的变量来表示被占用的锁的同步状态
再搜一下下面这个变量:
static final class Node {//还有一些变量没有放出来,重点解释这几个volatile Node prev;
volatile Node next;
volatile Thread thread;
//....}
AQS使用这个链表的FIFO对垒来完成资源获取的排队工作,将每条要去抢占的线程封装成一个Node节点来实现锁的分配,通过CAS完成对state值的修改。
也就是说,我们通过队列(管理需要排队的线程)+state变量(管理公共资源类)实现了AQS的基本结构。
我们再来看一下Node内部:
volatile int waitStatus;
Node通过waitState的成员变量来阻塞或者唤醒队列中的线程,waitStatus的状态如图所示,不过也可以看源码注释
文章图片
最后吗,我们用一张图来展示,AQS的基本结构
文章图片
5.从我们的ReentrantLock开始解读AQS
我们从ReentrantLock作为一个突破点,来阅读一下AQS的源码。
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
TimeUnit.MINUTES.sleep(60);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
我们点击lock,可以看出,lock底层是调用了一个sync变量的一个lock方法
public void lock() {
sync.lock();
}
而sync:
abstract static class Sync extends AbstractQueuedSynchronizer {
//......
}
可以看出:
Lock接口的实现类,基本就是通过聚合了一个队列同步器的子类完成线程的访问控制的。
我们继续往lock里面点,会发现有一个公平锁和一个非公平锁:
文章图片
然后我们继续往里点击,查看抢占锁的acquire方法,会发现会有图上的区别
文章图片
非公平锁抢占资源的时候,不需要判定队列前面是否还有排队线程。
公平锁抢占资源的时候,需要判定度列前是否有排队线程。
那么其实,我们只需要看懂非公平锁的源码,其实也就看懂了公平锁的源码。
我们先梳理一下获取锁的整体流程:
文章图片
我们从NonfairSync的lock方法开始看:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock.Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//当前线程尝试着将锁资源的状态从0变为1
//如果成功
//将资源占用的线程设置为本线程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//如果失败
//走acquire方法
acquire(1);
}protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
继续查看acquire(1)方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire:
protected final boolean tryAcquire(int acquires) {
//点击这个方法来到下面这个方法
return nonfairTryAcquire(acquires);
}final boolean nonfairTryAcquire(int acquires) {
//先获得当前线程
final Thread current = Thread.currentThread();
int c = getState();
//如果当前锁的状态为0(未被占用状态)
if (c == 0) {
//尝试使用cas去占用
if (compareAndSetState(0, acquires)) {
//设置占用该资源的线程为本线程
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前锁的状态不为0,但是占用该锁的线程是本线程
else if (current == getExclusiveOwnerThread()) {
//相当于可重入锁了,然后状态标志位+1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter:
private Node addWaiter(Node mode) {
//将当前线程设置为一个Node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq;
backup to full enq on failure
//如果尾节点不为空
Node pred = tail;
if (pred != null) {
//将本线程的上一个节点设置为尾节点
node.prev = pred;
//使用cas交换尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果尾节点为空,则初始化
enq(node);
return node;
}private Node enq(final Node node) {
//死循环
for (;
;
) {
Node t = tail;
//如果尾节点是空,则创建一个傀儡节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//如果尾节点不为空,则交换尾节点并返回,跳出循环
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;
;
) {
final Node p = node.predecessor();
//如果node的前一个节点是头节点(队列前再无其它节点),且抢锁成功
if (p == head && tryAcquire(arg)) {
//设置当前的节点为头结点
setHead(node);
//原来的头结点的下一个节点置空,帮助GC
p.next = null;
// help GC
failed = false;
return interrupted;
}
//如果node 的前一个节点不是投节点
//更改当前节点的运行状态
if (shouldParkAfterFailedAcquire(p, node) &&
//暂停该线程
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
selfInterrupt:
返回该线程的中断状态
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
再来看看unlock方法:
public final boolean release(int arg) {
//尝试释放锁,如果锁的状态是0,则成功释放
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒头结点
unparkSuccessor(h);
return true;
}
return false;
}
6.总结
今天我们了解了AQS概念和源码阅读,AQS作为整个JUC的基石框架,是非常重要的。