CountDownLatch和ReentrantLock有很多相似的地方,因为相似部分内容已经在博客《ReentrantLock详解》中讨论过了,本文不会再次详细讨论。
CountDownLatch是一个计数(构造函数中指定此数值)的锁,当通过countDown方法将此计数值减为0时会唤醒之前调用await的线程。一般用于当某些任务执行完后,在执行其他任务的场景中。
一 CountDownLatch是什么
1 原理分析 CountDownLatch是一个同步的辅助类,它能够使一个线程等待其他线程完成各自的工作后再执行。
CountDownLatch是基于AbstractQueuedSynchronizer(AQS)实现的,其通过state作为计数器。构造CountDownLatch时初始化一个state,以后每调用countDown方法一次,state减1;当state=0时,唤醒在await上被挂起的线程。
CountDownLatch的计数器state不能被重置,如果需要一种能重置count的版本,可以考虑使用CyclicBarrier。
2 用途 CountDownlatch是一个多功能的同步工具,可以被用于各种目的。
一个CountDownLatch通过一个值为1的count被初始化,来作为一个开/关的门或门闩:所有调用了await()的线程都会在门前等待,直到门被一个线程通过调用countDown()打开。
一个被初始化为N的CountDownLatch可以被用来“在N个线程都完成了某种操作(或者一些操作已经被完成了N次)之后创建一个线程”。
二 源码分析
1 构造函数
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
CountDownLatch是通过一个计数器来实现的,计数器的初始值为等待线程数量。
2 等待信号 1) await
此方法将导致当前线程等待,直到CountDownLatch通过countDown()方法使计数器值为0
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);
}
2) acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();
if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}
3) tryAcquireShared
如果state=0,即计数器的值为0,返回1,表示不用等待;否则返回-1,可表示还需要等待。
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;
}
4) doAcquireSharedInterruptibly
此方法的主要作用是让获取不到锁的线程挂起,通过向等待队列中添加一个Node(此Node与当前线程相关联);接着判断是应该唤醒还是挂起。如果node是队列中第一个有效节点,那么唤醒对应的线程;否则通过LockSupport挂起线程。具体逻辑如下:
创建一个SHARED类型的节点,加入到等待队列中。
接下来无限循环,尝试进行以下操作,直到获取到锁或者因为取消获取锁而被唤醒。
如果node是队列中第一个等待线程,那么尝试获取读锁,获取成功后更新队列的head,如果后一个节点也是等待读锁,那么后面一个节点的线程线程。这一过程实现读锁共享。
【java|CountDownLatch】判断是否应该挂起当前节点对应的线程,如果应该挂起,则通过LockSupport挂起线程。
线程被唤醒以后,设置中断标志位。
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {for (;
;
) {final Node p = node.predecessor();
if (p == head) {int r = tryAcquireShared(arg);
if (r >= 0) {setHeadAndPropagate(node, r);
p.next = null;
// help GCfailed = false;
return;
}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();
}} finally {if (failed)cancelAcquire(node);
}}
3 计数器减1 计数器减1操作,当计数器减到0时,调用await的线程将被唤醒。
1) countDown
countDown是将计数器减1,其本质是释放一次锁。
public void countDown() {sync.releaseShared(1);
}
2) releaseShared
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();
return true;
}return false;
}
3) tryReleaseShared
释放锁,即将计数器-1;如果减-1后state=0,则表示全部的锁已经释放完了,当且仅当此时返回true。
protected boolean tryReleaseShared(int releases) {// Decrement count;
signal when transition to zerofor (;
;
) {int c = getState();
if (c == 0)return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))return nextc == 0;
}}
4) doReleaseShared
唤醒head后的一个处于挂起状态的线程。主要逻辑如下:
- 如果head的waitStatus=SIGNAL那么修改head.waitStatus=0,然后唤醒head后面的一个等待被唤醒的线程。
- 如果head的waitStatus=0那么修改head.waitStatus=PROPAGATE
- 循环进行以上两个操作直到成功、并且head不曾变动过方才退出循环。
private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases.This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;
;
) {Node h = head;
if (h != null && h != tail) {int ws = h.waitStatus;
if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;
// loop to recheck casesunparkSuccessor(h);
}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;
// loop on failed CAS}if (h == head)// loop if head changedbreak;
}}
4 PROPAGATE行为 当通过countDown操作将计数器的值改为0以后,通过doReleaseShared()方法中unparkSuccessor(h)代码的唤醒等待队列中的一个等待线程。
当第一个线程被唤醒以后,继续执行doAcquireSharedInterruptibly()方法的for循环,在此循环中执行到setHeadAndPropagate方法时,会将更新head,传播唤醒操作。
setHeadAndPropagate传播行为的逻辑如下:
- 设置将此次唤醒Node的线程信息清空,然后设置为新的head,
- 唤醒head后的Node对应的线程。唤醒时重新执行setHeadAndPropagate。
三 示例 以下示例中,我们初始化CountDownLatch的信号量是3,那么只有调用三次countDownLatch.countDown()时,之前通过countDownLatch.await()的线程才能被唤醒。
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0;
i < 3;
i++) {
String name = "T-"+i;
Thread thread = new Thread(() -> {
try {
System.out.println("时间="+DateUtil.getCurrentTime()+" -工作线程- thread="+name+" 开始获取资源");
doTask(name);
System.out.println("时间="+DateUtil.getCurrentTime()+" -工作线程- thread="+name+" 资源释放完成");
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});
thread.start();
}
try {
System.out.println("时间="+DateUtil.getCurrentTime()+" 准备等待工作线程执行");
countDownLatch.await();
System.out.println("时间="+DateUtil.getCurrentTime()+" 工作线程已经执行完");
} catch (InterruptedException e) {
e.printStackTrace();
}
}private static void doTask(String name){
System.out.println("时间="+DateUtil.getCurrentTime()+" -工作线程- thread="+name+" 开始使用资源,执行任务");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
以下是输出结果,可以看到主线程在等待3个工作线程执行完以后才开始执行。
时间=11:05:01:952 -工作线程- thread=T-2 开始获取资源
时间=11:05:01:991 -工作线程- thread=T-2 开始使用资源,执行任务
时间=11:05:01:952 -工作线程- thread=T-0 开始获取资源
时间=11:05:01:994 -工作线程- thread=T-0 开始使用资源,执行任务
时间=11:05:01:952 准备等待工作线程执行
时间=11:05:01:952 -工作线程- thread=T-1 开始获取资源
时间=11:05:01:997 -工作线程- thread=T-1 开始使用资源,执行任务
时间=11:05:02:993 -工作线程- thread=T-2 资源释放完成
时间=11:05:02:998 -工作线程- thread=T-0 资源释放完成
时间=11:05:02:998 -工作线程- thread=T-1 资源释放完成
时间=11:05:02:999 工作线程已经执行完
推荐阅读
- Java|Java基础——数组
- 人工智能|干货!人体姿态估计与运动预测
- java简介|Java是什么(Java能用来干什么?)
- Java|规范的打印日志
- Linux|109 个实用 shell 脚本
- 程序员|【高级Java架构师系统学习】毕业一年萌新的Java大厂面经,最新整理
- Spring注解驱动第十讲--@Autowired使用
- SqlServer|sql server的UPDLOCK、HOLDLOCK试验
- jvm|【JVM】JVM08(java内存模型解析[JMM])
- 技术|为参加2021年蓝桥杯Java软件开发大学B组细心整理常见基础知识、搜索和常用算法解析例题(持续更新...)