语言|Java线程池

线程池的好处
线程使应用程序能充分利用CPU、内存、网络等资源,然而,频繁的创建与销毁线程会浪费大量的系统资源,增加并发编程的风险。所以需要通过线程池协调多个线程,实现类似主次线程隔离、定期执行、周期执行等任务。线程池的作用包括:
利用线程池管理并复用线程,控制最大并发数
利用线程池实现任务线程队列缓存策略与拒绝机制
实现某些与时间相关的功能,如定期执行、周期执行等
隔离线程环境
Java中线程池是如何创建的?
ThreadPoolExecutor 这个基础类讲起

public ThreadPoolExecutor(int corePoolSize, // 1 int maximumPoolSize,// 2 long keepAliveTime,// 3 TimeUnit unit,// 4 BlockingQueue workQueue, // 5 ThreadFactory threadFactory,// 6 RejectedExecutionHandler handler ) { //7 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

几个基础概念:
corePoolSize核心线程数:核心线程不会被销毁
maximumPoolSize线程池能容纳同时执行的最大线程数
keepAliveTime线程池中线程活跃的最大时间(只针对非核心线程),超过这个时间线程会被销毁,直到剩下corePoolSize个线程为止
TimeUnit:时间单位
workQueue:缓存队列,当线程数目超过核心线程数时用于保存任务的阻塞队列。阻塞队列缓存达到上限后,如果还有新任务需要处理,就创建新的线程,直到线程数=maximumPoolSize
threadFactory线程工厂,用来创建线程。
为了统一在创建线程时设置一些参数,如是否守护线程,线程一些特性等,如优先级。通过这个TreadFactory创建出来的线程能保证有相同的特性。
它是一个接口类,而且方法只有一个,就是创建一个线程。
handler:拒绝策略,默认是AbortPolicy,会抛出异常。
当线程数已经达到maxPoolSize,且队列已满,会拒绝新任务。
当线程池被调用shutdown()后,会等待线程池里的任务执行完毕再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务。
ThreadPoolExecutor类分析
语言|Java线程池
文章图片

ThreadPoolExecutor被初始化好之后便可以提交线程任务,线程的提交方法主要是execute和submit。先看execute,
分三步进行:
1.如果运行的线程少于corePoolSize,尝试启动一个新线程对任务进行处理,addWorker方法的调用会自动检查运行状态和workerCount,在不应该增加新的线程的情况下,返回false。
2.否则将其加入等待执行队列中,如果加入成功,则需要再检查一下是否需要添加新线程或是线程池状态已经发生了变化。
3.如果队列已满,则再尝试添加新的线程。
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //如果线程数小于corePoolSize,则向线程池中添加一个新线程 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //否则将其加入等待执行队列中,如果加入成功,则需要再检查一下是否需要添加新线程或是线程池状态已经发生了变化 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //如果队列已满,则再尝试添加新的线程 else if (!addWorker(command, false)) reject(command); }

【语言|Java线程池】重点分析addWorker方法
private boolean addWorker(Runnable firstTask, boolean core) { retry: // 每次for循环都需要获取最新的ctl值 for (int c = ctl.get(); ; ) { /* * 这个地方的判断为true时可以分成三种情况 * 1、线程池的状态是大于等于 STOP的 这个判断为true * 2、线程池状态是SHUTDOWN 但是 firstTask 是null 对应execute 的 else if (!addWorker(command, false)) 状态大于等于SHUTDOWN时不接受任务 * 3、线程池状态是SHUTDOWN 工作队列已经空了 对应execute的 在状态大于等于SHUTDOWN时用addWorker(null, false) 执行队列里面的任务 */ if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()) ) return false; for (; ; ) { // 如果core是true使用的核心线程数配置 否则使用maximumPoolSize if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) return false; // 使用CAS的方法给ctl的worker的数量加1 成功则跳出最外层循环 if (compareAndIncrementWorkerCount(c)) break retry; // CAS不成功则重新获取ctl的值 因为不成功ctl的值一定变了 CAS嘛 c = ctl.get(); // 如果不成功的原因是状态变了 就重新进行外层循环 if (runStateAtLeast(c, SHUTDOWN)) continue retry; } }// 能走到这里一定是说明CAS成功了 那么就可以进行创建worker执行任务了 // woker是否执行了start boolean workerStarted = false; // worker 是否添加到workers成功 boolean workerAdded = false; Worker w = null; try { // 构建一个worker thread的target是worker所以调用t.start会执行worker的run方法,最后调用到runWorker方法 w = new Worker(firstTask); // worker 的thread是 线程工厂的newThread方法创建的 final Thread t = w.thread; if (t != null) { // 需要操作workers 加锁执行 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int c = ctl.get(); // 如果线程池状态是running 或者 是shutdown但是firstTask是null(kanexecute方法 防止没有worker执行队列里面的任务) if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { // 如果线程的状态不是NEW说明线程不是经过线程池开启的 抛异常 if (t.getState() != Thread.State.NEW) throw new IllegalThreadStateException(); // 把worker添加到workers里面 workers.add(w); workerAdded = true; // 更新当前最大线程数量 maximumPoolSize 和 corePoolSize可以在线程池创建之后动态修改的 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { mainLock.unlock(); } if (workerAdded) { /* * 如果添加成功 就启动这个线程 * 上面线程池状态判断没通过 或者 线程的状态不是NEW 就不会执行 workerAdded = true; * 线程就不会启动 workerStarted 就是false */ t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 如果没有执行过t.start() 就要把这个woker从workers里面剔除并且 ctl里面worker数量减一 addWorkerFailed(w); } return workerStarted; }

    推荐阅读