面试官给我讲讲线程池(下)

前景回顾 大家好呀,我是小张,我们在上一篇中 面试官:给我讲讲线程池(中),通过线程池的常见API作为切入点,分析了execute方法的源码,其中学到了Doug Lea老爷子把一个变量拆成两个变量的骚操作、无锁开发(CAS)、如何并发控制线程池状态等,以及最后通过源码阅读给大家带来了我认为高效阅读源码的方法论。
如果还没有读过上一篇的小伙伴请先阅读上一篇,在本篇中将会围绕以下几个API的源码阅读中展开。

  • processWorkerExit
  • shutdown
  • shutdownNow
  • awaitTermination
工作线程退出 我们继续上一篇中worker的runWorker()的worker线程结束开始讲,下面我将代码中其他流程简化只展示了核心代码。
final void runWorker(Worker w) { boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

在上一篇中我们通过分析只有当getTask()方法返回null的时候,while循环才会结束,才会进入processWorkerExit()方法中。
其中getTask()会在线程池状态改变时(变为非Running),或者当获取任务超时且当前工作线程数大于核心线程数返回null。
那么理所应当的processWorkerExit方法就是执行工作线程的清理工作,下面将通过源码带大家看看是如何进行工作线程的清理的。
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果非正常结束,将工作线程数-1 if (completedAbruptly) decrementWorkerCount(); // 获取锁,该锁主要为了操作工作线程集合 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 汇总该worker线程完成的任务数 completedTaskCount += w.completedTasks; // 从worker集合中删除 workers.remove(w); } finally { mainLock.unlock(); }// 因为线程池的状态不知道在何时会被修改,所以需要尝试去结束线程池 tryTerminate(); int c = ctl.get(); // 当Running或SHUTDOWN状态 if (runStateLessThan(c, STOP)) { // 如果非异常中断,计算最小应存活工作线程数 if (!completedAbruptly) { // 如允许核心线程超时,最小值0 否则为核心线程数 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 当最小值为0时,任务队列还有任务时,至少还需要一个线程去处理 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 判断当前工作线程数是否小于最小值,如是直接返回 if (workerCountOf(c) >= min) return; // replacement not needed } // 如果异常中断或当前工作线程数小于最小值,需要重新添加工作线程 addWorker(null, false); } }

【面试官给我讲讲线程池(下)】通过上方分析,我们可以推断出工作线程清理过程是先清理worker集合,再会执行尝试结束线程池,然后如果线程池状态处于Running或SHUTDOWN时,将会计算最小工作线程数min,保证当前工作线程数总是有min个存活。有小伙伴可能会好奇,为什么会去尝试去结束线程池,其实道理很简单,线程池何时被结束对于程序是未知的,所以需要在每个线程经过的地方来去判断状态是否变化。
关闭线程池 在实际场景中,关闭线程池有两种方式,一种调用shutdown一种调用shutdownNow,两者区别在于前者线程池还可以继续处理线程池中任务,后者将会中断所有任务并将未执行的任务返回。下面我将通过对比的方式,让大家更清楚的阅读其中的不同。
面试官给我讲讲线程池(下)
文章图片

通过上图红框中,首先两者要改变的状态不同,一个要改变为SHUTDOWN一个要改变为STOP状态。其次一个要中断空闲线程,一个要中断所有线程。如果有小伙伴不了解这里的线程池状态,请到上一篇文章阅读。
那么线程池是如何被改变状态,工作线程又是如何被中断的呢,阅读下面的源码将豁然开朗。
advanceRunState
// 推进线程池状态 private void advanceRunState(int targetState) { for (; ; ) { int c = ctl.get(); if ( // 如果当前状态大于等于目标状态则break runStateAtLeast(c, targetState) || // 如果当前状态小于目标状态则使用CAS尝试去修改,修改成功则break ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } }

上方的代码非常简单,在多线程中并发中,线程池的状态随时可能发生变化。所以这里需要死循环通过CAS的方式去修改线程池状态,保证原子性。
interruptIdleWorkers
private void interruptIdleWorkers() { interruptIdleWorkers(false); }private void interruptIdleWorkers(boolean onlyOne) { // 我们需要操作worker集合,所以上锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍历worker集合 for (Worker w : workers) { Thread t = w.thread; if ( // 线程非中断 !t.isInterrupted() // 尝试获取worker的锁,判断worker是否空闲 && w.tryLock()) { try { // 空闲worker,直接使用中断 t.interrupt(); } catch (SecurityException ignore) { } finally { // 释放worker的锁 w.unlock(); } } // 如只中断一个,直接break if (onlyOne) break; } } finally { // 释放操作worker集合的锁 mainLock.unlock(); } }

阅读下来,该方法阅读并不困难,但是在worker是否空闲处,使用了tryLock方法去判断worker是否空闲,小伙伴们可能会疑问为什么tryLock成功就是空闲的worker线程呢?这里我们就需要结合runWorker()方法去看。
面试官给我讲讲线程池(下)
文章图片

如上图,我们会发现工作线程池要么处于等待①处返回task任务,要么处于③执行任务。我们会发现在获取task返回时,就会进入②,这里不熟悉AQS的小伙伴可以把②处认为是标记worker线程被占用。任务执行完毕将会调用④方法,标记释放worker线程。所以我们可以得知,只要尝试占用成功的都是没有获取到task任务的worker线程,换言之这些线程就是空闲线程。
interruptWorkers 根据字面意思,该方法作用是中断所有工作线程池,不管是否正在执行任务。
private void interruptWorkers() { // 获取占用worker集合的锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍历worker集合 for (Worker w : workers) // 直接中断 w.interruptIfStarted(); } finally { // 释放锁 mainLock.unlock(); } }void interruptIfStarted() { Thread t; if ( // 0为未被占用、1为被占用 getState() >= 0 && // 线程不会空且未被标记为中断,则调用线程的中断 (t = thread) != null && !t.isInterrupted()) { try { // 线程中断 t.interrupt(); } catch (SecurityException ignore) { } } }

我们会发现中断所有线程和中断空闲线程的方法中,唯一不同的是这个方法使用了getState()大于等于0来判断线程状态。
那么为什么是大于等于0呢,我们回到worker中的lock和unlock方法中。
public void lock(){ acquire(1); } public boolean tryLock(){ return tryAcquire(1); } public void unlock(){ release(1); }Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }

熟悉AQS的小伙伴肯定会明白,调用lock方法就会将状态改为1,unlock方法将会把状态改为0。我们回到worker类的构造函数中,会发现第一条就是先把状态设置为-1,到这里是不是豁然开朗了,这就解释了为什么使用大于等0来判断线程状态,为了就是刚创建完成的worker线程是不需要被中断的。
tryTerminate 我们会发现在shutdown和shutdownNow方法的最后都调用了tryTerminate方法,那么其中都做了什么操作?
final void tryTerminate() { for (; ; ) { // 获取线程池状态 int c = ctl.get(); // 检查是否可以Terminate if ( // 状态处于Running isRunning(c) || // 大于等于TIDYING=TIDYING、TERMINATED runStateAtLeast(c, TIDYING) || // 处于SHUTDOWN状态且任务队列非空 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // ①执行到此说明状态处于SHUTDOWN且任务队列为空,说明可以开始终结线程池了 if (workerCountOf(c) != 0) { // 如果当前工作线程不为0,则中断1个工作线程,具体为什么是一个,下文会有解释 interruptIdleWorkers(ONLY_ONE); return; }// 执行至此,说明工作线程数已为0,且状态处于SHUTDOWN,任务队列为空 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 尝试CAS修改状态为TIDYING if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 钩子方法,子类实现 terminated(); } finally { // 钩子方法执行成功,设置状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); // 唤醒所有等待TERMINATED的线程,下文有会有解释 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }

阅读完该方法的小伙伴肯定不理解,①处代码为什么判断工作线程不等于0,只中断一个工作线程,而不是中断所有工作线程。我们先上结论,不中断所有线程的原因是为了,到时候还需要至少一个线程执行①后面的代码,也就是需要线程去修改线程状态,不然就没有线程就修改状态了。
听到这个结论,小伙伴肯定是一头雾水,当初我看到这段源码也是一头雾水,不知道老爷子为什么这么设计。直到我在IDE里点击该方法查看它的引用时,一切就豁然开朗了。
面试官给我讲讲线程池(下)
文章图片

如上图我们会发现,tryTerminate()方法,在processWorkerExit()方法中被引用到了,我们在 上一篇分析runWorker方法中会发现该方法处于,工作线程run的finally块中,也就是所有工作线程执行完成就会调用该方法。
到这里我们知道了工作线程的最后都将执行tryTerminate方法,但是还不能解释为什么工作线程数为0,而不是1呢,不是说保证至少一个工作线程存活执行就可以了吗?
面试官给我讲讲线程池(下)
文章图片

我们的条件是工作线程数等于0,那么肯定有什么地方进行数量减少,而且这个方法要处于runWorker方法里面。顺藤摸瓜,跟着这个思路我们找到了getTask()方法,我们会发现①处,当处于SHUTDOWN时工作队列为空或状态为STOP、TIDYING、TERMINATED其中之一,就将工作线程数减一。其次我们会发现当工作线程处于②处等待获取task 时被中断的中断异常将会被③处抓住,然后重新回到①处减少工作线程数,也就是说最后肯定会存在最后一个线程将此处工作线程数减为0,然后就去修改线程池状态到终态。
一图胜千言,下面我通过流程图向大家展示内部是如何运转的。
面试官给我讲讲线程池(下)
文章图片

我们将所有流程串联起来,左右两边是同时在运行的,所以这也就解释了为什么每次中断一个且工作线程等于0才会继续往下执行了。
等待线程池结束 经过上面长篇大论的论述,我们终于来到了最后一个方法中,最后一个方法也是一个非常简单的方法,我相信小伙伴们有了上面的基础,再来看这一段代码,简直是小巫见大巫。
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (; ; ) { // 如果当前状态为TERMINATED,就返回true if (runStateAtLeast(ctl.get(), TERMINATED)) return true; // 已超时,返回false if (nanos <= 0) return false; // 未超时,继续等待 nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }

状态为TERMINATED直接返回、超时直接返回这两种情况都很好理解,那假如我在等待期间线程状态变为TERMINATED了呢。是不是需要某个人来将我从等待中唤醒。
面试官给我讲讲线程池(下)
文章图片

我们发现在tryTerminate方法设置TERMINATED成功以后,马上就通知了所有在等待结束的线程了,那么这就串上了。
写在最后 看到这里的小伙伴,小张非常感谢你,谢谢你能够坚持看到这里。其次你也得感谢自己,感谢自己的坚持看到这里,将一个线程池核心源码盘然巨物给阅读完了。相信在这个过程,你也学习到了不少知识。在实际业务中,你可能不会写出那么复杂的代码,各种状态充斥在代码中,但是老爷子其中的不少思想都是我们可以借鉴学习。比如我们在第一篇中写到抽象模板方法、第二篇中CAS无锁编程、第三篇如何中断执行中的线程等等。

    推荐阅读