并发编程从零开始(十三)-线程池

并发编程从零开始(十三)-线程池 第三部分:线程池与Future 9 线程池的实现原理
下图所示为线程池的实现原理:调用方不断地向线程池中提交任务;线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者—消费者模型。
并发编程从零开始(十三)-线程池
文章图片

要实现这样一个线程池,有几个问题需要考虑:

  1. 队列设置多长?如果是无界的,调用方不断地往队列中放任务,可能导致内存耗尽。如果是有界的,当队列满了之后,调用方如何处理?
  2. 线程池中的线程个数是固定的,还是动态变化的?
  3. 每次提交新任务,是放入队列?还是开新线程?
  4. 当没有任务的时候,线程是睡眠一小段时间?还是进入阻塞?如果进入阻塞,如何唤醒?
针对问题4,有3种做法:
  1. 不使用阻塞队列,只使用一般的线程安全的队列,也无阻塞/唤醒机制。当队列为空时,线程池中的线程只能睡眠一会儿,然后醒来去看队列中有没有新任务到来,如此不断轮询。
  2. 不使用阻塞队列,但在队列外部、线程池内部实现了阻塞/唤醒机制。
  3. 使用阻塞队列
很显然,做法3最完善,既避免了线程池内部自己实现阻塞/唤醒机制的麻烦,也避免了做法1的睡眠/轮询带来的资源消耗和延迟。正因为如此,接下来要讲的ThreadPoolExector/ScheduledThreadPoolExecutor都是基于阻塞队列来实现的,而不是一般的队列,至此,各式各样的阻塞队列就要派上用场了。
10 线程池的类继承体系
线程池的类继承体系如下图所示:
并发编程从零开始(十三)-线程池
文章图片

在这里,有两个核心的类: ThreadPoolExector 和ScheduledThreadPoolExecutor ,后者不仅可以执行某个任务,还可以周期性地执行任务。
向线程池中提交的每个任务,都必须实现 Runnable 接口,通过最上面的 Executor 接口中的execute(Runnable command) 向线程池提交任务。
然后,在 ExecutorService 中,定义了线程池的关闭接口 shutdown() ,还定义了可以有返回值的任务,也就是 Callable ,后面会详细介绍。
11 ThreadPoolExecutor
11.1 核心数据结构 基于线程池的实现原理,下面看一下ThreadPoolExector的核心数据结构。
并发编程从零开始(十三)-线程池
文章图片

每一个线程是一个Worker对象。Worker是ThreadPoolExector的内部类,核心数据结构如下:
并发编程从零开始(十三)-线程池
文章图片

由定义会发现,Worker继承于AQS,也就是说Worker本身就是一把锁。这把锁有什么用处呢?用于线程池的关闭、线程执行任务的过程中。
11.2 核心配置参数解释 ThreadPoolExecutor在其构造方法中提供了几个核心配置参数,来配置不同策略的线程池。
并发编程从零开始(十三)-线程池
文章图片

上面的各个参数,解释如下:
  1. corePoolSize:在线程池中始终维护的线程个数。
  2. maxPoolSize:在corePooSize已满、队列也满的情况下,扩充线程至此值。
  3. keepAliveTime/TimeUnit:maxPoolSize 中的空闲线程,销毁所需要的时间,总线程数收缩回corePoolSize。
  4. blockingQueue:线程池所用的队列类型。
  5. threadFactory:线程创建工厂,可以自定义,有默认值Executors.defaultThreadFactory() 。
  6. RejectedExecutionHandler:corePoolSize已满,队列已满,maxPoolSize 已满,最后的拒绝策略。
下面来看这6个配置参数在任务的提交过程中是怎么运作的。在每次往线程池中提交任务的时候,有如下的处理流程:
步骤一:判断当前线程数是否大于或等于corePoolSize。如果小于,则新建线程执行;如果大于,则进入步骤二。
步骤二:判断队列是否已满。如未满,则放入;如已满,则进入步骤三。
步骤三:判断当前线程数是否大于或等于maxPoolSize。如果小于,则新建线程执行;如果大于,则进入步骤四。
步骤四:根据拒绝策略,拒绝任务。
总结一下:首先判断corePoolSize,其次判断blockingQueue是否已满,接着判断maxPoolSize,最后使用拒绝策略。
很显然,基于这种流程,如果队列是无界的,将永远没有机会走到步骤三,也即maxPoolSize没有使用,也一定不会走到步骤四。
11.3 线程池的优雅关闭 线程池的关闭,较之线程的关闭更加复杂。当关闭一个线程池的时候,有的线程还正在执行某个任务,有的调用者正在向线程池提交任务,并且队列中可能还有未执行的任务。因此,关闭过程不可能是瞬时的,而是需要一个平滑的过渡,这就涉及线程池的完整生命周期管理。
1. 线程池的生命周期
在JDK 7中,把线程数量(workerCount)和线程池状态(runState)这两个变量打包存储在一个字段里面,即ctl变量。如下图所示,最高的3位存储线程池状态,其余29位存储线程个数。而在JDK 6中,这两个变量是分开存储的。
并发编程从零开始(十三)-线程池
文章图片

并发编程从零开始(十三)-线程池
文章图片

由上面的代码可以看到,ctl变量被拆成两半,最高的3位用来表示线程池的状态,低的29位表示线程的个数。线程池的状态有五种,分别是RUNNING、SHUTDOWN、STOP、TIDYING和TERMINATED。
下面分析状态之间的迁移过程,如图所示:
并发编程从零开始(十三)-线程池
文章图片

线程池有两个关闭方法,shutdown()和shutdownNow(),这两个方法会让线程池切换到不同的状态。在队列为空,线程池也为空之后,进入TIDYING 状态;最后执行一个钩子方法terminated(),进入TERMINATED状态,线程池才真正关闭。
这里的状态迁移有一个非常关键的特征:从小到大迁移,-1,0,1,2,3,只会从小的状态值往大的状态值迁移,不会逆向迁移。例如,当线程池的状态在TIDYING=2时,接下来只可能迁移到TERMINATED=3,不可能迁移回STOP=1或者其他状态。
除 terminated()之外,线程池还提供了其他几个钩子方法,这些方法的实现都是空的。如果想实现自己的线程池,可以重写这几个方法:
并发编程从零开始(十三)-线程池
文章图片

2. 正确关闭线程池的步骤
关闭线程池的过程为:在调用 shutdown()或者shutdownNow()之后,线程池并不会立即关闭,接下来需要调用 awaitTermination() 来等待线程池关闭。关闭线程池的正确步骤如下:
并发编程从零开始(十三)-线程池
文章图片

awaitTermination(...)方法的内部实现很简单,如下所示。不断循环判断线程池是否到达了最终状态TERMINATED,如果是,就返回;如果不是,则通过termination条件变量阻塞一段时间,之后继续判断。

3. shutdown()与shutdownNow()的区别
  1. shutdown()不会清空任务队列,会等所有任务执行完成,shutdownNow()清空任务队列。
  2. shutdown()只会中断空闲的线程,shutdownNow()会中断所有线程。
并发编程从零开始(十三)-线程池
文章图片

并发编程从零开始(十三)-线程池
文章图片

下面看一下在上面的代码里中断空闲线程和中断所有线程的区别。
shutdown()方法中的interruptIdleWorkers()方法的实现:
并发编程从零开始(十三)-线程池
文章图片

并发编程从零开始(十三)-线程池
文章图片

关键区别点在tryLock():一个线程在执行一个任务之前,会先加锁,这意味着通过是否持有锁,可以判断出线程是否处于空闲状态。tryLock()如果调用成功,说明线程处于空闲状态,向其发送中断信号;否则不发送。
tryLock()方法:
并发编程从零开始(十三)-线程池
文章图片

tryAcquire方法:
并发编程从零开始(十三)-线程池
文章图片

shutdownNow()调用了 interruptWorkers(); 方法:
并发编程从零开始(十三)-线程池
文章图片

interruptIfStarted() 方法的实现:
并发编程从零开始(十三)-线程池
文章图片

在上面的代码中,shutdown() 和shutdownNow()都调用了tryTerminate()方法,如下所示:
并发编程从零开始(十三)-线程池
文章图片

tryTerminate()不会强行终止线程池,只是做了一下检测:当workerCount为0,workerQueue为空时,先把状态切换到TIDYING,然后调用钩子方法terminated()。当钩子方法执行完成时,把状态从TIDYING 改为 TERMINATED,接着调用termination.sinaglAll(),通知前面阻塞在awaitTermination的所有调用者线程。
所以,TIDYING和TREMINATED的区别是在二者之间执行了一个钩子方法terminated(),目前是一个空实现。
11.4 任务的提交过程分析 提交任务的方法如下:
并发编程从零开始(十三)-线程池
文章图片

并发编程从零开始(十三)-线程池
文章图片

并发编程从零开始(十三)-线程池
文章图片

11.5 任务的执行过程分析 在上面的任务提交过程中,可能会开启一个新的Worker,并把任务本身作为firstTask赋给该Worker。但对于一个Worker来说,不是只执行一个任务,而是源源不断地从队列中取任务执行,这是一个不断循环的过程。
下面来看Woker的run()方法的实现过程。
并发编程从零开始(十三)-线程池
文章图片

并发编程从零开始(十三)-线程池
文章图片

并发编程从零开始(十三)-线程池
文章图片

1. shutdown()与任务执行过程综合分析
把任务的执行过程和上面的线程池的关闭过程结合起来进行分析,当调用 shutdown()的时候,可能
出现以下几种场景:
  1. 当调用shutdown()的时候,所有线程都处于空闲状态。这意味着任务队列一定是空的。此时,所有线程都会阻塞在 getTask()方法的地方。然后,所有线程都会收到interruptIdleWorkers()发来的中断信号,getTask()返回null,所有Worker都会退出while循环,之后执行processWorkerExit。
  2. 当调用shutdown()的时候,所有线程都处于忙碌状态。此时,队列可能是空的,也可能是非空的。interruptIdleWorkers()内部的tryLock调用失败,什么都不会做,所有线程会继续执行自己当前的任务。之后所有线程会执行完队列中的任务,直到队列为空,getTask()才会返回null。之后,就和场景1一样了,退出while循环。
  3. 当调用shutdown()的时候,部分线程忙碌,部分线程空闲。有部分线程空闲,说明队列一定是空的,这些线程肯定阻塞在 getTask()方法的地方。空闲的这些线程会和场景1一样处理,不空闲的线程会和场景2一样处理。
下面看一下getTask()方法的内部细节:
并发编程从零开始(十三)-线程池
文章图片

并发编程从零开始(十三)-线程池
文章图片

2. shutdown()与任务执行过程综合分析
shutdownNow则是将线程池的状态设置为STOP,正在执行的任务则被停止,迫使当前执行的所有任务停止工作。没被执行任务的则返回。和上面的 shutdown()类似,只是多了一个环节,即清空任务队列。
当一个Worker最终退出的时候,会执行清理工作:
并发编程从零开始(十三)-线程池
文章图片

并发编程从零开始(十三)-线程池
文章图片

11.6 线程池的4种拒绝策略 在execute(Runnable command)的最后,调用了reject(command)执行拒绝策略,代码如下所示:
并发编程从零开始(十三)-线程池
文章图片

并发编程从零开始(十三)-线程池
文章图片

handler就是我们可以设置的拒绝策略管理器:
并发编程从零开始(十三)-线程池
文章图片

RejectedExecutionHandler 是一个接口,定义了四种实现,分别对应四种不同的拒绝策略,默认是AbortPolicy。
并发编程从零开始(十三)-线程池
文章图片

ThreadPoolExecutor类中默认的实现是第二种(线程池抛异常):
并发编程从零开始(十三)-线程池
文章图片

并发编程从零开始(十三)-线程池
文章图片

四种策略的实现代码如下:
策略1:调用者直接在自己的线程里执行,线程池不处理,比如到医院打点滴,医院没地方了,到你家自己操作吧:
并发编程从零开始(十三)-线程池
文章图片

策略2:线程池抛异常:
并发编程从零开始(十三)-线程池
文章图片

策略3:线程池直接丢掉任务,神不知鬼不觉:
并发编程从零开始(十三)-线程池
文章图片

策略4:删除队列中最早的任务,将当前任务入队列:
【并发编程从零开始(十三)-线程池】并发编程从零开始(十三)-线程池
文章图片

    推荐阅读