Android日记之线程池

前言 在编程中经常会使用线程来异步处理任务,但是每个线程的创建和销毁都需要一定的开销。如果每次执行一个任务都需要一个新进程去执行,则这些线程的创建和销毁将消耗大量的资源;并且线程都是“各自为政”的,很难对其进行控制,更何况有一堆的线程在执行。这时候就需要线程池来对线程进行管理。在Java 1.5中提供了Executor框架用于把任务的提交和执行解耦。任务的提交交给RUnnable或者Callable,而Executor框架用来处理任务。Executor框架中最核心的成员就是ThreadPoolExecutor,它是线程池的核心实现类。本篇文章就着重讲解ThreadPoolExecutor。


ThreadPoolExecutor介绍 可以通过ThreadPoolExecutor开创建一个线程池,ThreadPoolExecutor类一共有四个构造方法。下面展示的都是拥有最多参数的的构造方法。

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

  • corePoolSize:核心线程数。默认情况下线程池是空的,只有任务提交时才会创建线程。如果当前运行的线程数少于corePoolSize,则会创建新线程来处理任务;如果等于或者多于corePoolSize,则不会创建,如果调用线程池的prestartAllcoreThread()方法,线程池会提前创建并启动所有核心线程来等待任务。
  • maximumPoolSize:线程池允许创建的最大线程数,如果任务队列满了并且线程数小于maximumPoolSize时,则线程池仍旧会创建新的线程来处理任务。
  • keepAliveTime:非核心线程闲置的超时时间,超过这个事件则回收,如果任务很多,并且每个任务的执行事件很短,则可以调用keepAliveTime来提高线程的利用率。另外,如果设置allowCoreThreadTimeOut属性为true时,keepAliveTime也会应用到核心线程上。
  • TimeUnit:keepAliveTime参数的时间单位,可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、秒(SECONDS)、毫秒(MILLOSECONDS)等。
  • BlockingQueue任务队列,如果当前线程数大于corePoolSize,则将任务添加到此任务队列中。该任务队列是BlockiingQueue类型,也就是阻塞队列。
  • ThreadFactory:线程工厂。可以用线程工厂给每个创建出来的线程设置名字。一般情况下无须设置参数。
  • RejectedExecutionHandler :饱和策略,这是当任务队列中和线程池都满了时所采取的对应策略,默认是ABordPolicy,表示无法处理新任务,并抛出RejetctedExecutionException异常。此外还有3种策略,它们分别如下:
(1)CallerRunsPolicy:用调用者所在的线程来处理任务,此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
(2)DiscardPolicy:不能执行的任务,并将该任务删除。
(3)DiscardOldestPolicy:丢弃队列最近的任务,并执行当前的任务。


线程池的处理流程和原理 Android日记之线程池
文章图片
图1,线程池的处理流程 从上图1中可以得知线程的处理流程主要分为3个步骤:
  • 提交任务后,线程池先判断线程数是否达到核心线程数(corePoolSize)。如果未核心线程数,则创建核心线程处理任务;否则就执行下一步操作。
  • 接着线程池判断任务队列是否满了。如果没满,则将任务添加到任务队列中,否则,就执行下一步操作。
  • 接着因为任务队列满了,线程池就会判断线程数是否达到了最大线程数,如果未达到,则创建非核心线程1处理任务;否则,就执行饱和策略,默认会抛出RejectedExecutionException异常。
虽然上面介绍了线程池的处理流程,但还不是很直观。我们结合下面的图2来更好的了解线程池的原理。

Android日记之线程池
文章图片
图2,线程池执行
从图2中可以看到,如果我们执行ThreadPoolExecutor的execute方法,会遇到各种情况:
(1)如果线程池中的线程数未达到核心线程数,则创建核心线程处理任务。
(2)如果线程数大于或者等于核心线程数,则将任务加入任务队列,线程池中的空闲线程会不断地从任务队列中取出任务进行处理。
(3)如果任务队列满了,并且线程数没有达到最大线程数,则创建非核心线程去处理任务。
(4)如果线程数超过了最大线程数,则执行饱和策略。


ThreadPoolExecutor的基本使用

package com.ju.executordemo; import android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import android.view.View; import android.widget.Button; import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class MainActivity extends AppCompatActivity{private Button btnStart; private final int CORE_POOL_SIZE = 4; //核心线程数 private final int MAX_POOL_SIZE = 5; //最大线程数 private final long KEEP_ALIVE_TIME = 10; //空闲线程超时时间 private ThreadPoolExecutor executorPool; private int songIndex = 0; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); initView(); //创建线程池 initExec(); }private void initView() { btnStart = findViewById(R.id.btn_start); btnStart.setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { begin(); } }); }public void begin() { songIndex++; try { executorPool.execute(new WorkerThread("歌曲" + songIndex)); } catch (Exception e) { Log.e("threadtest", "AbortPolicy...已超出规定的线程数量,不能再增加了...."); }// 所有任务已经执行完毕,我们在监听一下相关数据 new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(20 * 1000); } catch (Exception e) {} sout("monitor after"); } }).start(); }private void sout(String msg) { Log.i("threadtest", "monitor " + msg + " CorePoolSize:" + executorPool.getCorePoolSize() + " PoolSize:" + executorPool.getPoolSize() + " MaximumPoolSize:" + executorPool.getMaximumPoolSize() + " ActiveCount:" + executorPool.getActiveCount() + " TaskCount:" + executorPool.getTaskCount() ); }private void initExec() { executorPool = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingDeque(), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); }class WorkerThread implements Runnable {private String threadName; public WorkerThread (String name){ threadName = name; }@Override public void run() { boolean flag = true; try { while (flag){ String tn= Thread.currentThread().getName(); //模拟耗时操作 Random random = new Random(); long time = (random.nextInt(5) + 1) * 1000; Thread.sleep(time); Log.e("threadtest","线程\"" + tn + "\"耗时了(" + time / 1000 + "秒)下载了第<" + threadName + ">"); //下载完毕跳出循环 flag = false; } }catch (Exception e){ e.printStackTrace(); } } }}


Android日记之线程池
文章图片
图3,运行结果
上述代码模拟一个下载音乐的例子来演示ThreadPoolExecutor的基本使用,启动ThreadPoolExecutor的函数是 execute()方法,然后他需要一个Runnable的参数来进行启动。


ThreadPoolExecutor的其它种类 通过直接或者间接地配置ThreadPoolExecutor的参数可以创建不同类型的ThreadPoolExecutor,其中有 4 种线程池比较常用,它们分别是 FixedThreadPool、CachedThreadPool、SingleThreadExecutor和 ScheduledThreadPool。下面分别介绍这4种线程池。
  • FixedThreadPool
FixedThreadPool 是可重用固定线程数的线程池。在 Executors 类中提供了创建FixedThreadPool的方法, 如下所示:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }

FixedThreadPool的corePoolSize和maximumPoolSize都设置为创建FixedThreadPool指定的参数nThreads,也就意味着FixedThreadPool只有核心线程,并且数量是固定的,没有非核心线程。keepAliveTime设置为0L 意味着多余的线程会被立即终止。因为不会产生多余的线程,所以keepAliveTime是无效的参数。另外,任 务队列采用了无界的阻塞队列LinkedBlockingQueue。FixedThreadPool的execute方法的执行示意图如图4所 示。

Android日记之线程池
文章图片
图4,FixedThreadPool流程图
当执行 execute()方法时,如果当前运行的线程未达到corePoolSize(核心线程数)时 就创建核心线程来处理任务,如果达到了核心线程数则将任务添加到LinkedBlockingQueue中。 FixedThreadPool就是一个有固定数量核心线程的线程池,并且这些核心线程不会被回收。当线程数超过corePoolSize时,就将任务存储在任务队列中;当线程池有空闲线程时,则从任务队列中去取任务执行


  • CachedThreadPool
CachedThreadPool是一个根据需要创建线程的线程池,创建CachedThreadPool的代码如下所示:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }

CachedThreadPool的corePoolSize为0,maximumPoolSize设置为Integer.MAX_VALUE,这意味着 CachedThreadPool没有核心线程,非核心线程是无界的。keepAliveTime设置为60L,则空闲线程等待新任务 的最长时间为 60s。在此用了阻塞队列 SynchronousQueue,它是一个不存储元素的阻塞队列,每个插入操作 必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。CachedThreadPool 的execute方法的执行示意图如图5所示。

Android日记之线程池
文章图片
图5,CachedThreadPool流程图
当执行 execute()方法时,首先会执行SynchronousQueue的 offer()方法来提交任务,并且查询线程池中是否有空闲的线程执行SynchronousQueue的 poll()方法来移除任务。如果有则配对成功,将任务交给这个空闲的线程处理;如果没有则配对失败,创建新的线程去处理任务。当线程池中的线程空闲时,它会执行 SynchronousQueue的 poll()方法,等待SynchronousQueue中新提交的任务。如果超过 60s 没有新任务提交到 SynchronousQueue,则这个空闲线程将终止。因为maximumPoolSize 是无界的,所以如果提交的任务大于线 程池中线程处理任务的速度就会不断地创建新线程。另外,每次提交任务都会立即有线程去处理。所以,CachedThreadPool比较适于大量的需要立即处理并且耗时较少的任务。


  • SingleThreadExecutor
SingleThreadExecutor是使用单个工作线程的线程池,其创建源码如下所示:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }

corePoolSize和maximumPoolSize都为1,意味着SingleThreadExecutor只有一个核心线程,其他的参数都 和FixedThreadPool一样,这里就不赘述了。SingleThreadExecutor的execute()方法的执行示意图如图5所示。

Android日记之线程池
文章图片
图5,SingleThreadExecutor流程图
当执行 execute()方法时,如果当前运行的线程数未达到核心线程数,也就是当前没有运行的线程,则创建一个新线程来处理任务。如果当前有运行的线程,则将任务添加到阻塞队列LinkedBlockingQueue中。因此,SingleThreadExecutor能确保所有的任务在一个线程中按照顺序逐一执行。


  • ScheduledThreadPool
ScheduledThreadPool是一个能实现定时和周期性任务的线程池,它的创建源码如下所示:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }

这里创建了ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,它主要用于给定延时之后的运行任务或者定期处理任务。ScheduledThreadPoolExecutor 的构造方法如下所示:
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); }

从上面的代码可以看出,ScheduledThreadPoolExecutor 的构造方法最终调用的是ThreadPoolExecutor的 构造方法。corePoolSize是传进来的固定数值,maximumPoolSize的值是Integer.MAX_VALUE。因为采用的 DelayedWorkQueue是无界的,所以maximumPoolSize这个参数是无效的。ScheduledThreadPoolExecutor的 execute方法的执行示意图如图6所示。

Android日记之线程池
文章图片
图6,ScheduledThreadPoolExecutor流程图
当执行ScheduledThreadPoolExecutor的 scheduleAtFixedRate()或者 scheduleWithFixedDelay()方法时,会向DelayedWorkQueue添加一个 实现RunnableScheduledFuture接口的ScheduledFutureTask(任务的包装类),并会检查运行的线程是否达到corePoolSize。如果没有则新建线程并启动它,但并不是立即去执行任务,而是去DelayedWorkQueue中取ScheduledFutureTask,然后去执行任务。如果运行的线程达到了corePoolSize时,则将任务添加到DelayedWorkQueue中。DelayedWorkQueue会将任务进行排序,先要执行的任务放在队列的前面。其跟此前介绍的线程池不同的是,当执行完任务后,会将ScheduledFutureTask中time变量改为下次要执行的时间并放回到DelayedWorkQueue中。

【Android日记之线程池】
参考
  • [刘望舒]Android进阶之光
  • Android线程池(一)简单使用

    推荐阅读