后端开发|ThreadPoolExecutor源码剖析

一、Java构建线程的方式

  • 继承Thread(也是实现的Runnable接口)
  • 实现Runnable
  • 实现Callable
  • 线程池方式(可以实现以上兼容)
    • Java提供了Executors可以创建(阿里规约不允许直接创建线程池)
    • 推荐手动创建线程池(new ThreadPoolExecutor的方式)
二、线程池的7个参数
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量 int maximumPoolSize,//线程池的最大线程数 long keepAliveTime,//当线程数大于核心线程数时,多余的空 闲线程存活的最长时间 TimeUnit unit,//时间单位 BlockingQueue workQueue,//任务队列,用来储 存等待执行任务的队列 ThreadFactory threadFactory,//线程工厂,用来创建线程, 一般默认即可 RejectedExecutionHandler handler//拒绝策略,当提交的 任务过多而不能及时处理时,我们可以定制策略来处理任务 )

三、线程池的执行流程 后端开发|ThreadPoolExecutor源码剖析
文章图片

为什么要先进行阻塞在去尝试创建非核心线程:
**举例解释:**饭店(线程池)-厨子(线程)-排队(阻塞)、招厨子(创建线程)?-今日客满(拒绝策略)
四、线程池的属性标识 4.1、线程池的核心属性
//ct1:1:声明当前线程池的状态,2:声明当前线程池中的线程数 //高3位:线程池状态,低29位是:线程池中的线程个数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; //29,方便后面做位运算 private static final int CAPACITY= (1 << COUNT_BITS) - 1; //通过位运算得出最大容量// runState is stored in the high-order bits private static final int RUNNING= -1 << COUNT_BITS; //111 代表正常接受任务 private static final int SHUTDOWN=0 << COUNT_BITS; //000 代表不接受新任务,但还会处理内部阻塞队列中的任务,正在进行的任务也正常处理 private static final int STOP=1 << COUNT_BITS; //001 代表线程池立马停止,不接受新任务,不处理阻塞队列中的任务,同时终端执行中的任务 private static final int TIDYING=2 << COUNT_BITS; //010 代表过渡状态,代表当前线程池即将结束 private static final int TERMINATED =3 << COUNT_BITS; //011 代表线程池为TREMINATED,线程池已经结束// Packing and unpacking ctl private static int runStateOf(int c){ return c & ~CAPACITY; }//得到线程池的状态 private static int workerCountOf(int c){ return c & CAPACITY; }//得到当前线程池的线程数 private static int ctlOf(int rs, int wc) { return rs | wc; }//获取ctl

4.2、线程池的状态变化
【后端开发|ThreadPoolExecutor源码剖析】后端开发|ThreadPoolExecutor源码剖析
文章图片

五、线程池的exectute方法 从execute方法开始(对应4.2的线程变化图):
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task.The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread.If it fails, we know we are shut down or saturated * and so reject the task. */ //拿到32位的int int c = ctl.get(); //工作线程数 < 核心线程数 if (workerCountOf(c) < corePoolSize) { //进入if,代表可以创建 核心 线程数 if (addWorker(command, true)) //在此结束 return; //如果没进入if,代表创建核心线程数失败,重新获取 ctl c = ctl.get(); } //判断线程池为Running状态,将任务添加入阻塞队列 if (isRunning(c) && workQueue.offer(command)) { //在此获取ctl int recheck = ctl.get(); //再次判断是否为Running状态,若不是Running状态,remove任务 if (! isRunning(recheck) && remove(command)) reject(command); //如果线程池在Running状态,线程池数量为0 else if (workerCountOf(recheck) == 0) //阻塞队列有任务,但是没有工作线程,添加一个任务为空的工作线程处理阻塞队列中的任务 addWorker(null, false); } //阻塞队列已满,创建非核心线程,拒绝策略 else if (!addWorker(command, false)) reject(command); }

通过上述源码,线程池的执行流程,同4.2,以下将进行addWorker方法
六、addWorker方法剖析
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (; ; ) { //获取ctl int c = ctl.get(); //获取线程池状态 int rs = runStateOf(c); // Check if queue empty only if necessary. //除了RUNNING的其他状态 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null &&! workQueue.isEmpty()) //rs==SHUTDOWN,如果不是SHUTDOWN,就代表是STOP或者更高的状态,此时不需要添加线程处理任务 //任务为空,,如果任务为null,并且线程池状态不是RUNNING,不需要处理 //阻塞队列不为null,如果阻塞队列为空,返回flase,取反,获取true,不需要处理 ) //创建工作线程失败 return false; for (; ; ) { //获取工作线程个数 int wc = workerCountOf(c); if (wc >= CAPACITY ||//如果当前线程大于线程池最大容量,不创建线程 //判断wc是否超过核心线程或者最大线程数 wc >= (core ? corePoolSize : maximumPoolSize)) return false; //将工作线程数+1,采用CAS的方式 if (compareAndIncrementWorkerCount(c)) //成功退出外侧for循环 break retry; c = ctl.get(); // Re-read ctl //判断线程池状态,如果有变化;如果没变化,走内侧循环 if (runStateOf(c) != rs) //continue外侧循环 continue retry; // else CAS failed due to workerCount change; retry inner loop } }boolean workerStarted = false; boolean workerAdded = false; //Worker为工作线程 Worker w = null; try { w = new Worker(firstTask); //从Worker中获取线程t final Thread t = w.thread; if (t != null) { //获取线程池的全局锁,避免我添加任务时,其他线程干掉了线程池,干掉线程池需要先获取这个锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); //线程状态为RUNNING状态 if (rs < SHUTDOWN || //是SHUTDOWN状态,创建空任务工作线程,先处理阻塞队列中的任务 (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //将工作线程添加到集合(hashSet)中 workers.add(w); //获取工作线程个数 int s = workers.size(); //如果线程工作线程数,大于之前记录的最大线程数,做替换操作 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; //添加工作线程成功 } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); //启动工作线程 workerStarted = true; //启动工作线程成功 } } } finally { //如果启动工作线程失败,调用addWorkerFailed方法 if (! workerStarted) addWorkerFailed(w); } return workerStarted; //返回工作线程启动 }

七、Worker的封装
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in.Null if factory fails. */ final Thread thread; /** Initial task to run.Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }/** Delegates main run loop to outer runWorker*/ public void run() { runWorker(this); }

runWorker方法:
final void runWorker(Worker w) { //获取当前线程 Thread wt = Thread.currentThread(); //拿到任务 Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //任务不为空,执行任务;如果任务为空,通过getTask从阻塞队列中获取任务! while (task != null || (task = getTask()) != null) { w.lock(); //枷锁,避免shutdown,任务也不会中单 // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted.This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //执行任务前的操作 beforeExecute(wt, task); Throwable thrown = null; try { //开始执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //执行任务后的操作 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

可以继续深入getTask方法,以及processWorkerExit方法
未完待续

    推荐阅读