java|CountDownLatch

CountDownLatch和ReentrantLock有很多相似的地方,因为相似部分内容已经在博客《ReentrantLock详解》中讨论过了,本文不会再次详细讨论。
CountDownLatch是一个计数(构造函数中指定此数值)的锁,当通过countDown方法将此计数值减为0时会唤醒之前调用await的线程。一般用于当某些任务执行完后,在执行其他任务的场景中。
一 CountDownLatch是什么 1 原理分析 CountDownLatch是一个同步的辅助类,它能够使一个线程等待其他线程完成各自的工作后再执行。
CountDownLatch是基于AbstractQueuedSynchronizer(AQS)实现的,其通过state作为计数器。构造CountDownLatch时初始化一个state,以后每调用countDown方法一次,state减1;当state=0时,唤醒在await上被挂起的线程。
CountDownLatch的计数器state不能被重置,如果需要一种能重置count的版本,可以考虑使用CyclicBarrier。

2 用途 CountDownlatch是一个多功能的同步工具,可以被用于各种目的。
一个CountDownLatch通过一个值为1的count被初始化,来作为一个开/关的门或门闩:所有调用了await()的线程都会在门前等待,直到门被一个线程通过调用countDown()打开。
一个被初始化为N的CountDownLatch可以被用来“在N个线程都完成了某种操作(或者一些操作已经被完成了N次)之后创建一个线程”。

二 源码分析 1 构造函数

public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }

CountDownLatch是通过一个计数器来实现的,计数器的初始值为等待线程数量。

2 等待信号 1) await
此方法将导致当前线程等待,直到CountDownLatch通过countDown()方法使计数器值为0
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1); }


2) acquireSharedInterruptibly
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException(); if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); }


3) tryAcquireShared
如果state=0,即计数器的值为0,返回1,表示不用等待;否则返回-1,可表示还需要等待。
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1; }

4) doAcquireSharedInterruptibly
此方法的主要作用是让获取不到锁的线程挂起,通过向等待队列中添加一个Node(此Node与当前线程相关联);接着判断是应该唤醒还是挂起。如果node是队列中第一个有效节点,那么唤醒对应的线程;否则通过LockSupport挂起线程。具体逻辑如下:
创建一个SHARED类型的节点,加入到等待队列中。
接下来无限循环,尝试进行以下操作,直到获取到锁或者因为取消获取锁而被唤醒。
如果node是队列中第一个等待线程,那么尝试获取读锁,获取成功后更新队列的head,如果后一个节点也是等待读锁,那么后面一个节点的线程线程。这一过程实现读锁共享。
【java|CountDownLatch】判断是否应该挂起当前节点对应的线程,如果应该挂起,则通过LockSupport挂起线程。
线程被唤醒以后,设置中断标志位。
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED); boolean failed = true; try {for (; ; ) {final Node p = node.predecessor(); if (p == head) {int r = tryAcquireShared(arg); if (r >= 0) {setHeadAndPropagate(node, r); p.next = null; // help GCfailed = false; return; }}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException(); }} finally {if (failed)cancelAcquire(node); }}


3 计数器减1 计数器减1操作,当计数器减到0时,调用await的线程将被唤醒。
1) countDown
countDown是将计数器减1,其本质是释放一次锁。
public void countDown() {sync.releaseShared(1); }


2) releaseShared

public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared(); return true; }return false; }


3) tryReleaseShared
释放锁,即将计数器-1;如果减-1后state=0,则表示全部的锁已经释放完了,当且仅当此时返回true。
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (; ; ) {int c = getState(); if (c == 0)return false; int nextc = c-1; if (compareAndSetState(c, nextc))return nextc == 0; }}

4) doReleaseShared
唤醒head后的一个处于挂起状态的线程。主要逻辑如下:
  • 如果head的waitStatus=SIGNAL那么修改head.waitStatus=0,然后唤醒head后面的一个等待被唤醒的线程。
  • 如果head的waitStatus=0那么修改head.waitStatus=PROPAGATE
  • 循环进行以上两个操作直到成功、并且head不曾变动过方才退出循环。
private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases.This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (; ; ) {Node h = head; if (h != null && h != tail) {int ws = h.waitStatus; if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h); }else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head)// loop if head changedbreak; }}


4 PROPAGATE行为 当通过countDown操作将计数器的值改为0以后,通过doReleaseShared()方法中unparkSuccessor(h)代码的唤醒等待队列中的一个等待线程。
当第一个线程被唤醒以后,继续执行doAcquireSharedInterruptibly()方法的for循环,在此循环中执行到setHeadAndPropagate方法时,会将更新head,传播唤醒操作。
setHeadAndPropagate传播行为的逻辑如下:
  • 设置将此次唤醒Node的线程信息清空,然后设置为新的head,
  • 唤醒head后的Node对应的线程。唤醒时重新执行setHeadAndPropagate。

三 示例 以下示例中,我们初始化CountDownLatch的信号量是3,那么只有调用三次countDownLatch.countDown()时,之前通过countDownLatch.await()的线程才能被唤醒。
public static void main(String[] args) { CountDownLatch countDownLatch = new CountDownLatch(3); for (int i = 0; i < 3; i++) { String name = "T-"+i; Thread thread = new Thread(() -> { try { System.out.println("时间="+DateUtil.getCurrentTime()+" -工作线程- thread="+name+" 开始获取资源"); doTask(name); System.out.println("时间="+DateUtil.getCurrentTime()+" -工作线程- thread="+name+" 资源释放完成"); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } }); thread.start(); } try { System.out.println("时间="+DateUtil.getCurrentTime()+" 准备等待工作线程执行"); countDownLatch.await(); System.out.println("时间="+DateUtil.getCurrentTime()+" 工作线程已经执行完"); } catch (InterruptedException e) { e.printStackTrace(); } }private static void doTask(String name){ System.out.println("时间="+DateUtil.getCurrentTime()+" -工作线程- thread="+name+" 开始使用资源,执行任务"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }


以下是输出结果,可以看到主线程在等待3个工作线程执行完以后才开始执行。
时间=11:05:01:952 -工作线程- thread=T-2 开始获取资源
时间=11:05:01:991 -工作线程- thread=T-2 开始使用资源,执行任务
时间=11:05:01:952 -工作线程- thread=T-0 开始获取资源
时间=11:05:01:994 -工作线程- thread=T-0 开始使用资源,执行任务
时间=11:05:01:952 准备等待工作线程执行
时间=11:05:01:952 -工作线程- thread=T-1 开始获取资源
时间=11:05:01:997 -工作线程- thread=T-1 开始使用资源,执行任务
时间=11:05:02:993 -工作线程- thread=T-2 资源释放完成
时间=11:05:02:998 -工作线程- thread=T-0 资源释放完成
时间=11:05:02:998 -工作线程- thread=T-1 资源释放完成
时间=11:05:02:999 工作线程已经执行完

    推荐阅读