Java线程池的使用

我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?在Java中可以通过线程池来达到这样的效果。今天我们就来了解一下Java线程池的相关知识。
线程池的使用 在Java1.5中提供了Executor框架用于把任务的提交和执行解耦,任务的提交交给Runnable或者Callable,而 Executor框架用来处理任务。Executor框架中最核心的成员就是 ThreadPoolExecutor,它是线程池的核心实现类。我们可以通过ThreadPoolExecutor来创建一个线程池。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {...}

  • corePoolSize:核心线程池数量,默认情况下,核心线程会在线程池中一直存活,即使它们处于闲置状态。
  • maximumPoolSize:最大线程数量,当活动线程数达到这个数值后,后续的新任务将会被阻塞。
  • keepAliveTime:非核心线程闲置的超时时间,超过这个时间则回收。如果任务很多,并且每个任务的执行事件很短,则可以调大keepAliveTime来提高线程的利用率。如果设置allowCoreThreadTimeOut属性为true时,keepAliveTime也会应用到核心线程上。
  • TimeUnit:keepAliveTime参数的时间单位。可选的单位有天(DAYS)、小时(HOURS)、分钟 (MINUTES)、秒(SECONDS)、毫秒(MILLISECONDS)等。
  • workQueue:保存待执行任务的阻塞队列,如果当前线程数大于corePoolSize,则将任务添加到此任务队列中,也就是阻塞队列。
  • ThreadFactory:线程工厂,为线程池提供创建新线程的功能,ThreadFactory是一个接口,只有一个方法:newThread(Runnable r)。
  • RejectedExecutionHandler:饱和策略,当任务队列和线程池都满了时所采取的应对策略;
    (1). AbordPolicy 无法处理新任务,并抛出RejectedExecutionException异常
    (2). CallerRunsPolicy 用调用者所在的线程来处理任务。此策略提供简单的反馈控制机制,能够减缓 新任务的提交速度
    (3). DiscardPolicy 不能执行的任务,并将该任务删除。
    (4). DiscardOldestPolicy 丢弃队列最近的任务,并执行当前的任务。
ThreadPoolExecutor使用示例
/** * 通过ThreadPoolExecutor来创建一个线程池 */ private static void TestThreadPoolExecutor() { ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(5)); for(int i=0; i<15; i++){ MyTask myTask = new MyTask(i); executor.execute(myTask); System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+ executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount()); } executor.shutdown(); } private static class MyTask implements Runnable { private int taskNum; MyTask(int num) { this.taskNum = num; } @Override public void run() { System.out.println("正在执行task " + taskNum); try { Thread.currentThread().sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task " + taskNum + "执行完毕"); } } //执行结果: //正在执行task 0 //线程池中线程数目:1,队列中等待执行的任务数目:0,已执行玩别的任务数目:0 //线程池中线程数目:2,队列中等待执行的任务数目:0,已执行玩别的任务数目:0 //线程池中线程数目:3,队列中等待执行的任务数目:0,已执行玩别的任务数目:0 //正在执行task 1 //线程池中线程数目:4,队列中等待执行的任务数目:0,已执行玩别的任务数目:0 //正在执行task 2 //线程池中线程数目:5,队列中等待执行的任务数目:0,已执行玩别的任务数目:0 //线程池中线程数目:5,队列中等待执行的任务数目:1,已执行玩别的任务数目:0 //线程池中线程数目:5,队列中等待执行的任务数目:2,已执行玩别的任务数目:0 //线程池中线程数目:5,队列中等待执行的任务数目:3,已执行玩别的任务数目:0 //线程池中线程数目:5,队列中等待执行的任务数目:4,已执行玩别的任务数目:0 //正在执行task 3 //线程池中线程数目:5,队列中等待执行的任务数目:5,已执行玩别的任务数目:0 //正在执行task 4 //线程池中线程数目:6,队列中等待执行的任务数目:5,已执行玩别的任务数目:0 //线程池中线程数目:7,队列中等待执行的任务数目:5,已执行玩别的任务数目:0 //正在执行task 10 //线程池中线程数目:8,队列中等待执行的任务数目:5,已执行玩别的任务数目:0 //线程池中线程数目:9,队列中等待执行的任务数目:5,已执行玩别的任务数目:0 //线程池中线程数目:10,队列中等待执行的任务数目:5,已执行玩别的任务数目:0 //正在执行task 11 //正在执行task 12 //正在执行task 13 //正在执行task 14 //task 14执行完毕 //task 2执行完毕 //task 13执行完毕 //task 12执行完毕 //task 11执行完毕 //task 10执行完毕 //正在执行task 8 //正在执行task 9 //正在执行task 7 //正在执行task 5 //task 4执行完毕 //task 3执行完毕 //正在执行task 6 //task 0执行完毕 //task 1执行完毕 //task 8执行完毕 //task 7执行完毕 //task 5执行完毕 //task 6执行完毕 //task 9执行完毕

从执行结果可以看出,当线程池中线程的数目大于5时,便将任务放入任务缓存队列里面,当任务缓存队列满了之后,便创建新的线程。如果上面程序中,将for循环中改成执行20个任务,就会抛出任务拒绝异常了(java.util.concurrent.RejectedExecutionException)。
线程池的种类 在java中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池。,其中有 4 种线程池比较常用,它们分别是 FixedThreadPool、CachedThreadPool、SingleThreadExecutor和 ScheduledThreadPool。
  1. FixedThreadPool
    是一个重用固定线程数的线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }

示例
ExecutorService executorService = Executors.newFixedThreadPool(2); for (int i = 0; i < 5; i++) { MyTask myTask = new MyTask(i); executorService.execute(myTask); } executorService.shutdown(); //正在执行task 0 //正在执行task 1 //task 1执行完毕 //task 0执行完毕 //正在执行task 3 //正在执行task 2 //task 3执行完毕 //task 2执行完毕 //正在执行task 4 //task 4执行完毕

FixedThreadPool的corePoolSize和maximumPoolSize都设置为创建FixedThreadPool指定的参数nThreads, 也就意味着FixedThreadPool只有核心线程,并且数量是固定的,没有非核心线程。keepAliveTime设置为0L 意味着多余的线程会被立即终止。因为不会产生多余的线程,所以keepAliveTime是无效的参数。另外,任 务队列采用了无界的阻塞队列LinkedBlockingQueue。当执行execute方法时,如果当前运行的线程未达到corePoolSize(核心线程数)时 就创建核心线程来处理任务,如果达到了核心线程数则将任务添加到LinkedBlockingQueue中。 FixedThreadPool就是一个有固定数量核心线程的线程池,并且这些核心线程不会被回收。当线程数超过 corePoolSize 时,就将任务存储在任务队列中;当线程池有空闲线程时,则从任务队列中去取任务执行。
  1. CachedThreadPool
    newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }

示例:
ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) { MyTask myTask = new MyTask(i); executorService.execute(myTask); } executorService.shutdown(); //正在执行task 1 //正在执行task 0 //正在执行task 2 //正在执行task 3 //正在执行task 4 //task 0执行完毕 //task 1执行完毕 //task 4执行完毕 //task 2执行完毕 //task 3执行完毕

CachedThreadPool的corePoolSize为0,maximumPoolSize设置为Integer.MAX_VALUE,这意味着 CachedThreadPool没有核心线程,非核心线程是无界的。keepAliveTime设置为60L,则空闲线程等待新任务 的最长时间为 60s。在此用了阻塞队列 SynchronousQueue,它是一个不存储元素的阻塞队列,每个插入操作 必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。
当执行execute方法时,首先会执行SynchronousQueue的offer方法来提交任务,并且查询线程池中是否 有空闲的线程执行SynchronousQueue的poll方法来移除任务。如果有则配对成功,将任务交给这个空闲的线 程处理;如果没有则配对失败,创建新的线程去处理任务。当线程池中的线程空闲时,它会执行 SynchronousQueue的poll方法,等待SynchronousQueue中新提交的任务。如果超过 60s 没有新任务提交到 SynchronousQueue,则这个空闲线程将终止。因为maximumPoolSize 是无界的,所以如果提交的任务大于线 程池中线程处理任务的速度就会不断地创建新线程。另外,每次提交任务都会立即有线程去处理。所以, CachedThreadPool 比较适于大量的需要立即处理并且耗时较少的任务。
  1. SingleThreadExecutor
    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }

示例:
ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { MyTask myTask = new MyTask(i); executorService.execute(myTask); } executorService.shutdown(); //正在执行task 0 //task 0执行完毕 //正在执行task 1 //task 1执行完毕 //正在执行task 2 //task 2执行完毕 //正在执行task 3 //task 3执行完毕 //正在执行task 4 //task 4执行完毕

corePoolSize和maximumPoolSize都为1,意味着SingleThreadExecutor只有一个核心线程,其他的参数都 和FixedThreadPool一样。
当执行execute方法时,如果当前运行的线程数未达到核心线程数,也就是当前没有运行的线程,则创 建一个新线程来处理任务。如果当前有运行的线程,则将任务添加到阻塞队列LinkedBlockingQueue中。因 此,SingleThreadExecutor能确保所有的任务在一个线程中按照顺序逐一执行。
  1. ScheduledThreadPool
    ScheduledThreadPool是一个能实现定时和周期性任务的线程池。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } /** * 构造方法 */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }

这里创建了ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,它 主要用于给定延时之后的运行任务或者定期处理任务。
【Java线程池的使用】ScheduledThreadPoolExecutor 的构造方法最终调用的是ThreadPoolExecutor的 构造方法。corePoolSize是传进来的固定数值,maximumPoolSize的值是Integer.MAX_VALUE。因为采用的 DelayedWorkQueue是无界的,所以maximumPoolSize这个参数是无效的。
示例:
ExecutorService executorService = Executors.newScheduledThreadPool(2); for (int i = 0; i < 5; i++) { MyTask myTask = new MyTask(i); executorService.execute(myTask); } executorService.shutdown(); //正在执行task 1 //正在执行task 0 //task 1执行完毕 //task 0执行完毕 //正在执行task 2 //正在执行task 3 //task 2执行完毕 //task 3执行完毕 //正在执行task 4 //task 4执行完毕

当执行 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate 或者 scheduleWithFixedDelay方法时,会向 DelayedWorkQueue 添加一个 实现 RunnableScheduledFuture 接口的ScheduledFutureTask(任务的包装类), 并会检查运行的线程是否达到 corePoolSize。如果没有则新建线程并启动它,但并不是立即去执行任务,而 是去 DelayedWorkQueue 中取ScheduledFutureTask,然后去执行任务。如果运行的线程达到了corePoolSize 时,则将任务添加到DelayedWorkQueue中。DelayedWorkQueue会将任务进行排序,先要执行的任务放在队 列的前面。其跟此前介绍的线程池不同的是,当执行完任务后,会将ScheduledFutureTask中的time变量改为 下次要执行的时间并放回到DelayedWorkQueue中。
参考资料
1.Android进阶之光 刘望舒
2.https://www.cnblogs.com/dolphin0520/p/3932921.html

    推荐阅读