Java并发编程|Java并发编程 - 深入剖析ReentrantLock之非公平锁加锁流程(第1篇)
这篇文章不是讲ReentrantLock的使用,而是通过调试,分析其实现原理。非公平锁 测试代码如下:
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {private ReentrantLock lock = new ReentrantLock();
public void doSomething() {lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " has been acquired the lock...");
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + " has been released the lock...");
}}public static void main(String[] args) {ReentrantLockExample lockExample = new ReentrantLockExample();
Thread A = new Thread(()->{
lockExample.doSomething();
}, "thread-A");
Thread B = new Thread(()->{
lockExample.doSomething();
}, "thread-B");
Thread C = new Thread(()->{
lockExample.doSomething();
}, "thread-C");
Thread D = new Thread(()->{
lockExample.doSomething();
}, "thread-D");
A.start();
B.start();
C.start();
D.start();
}
}
代码测试的是4个线程争抢一把锁。
1. 加锁流程解析 通过IDEA多线程调试工具进行调试,调试模拟情景为:A-B-C-D依次获取锁,等所有的获取请求结束,再依次释放锁。
第一步:线程A请求锁
调用ReentrantLock的lock方法:
public void lock() {
sync.lock();
}
执行到此出,将处理交给sync,我们这里是非公平锁,则sync为NonfairSync,调用方法:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
通过CAS设置state变量,期望当前值是0,更新值是1。state初始值为0,无其他线程更改过,所以这里CAS设置成功,state值变为1。并且exclusiveOwnerThread设置为thread-A对象。
执行图:
thread-A加锁.png lock对象内容:
thread-A加锁后ReentrantLock对象内容.png 第二步:线程B请求锁
挂起A线程,执行B线程。
线程B执行到此代码:
ReentrantLock.java
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
此时state=1,此处CAS执行失败,接下来执行else语句代码。
【Java并发编程|Java并发编程 - 深入剖析ReentrantLock之非公平锁加锁流程(第1篇)】acquire方法在AbstractQueuedSynchronizer类中定义:
AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
if条件中有两处,若第一次为false,则第二处会执行。
先来看tryAcquire, tryAcquire在AbstractQueuedSynchronizer重定义,但是没有定义具体实现,具体实现交给子类,我们这里就是NonfairSync类实现。
NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
nonfairTryAcquire在NonfairSync直接父类也就是Sync中定义:
Sync
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
此时state=1,exclusiveOwnerThread=thread-A, current = thread-B。此方法内逻辑不执行,返回false。
返回执行acquireQueued方法。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
执行这个方法前要先执行addWaiter方法,此方法用于创建即将入同步队列的Node(节点)。
AbstractQueueSynchronizer.java
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq;
backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
此时tail = null,执行enq方法。
AbstractQueueSynchronizer.java
private Node enq(final Node node) {
for (;
;
) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 第一次循环:创建一个Node对象,设置给head。同时将head赋值给tail,这样head和tail指向同一个对象。两者不再为null。
- 第二次循环:将B节点的前置设置为t(t是中间变量指向的就是head/tail节点),然后将B设置为tail。执行完成后返回t,此时t和head执行同一个对象,其实返回的就是head节点。
文章图片
head_tail-1.png 注意一下,虽然addWaiter方法返回的是头节点,但是node还是指向的B节点。接着执行acquireQueued方法。
AbstractQueuedSynchronizer.java
final boolean acquireQueued(final Node node, int arg) {// Node此时是B节点
boolean failed = true;
try {
boolean interrupted = false;
for (;
;
) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
// help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 第一次循环:p为head节点,tryAcquire和前面一样返回false,第一个if语句不行。紧接着执行shouldParkAfterFailedAcquire方法,方法代码如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE.Indicate that we
* need a signal, but don't park yet.Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
此时pred=head, node = B, ws = 0, 执行:
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
这个执行完成后,head的waitStatus被设置成Node.SINGAL,这个表示head节点的后继者也就是B节点代表的线程需要被通知。返回false。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
if代码块不执行。
- 第二次循环: p还是head节点,tryAcquire和第一次循环一样,第一个if依然不执行,继续执行shouldParkAfterFailedAcquire,此时shouldParkAfterFailedAcquire方法内pre是head节点,而head节点经过第一次循环waitStatus被设置为-1。shouldParkAfterFailedAcquire(p, node)返回true, 则接下来执行parkAndCheckInterrupt()。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
线程B执行到LockSupport.park(this)处被挂起。
执行图:
thread-B加锁.png 第三步:线程C请求锁
注意:此时线程A是我们主动(通过IDE操作)挂起的(RUNNING),线程B通过上一步的操作被动(park操作)挂起的(WAITTING)。
因为前面已经见到一些逻辑的执行了,这里就不再重复,直接讲关系到Node的操作。通过addWaiter操作,为线程C创建一个Node节点:
AbstractQueuedSynchronzier.java
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq;
backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
此时tail为B节点,然后将C节点的prev设置为B,通过CAS操作将C节点设置为tail,设置成功将B的next设置为C。然后直接返回C节点。
执行完成后head和tail的内容如下:
head_tail-2.png addWaiter返回C节点后,执行acquireQueued方法:
AbstractQueuedSynchronzier.java
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;
;
) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null;
// help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
此时node=C
- 第一次循环: C的前置节点是B,这里p=B,第一个if不执行,执行
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE.Indicate that we
* need a signal, but don't park yet.Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
pre=B,B的waitStatus=0,则执行compareAndSetWaitStatus(pred, ws, Node.SIGNAL),B的waitStatus被设为-1,表示后继节点,即C节点所代表的线程需要被唤醒。继续执行线程C被挂起。
执行图:
thread-C加锁.png 第四步:线程D请求锁
线程D请求锁和上面的流程几乎是一样了,这里就不再分别调试,执行图:
文章图片
thread-D加锁.png
推荐阅读
- JAVA(抽象类与接口的区别&重载与重写&内存泄漏)
- 事件代理
- Java|Java OpenCV图像处理之SIFT角点检测详解
- java中如何实现重建二叉树
- 数组常用方法一
- 【Hadoop踩雷】Mac下安装Hadoop3以及Java版本问题
- python青少年编程比赛_第十一届蓝桥杯大赛青少年创意编程组比赛细则
- Java|Java基础——数组
- RxJava|RxJava 在Android项目中的使用(一)
- java之static、static|java之static、static final、final的区别与应用