9.Java中的线程池

Java并发编程的艺术笔记

  • 1.并发编程的挑战
  • 2.Java并发机制的底层实现原理
  • 3.Java内存模型
  • 4.Java并发编程基础
  • 5.Java中的锁的使用和实现介绍
  • 6.Java并发容器和框架
  • 7.Java中的12个原子操作类介绍
  • 8.Java中的并发工具类
  • 9.Java中的线程池
  • 10.Executor框架
前言
Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。
在开发过程中,合理地使用线程池能够带来3个好处。
  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用线程池,必须对其实现原理了如指掌。
线程池的实现原理 当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?
本文来看一下线程池的主要处理流程,处理流程图下图所示。

9.Java中的线程池
文章图片
线程池的主要处理流程 从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下。
  • 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
  • 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
  • 线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
ThreadPoolExecutor 执行 execute() 方法的示意图如下:

9.Java中的线程池
文章图片
ThreadPoolExecutor执行示意图
ThreadPoolExecutor执行execute方法分下面4种情况:
  • 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。上图1
  • 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue上图2
  • 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。上图3
  • 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。上图4
ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行 上图2 ,而 上图2 不需要获取全局锁。
  • 源码分析:上面的流程分析让我们很直观地了解了线程池的工作原理,让我们再通过源代码来看看是如何实现的,线程池执行任务的方法如下:
    public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { // 如果线程数小于基本线程数,则创建线程并执行当前任务 if (addWorker(command, true)) return; c = ctl.get(); }// 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) // 如果线程池不处于运行中或任务无法放入队列, //并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务.// 抛出RejectedExecutionException异常 reject(command); }

  • 工作线程:线程池创建线程时,会将线程封装成工作线程WorkerWorker在执行完任务后,还会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到这点。
    public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }

ThreadPoolExecutor中线程执行任务的示意图如下:

9.Java中的线程池
文章图片
ThreadPoolExecutor中线程执行任务的示意图
线程池中的线程执行任务分两种情况:
  • execute()方法中创建一个线程时,会让这个线程执行当前任务。
  • 这个线程执行完上图中1的任务后,会反复从BlockingQueue获取任务来执行。
线程池的使用 线程池的创建 我们可以通过ThreadPoolExecutor来创建一个线程池。
ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);

创建一个线程池时需要输入几个参数:
  • corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。
    如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。
  • maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。
  • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
  • TimeUnit(线程活动保持时间的单位),可选的单位有:
    • 天(DAYS
    • 小时(HOURS
    • 分钟(MINUTES
    • 毫秒(MILLISECONDS
    • 微秒(MICROSECONDS,千分之一毫秒)
    • 纳秒(NANOSECONDS,千分之一微秒)
  • workQueue(任务队列):用于保存等待执行的任务的阻塞队列。
    可以选择以下几个阻塞队列:
    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  • ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字,代码如下:
    new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();

  • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略。
    • AbortPolicy:直接抛出异常。
    • CallerRunsPolicy:只用调用者所在线程来运行任务。
    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    • DiscardPolicy:不处理,丢弃掉。
    当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务。
向线程池提交任务 可以使用两个方法向线程池提交任务,分别为execute()submit()方法。