java开发之ThreadPoolExecutor源码分析

线程池的状态
只有了解线程池的几个状态,才能读懂它的核心源码。所以先说说这几个状态
running:为线程池初始化时的默认状态,此状态会接收任务进行处理
shutdown: 该状态下的线程池不接收任何任务,但会等待正在运行的任务执行完。通常调用shutdown() 方法完成设置
stop: 该状态的线程池不接收任何任务,同时java培训不会等待正在运行的任务执行完毕。通常调用shutdownNow() 方法完成设置
tidying:该状态下的线程池内,没有任何线程和任务
terminated:该状态为线程池的终态,通常调用tryTerminate()方法完成设置
大多数情况下线程池的一个生命周期流转大概是 running -> (shutdown,stop)-> tidying -> terminated
这几个状态在ThreadPoolExecutor源码中,通过一个ctl的整型原子变量标识,高3位标识线程状态,低29位标识线程数量。翻看源码就能看到
java开发之ThreadPoolExecutor源码分析
文章图片

核心源码分析
? execute(Runnable command)
为线程池的核心方法,调用该方法任务就会执行,直接看下面代码注释吧
public void execute(Runnable command) {

if (command == null) throw new NullPointerException(); int c = ctl.get(); //获取ctl原子变量//如果当前线程池的线程数量小于corePoolSize,添加Worker对象。Worker对象是什么后面说 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; //返回,结束 c = ctl.get(); }// 如果当前线程池的线程数量 > corePoolSize // 且当前线程是否处于running ,则添加任务到队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 二次检查,当前线程不是处于running,则移除任务 if (! isRunning(recheck) && remove(command)) // 执行拒绝策略 reject(command); //线程数量等于零,那就在添加Worker对象呗 else if (workerCountOf(recheck) == 0) addWorker(null, false); }// 如果任务队列满,则添加Worker对象,如果添加失败执行拒绝策略 else if (!addWorker(command, false)) reject(command); }

以上为核心源码的分析,无非就是根据线程池情况添加Worker、任务入队、执行拒绝策略。可以看看下面这个流程图,可能会更清晰
java开发之ThreadPoolExecutor源码分析
文章图片

到这里,我们可以来讲讲addWorker 了。这个方法会封装成一个Worker对象,然后运行任务。看看Worker对象的类图:
java开发之ThreadPoolExecutor源码分析
文章图片

Worker实现Runnable接口、继承AbstractQueuedSynchronizer,持有一个Thread的成员变量。所以可以把Worker对象看成一个线程,同时拥有AbstractQueuedSynchronizer的属性和方法,因此它能够进行加锁和释放锁的操作。
ok,逐步跟进来看看addWorker方法里面的逻辑。
private boolean addWorker(Runnable firstTask, boolean core) {
retry: for (; ; ) { int c = ctl.get(); //当前线程池状态 int rs = runStateOf(c); // 如果当前线程池状态不合法就不让添加 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (; ; ) { //获取当前线程数量 int wc = workerCountOf(c); // 如果wc 大于ctl所能表示的最大线程数或者大于最大线程数则不让添加 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 通过CAS操作,增加线程池中的Worker数。如果添加成功结束双层循环 if (compareAndIncrementWorkerCount(c)) break retry; //如果CAS操作失败,内层循环继续执行 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } }boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //创建Worker对象,传入任务 w = new Worker(firstTask); // 获取Worker对象的线程变量 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; //加mainLock锁,防并发 mainLock.lock(); try { //当前线程池状态 int rs = runStateOf(ctl.get()); // 如果Worker对象的线程状态不合法,抛异常 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); // 如果合法添加到workers集合 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 一个变量标识,标明workers集合是否有添加新的worker对象 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }

整体还不算复杂,核心就是根据传入的任务创建一个Worker对象,然后启动Worker。
下面来看看Worker启动的逻辑,前面说过了Worker实现Runnable接口,所以启动将会触发执行run方法,而run方法最终调的是runWorker()方法。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //死循环获取任务,然后执行任务。这里getTask()方法会有阻塞情况的,我们这里知道一下就行,下面马上讲。 while (task != null || (task = getTask()) != null) { //获取w锁。前面说过了,Worker对象继承AbstractQueuedSynchronizer,所以本身就内置了一把锁 w.lock(); // 判断同一个时刻当前线程和线程池的状态是否合法,不合法结束呗 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //任务执行前的处理逻辑 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //任务执行后的处理逻辑 afterExecute(task, thrown); } } finally { task = null; //当前Worker完成的任务数量 w.completedTasks++; //释放w锁 w.unlock(); } } completedAbruptly = false; } finally { //处理Worker退出的逻辑 processWorkerExit(w, completedAbruptly); } }

整个方法的逻辑其实也不算复杂,就是当前Worker不断死循环获取队列里面是否有任务。有,就加锁然后执行任务。无,就阻塞等待获取任务。那什么情况下才会跳出整个死循环,执行processWorkerExit呢?这里就需要看下getTask() 方法逻辑了。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 判断线程池状态和任务队列的情况,不满足条件直接返回 null,结束。 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }int wc = workerCountOf(c); // 超时时间的标识,[是否设置了核心线程数的超时时间 或者 当前线程数量是否大于核心线程数 ],

//因为我们知道线程池运行的线程数量如果大于核心线程数,多出来的那部分线程是需要被回收的。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }try { // 如果timed为false,则一直阻塞等待,直到获取到元素,然后返回 // 如果timed为true,则一直阻塞等待keepAliveTime超时后返回, //到这里其实就知道如何结束runWorker方法的那个死循环了,也就意味着Worker它的线程生命周期结束了。 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

最后,来看下processWorkerExit() 方法处理了哪些逻辑
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; //获取mainLock锁 mainLock.lock(); try { //添加任务数量,然后移除worker completedTaskCount += w.completedTasks; workers.remove(w); } finally { // 释放mainLock锁 mainLock.unlock(); } //尝试将线程池状态设置为 terminate tryTerminate(); //主要判断当前线程池的线程数是否小于corePoolSize,如果小于继续添加Worker对象 int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }

这个方法主要就是移除Worker对象,然后尝试将线程池的状态更改为terminate。这里需要讲一下tryTerminate方法逻辑,因为它和线程池awaitTermination()方法有一定的关联,来看看它的代码。
final void tryTerminate() { for (; ; ) { int c = ctl.get(); //判断线程池状态,还在运行或者已经是 terminate的状态直接结束了 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 就是中断空闲的Worker,后面讲shutDown方法的时候聊 if (workerCountOf(c) != 0) { interruptIdleWorkers(ONLY_ONE); return; }final ReentrantLock mainLock = this.mainLock; //获取mainLock锁 mainLock.lock(); try { //线程池设置成TIDYING状态 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //钩子方法,线程池终止时执行的逻辑 terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); // termination为mainLock锁的condition实例,这个是来实现线程之间的通信。 //其实这里是来唤醒awaitTermination()方法,后面分析awaitTermination源码会看到。 termination.signalAll(); } return; } } finally { // 释放锁 mainLock.unlock(); }} }

到这里,线程池execute方法大致的逻辑就完了。可以再看看时序图,理清下几个方法和类之间的调用。
java开发之ThreadPoolExecutor源码分析
文章图片

? shutdown()
中断线程池的线程,会等待正在执行的线程结束执行,来看看源码它是怎么实现的
public void shutdown() { final ReentrantLock mainLock = this.mainLock; //获取mainLock锁,防止其他线程执行 mainLock.lock(); try { //检查权限,确保用户线程有关闭线程池的权限 checkShutdownAccess(); //通过CAS将线程池状态设置成 SHUTDOWN advanceRunState(SHUTDOWN); //中断所有空闲的Workers , 下面分析这个方法 interruptIdleWorkers(); //钩子方法,让子类进行收尾的逻辑 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { // 释放mainLock锁 mainLock.unlock(); } //execute方法,我们分析过了,主要就是尝试将线程池的状态设置为terminate tryTerminate(); }

【java开发之ThreadPoolExecutor源码分析】该方法我们比较关注的点是 interruptIdleWorkers方法,是怎样中断空闲Worker,然后是如何保证Worker执行完毕的?看看代码就知道了
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; //获取mainLock锁 mainLock.lock(); try { //轮询workers逐一中断 for (Worker w : workers) { Thread t = w.thread; //判断 如果当前线程未中断且能够获取w锁,则执行中断 // 如果当前线程未中断但不能获取w锁,不进行中断。 //这里的w锁,就是前面在分析execute时,有个死循环不断取任务,取到任务就会获取w锁。 //所以这边如果获取不到w锁,就证明还有任务没有执行完。 if (!t.isInterrupted() && w.tryLock()) { try { //中断线程 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }

到这里,核心逻辑就是通过w这个锁来完成的。
? shutdownNow
public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } }

源码和shutdown差不多,只不过将线程池状态设置为stop,然后调用interruptWorkers 方法,看看worker方法。
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }

代码中并没有获取w锁的逻辑,所以这个方法会直接中断所有线程,并不会等待那些正在执行任务的worker把任务执行完。
? awaitTermination
调用awaitTermination方法会一直阻塞等待线程池状态变为 terminated 才返回 或者等待超时返回。来看看代码就明白了
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (; ; ) { //如果已经是terminated状态直接返回 if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; // (1)等待mainLock锁的condition实例来唤醒,不然持续阻塞。 nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }

(1)处的代码已经告诉了该方法什么时候返回,就是mainLock锁的termination条件变量被唤醒返回。在上面分析中termination条件变量被唤醒是在执行tryTerminate()时完成的,因为内部调用termination.signalAll()。而tryTerminate() 方法被shutDown() 和shutDownNow() 调用过,所以如果要让awaitTermination 返回,调用这2个方法就行。

    推荐阅读