[第三篇]深入学习线程池之优雅的关闭线程池

通过 《深入学习线程池之线程池简介及工作原理》、《深入学习线程池之通过ThreadPoolExecutor创建线程池及工作原理》 两篇文章,相信大家已经了解怎么去创建一个线程池,并对线程池的工作原理有了认识,但你知道如何去关闭线程池么?直接调用shutdown()方法为什么关不掉线程池呢?shutdownNow()和shutdown()有什么区别?下面我们以ThreadPoolExecutor为例,来介绍下如何优雅的关闭线程池。在介绍线程池关闭之前,先介绍下线程中断。
一、线程中断
在程序中,我们不能随便中断一个线程,因为这是极其不安全的操作,我们无法知道这个线程正运行在什么状态,它可能持有某把锁,强行中断可能导致锁不能释放的问题;或者线程可能在操作数据库,强行中断导致数据不一致,从而混乱的问题。正因此,Java里将Thread的stop法?设置为过时,以禁止大家使用。
一个线程什么时候可以退出呢?当然只有线程自己才能知道。
所以我们这里要说的Thread的interrrupt方法,本质不是用来中断一个线程,而是将线程设置一个中断状态。当我们调用线程的interrupt方法,它有两个作用:
1、如果此线程处于阻塞状态(比如调用了wait方法,io等待),则会立刻退出阻塞,并抛出InterruptedException异常,线程就可以通过捕获InterruptedException来做一定的处理,然后让线程退出。
2、如果此线程正处于运行之中,则线程不受任何影响,继续运行,仅仅是线程的中断标记被设置为true。所以线程要在适当的位置通过调用isInterrupted方法来查看自己是否被中断,并做退出操作。
注:如果线程的interrupt方法先被调用,然后线程调用阻塞方法进入阻塞状态,InterruptedException异常依旧会抛出。如果线程捕获InterruptedException异常后,继续调用阻塞方法, 将不再触发InterruptedException异常。
二、线程池的两种关闭方式
线程池提供了两个关闭方法:shuwdown()shutdownNow() 方法。我们都知道这两个方法的处理逻辑,如下:
shutdown()方法处理逻辑是: 线程池不再接收新提交的任务,同时等待线程池?的任务执行完毕后关闭线程池。
【[第三篇]深入学习线程池之优雅的关闭线程池】shutdownNow()方法处理逻辑是: 线程池不再接收新提交的任务,同时立刻关闭线程池,线程池里的任务不再执行,并返回待所有未处理的线程list列表。
但是,调用shutdown()方法后,为什么正在执?任务的线程会继续执行完任务而不是立即停止?调用完shutdown() 或者 shutdownNow()方法后,线程池会立即关闭么?线程在什么情况下才会彻底退出?
如果不了解这些细节,在关闭线程池时就难免遇到,“线程池关闭不了”,“关闭线程池出现报错” 等情况。下面就结合线程池源码,分别说说这两个线程池关闭方法的一些实现细节。
1. 线程池中执行任务的方法runWorker()

/** * Main worker run loop.Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * * 1. We may start out with an initial task, in which case we * don't need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters.Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread. * * 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set. * * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * * 4. Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute. * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables. * Because we cannot rethrow Throwables within Runnable.run, we * wrap them within Errors on the way out (to the thread's * UncaughtExceptionHandler).Any thrown exception also * conservatively causes thread to die. * * 5. After task.run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will be in effect even if task.run throws. * * The net effect of the exception mechanics is that afterExecute * and the thread's UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. * * @param w the worker */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted.This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

正常情况下,线程池里的线程,就是在这个while循环里不停地执行。其中代码task.run()就是在执行我们提交给线程池的任务,如下:
threadpool.execute(new Runnable() { @Override public void run() { // todo 具体的业务逻辑 } });

从runWorker()方法看得出来,如果getTask()方法返回null,会导致线程的退出。我们再来看看getTask()方法的实现:
/** * Performs blocking or timed wait for a task, depending on * current configuration settings, or returns null if this worker * must exit because of any of: * 1. There are more than maximumPoolSize workers (due to *a call to setMaximumPoolSize). * 2. The pool is stopped. * 3. The pool is shutdown and the queue is empty. * 4. This worker timed out waiting for a task, and timed-out *workers are subject to termination (that is, *{@code allowCoreThreadTimeOut || workerCount > corePoolSize}) *both before and after the timed wait, and if the queue is *non-empty, this worker is not the last thread in the pool. * * @return task, or null if the worker must exit, in which case *workerCount is decremented */ private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out?for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }

2. shutdown()方法 当我们调用shutdown()方法时,源码如下,我们看到,它先将线程池的状态修改为SHUTDOWN状态,然后调用interruptIdleWorkers()方法,来中断空闲的线程,为什么是空闲线程呢?
在上边runWorker方法的代码中,我们看到获取任务之后第一步是进行加锁操作,即,w.lock()。而,shutdown()方法调用的interruptIdleWorkers方法,会尝试进行w.tryLock()加锁操作,换言之,在runWorker方法中w.lockw.unlock之间的线程将因为加锁成功,就会导致interruptIdleWorkers方法的w.tryLock() 加锁失败,进而不会被调用interrupt方法,也就是说正在执行线程池里任务的线程不会被中断。
/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * * This method does not wait for previously submitted tasks to * complete execution.Use {@link #awaitTermination awaitTermination} * to do that. * * @throws SecurityException {@inheritDoc} */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); // ① 将线程池状态置为SHUTDOWN interruptIdleWorkers(); // ② 停用线程池中的线程 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }

/** * Interrupts threads that might be waiting for tasks (as * indicated by not being locked) so they can check for * termination or configuration changes. Ignores * SecurityExceptions (in which case some threads may remain * uninterrupted). * * @param onlyOne If true, interrupt at most one worker. This is * called only from tryTerminate when termination is otherwise * enabled but there are still other workers.In this case, at * most one waiting worker is interrupted to propagate shutdown * signals in case all threads are currently waiting. * Interrupting any arbitrary thread ensures that newly arriving * workers since shutdown began will also eventually exit. * To guarantee eventual termination, it suffices to always * interrupt only one idle worker, but shutdown() interrupts all * idle workers so that redundant workers exit promptly, not * waiting for a straggler task to finish. */ private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) {// 线程没有被中断且worker获取到锁的时候才处理 try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }/** * Common form of interruptIdleWorkers, to avoid having to * remember what the boolean argument means. */ private void interruptIdleWorkers() { interruptIdleWorkers(false); }

3. shutdownNow()方法 当我们调用shutdownNow()方法时,源码如下,我们看到,它先将线程池的状态修改为STOP状态,然后调用interruptWorkers()方法,遍历中断线程,最后返回未执行的任务的线程list。
在runWorker方法中,代码task.run()就是在执行我们提交给线程池的任务,当我们调用shutdownNow方法时,task.run()里面正处于IO阻塞,即,我们提交任务的逻辑,涉及到IO阻塞,则会导致报错,如果task.run()里正在正常执行,则不受影响,继续执行完这个任务。
/** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. These tasks are drained (removed) * from the task queue upon return from this method. * * This method does not wait for actively executing tasks to * terminate.Use {@link #awaitTermination awaitTermination} to * do that. * * There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks.This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * * @throws SecurityException {@inheritDoc} */ public List shutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }

/** * Interrupts all threads, even if active. Ignores SecurityExceptions * (in which case some threads may remain uninterrupted). */ private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } }

总结
一、当我们调用线程池的shutdownNow方法时,会将线程池状态修改为STOP,当执行runWorker方法中while (task != null || (task = getTask()) != null)时,在getTask方法中,由于STOP状态值是大于SHUTDOWN状态,STOP也大于等于STOP,所以不管任务队列是否为空,都会进入if语句,即,
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }

从而返回null,导致 (task = getTask()) != null条件不成立,进而执行线程退出。
二、当我们调用线程池的shuwdown方法时,会将线程池状态修改为SHUTDOWN,当执行runWorker方法中while (task != null || (task = getTask()) != null)时,在getTask方法中,SHUTDOWN大于等于SHUTDOWN成立没问题,但是SHUTDOWN不大于等于STOP状态,所以只有队列为空,getTask方法才会返回null,导致线程退出。如果线程正在执行线程池里的任务,即便任务处于阻塞状态,线程也不会被中断,而是继续执行。如果线程池阻塞等待从队列里读取任务,则会被唤醒,但是会继续判断队列是否为空,若不为空,则会继续从队列里读取任务,若为空则线程退出。
优雅的关闭线程池
使用shutdownNow?法,可能会引起报错,使用shutdown方法可能会导致线程关闭不了。
所以当我们使用shutdownNow?法关闭线程池时,一定要对任务里进行异常捕获。即,在我们提交的任务里有try{}catch{}处理
当我们使用shuwdown方法关闭线程池时,一定要确保任务里不会有永久阻塞等待的逻辑,否则线程池就关闭不了。
最后,一定要记得shutdownNow和shuwdown调用完,线程池并不是立刻就关闭了,要想等待线程池关闭,还需调用awaitTermination方法来阻塞等待。

    推荐阅读