超详细讲解Java线程池
目录
- 池化技术
- 池化思想介绍
- 池化技术的应用
- 如何设计一个线程池
- Java线程池解析
- ThreadPoolExecutor使用介绍
- 内置线程池使用
- ThreadPoolExecutor解析
- 整体设计
- 线程池生命周期
- 任务管理解析
- woker对象
- Java线程池实践建议
- 不建议使用Exectuors
- 线程池大小设置
- 线程池监控
带着问题阅读
1、什么是池化,池化能带来什么好处
2、如何设计一个资源池
3、Java的线程池如何使用,Java提供了哪些内置线程池
4、线程池使用有哪些注意事项
池化技术
池化思想介绍
池化思想是将重量级资源预先准备好,在使用时可重复使用这些预先准备好的资源。
池化思想的核心概念有:
- 资源创建/销毁开销大
- 提前创建,集中管理
- 重复利用,资源可回收
池化技术的应用
常见的池化技术应用有:资源池、连接池、线程池等。
- 资源池
- 连接池
- 线程池
如何设计一个线程池
设计一个线程池,至少需要提供的核心能力有:
- 线程池容器:用于容纳初始化时预先创建的线程。
- 线程状态管理:管理池内线程的生命周期,记录每个线程当前的可服务状态。
- 线程请求管理:对调用端提供获取和归还线程的接口。
- 线程耗尽策略:提供策略以处理线程耗尽问题,如拒绝服务、扩容线程池、排队等待等。
Java线程池解析
ThreadPoolExecutor使用介绍
大象装冰箱总共分几步
// 1.创建线程池ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>()); // 2.提交任务threadPool.execute(new Runnable() {@Overridepublic void run() {System.out.println("task running"); }}}); // 3.关闭线程池threadPool.shutDown();
Java通过
ThreadPoolExecutor
提供线程池的实现,如示例代码,初始化一个容量为1的线程池、然后提交任务、最后关闭线程池。ThreadPoolExecutor
的核心方法主要有- 构造函数:
ThreadPoolExecutor
提供了多个构造函数,以下对基础构造函数进行说明。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueworkQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
- corePoolSize:线程池的核心线程数。池内线程数小于
corePoolSize
时,线程池会创建新线程执行任务。 - maximumPoolSize:线程池的最大线程数。池内线程数大于
corePoolSize
且workQueue
任务等待队列已满时,线程池会创建新线程执行队列中的任务,直到线程数达到maximumPoolSize
为止。 - keepAliveTime:非核心线程的存活时长。池内超过
corePoolSize
数量的线程可存活的时长。 - unit:非核心线程存活时长单位。与
keepAliveTime
取值配合,如示例代码表示1分钟。 - workQueue:任务提交队列。当无空闲核心线程时,存储待执行任务。
类型 | 作用 |
---|---|
ArrayBlockingQueue | 数组结构的有界阻塞队列 |
LinkedBlockingQueue | 链表结构的阻塞队列,可设定是否有界 |
SynchronousQueue | 不存储元素的阻塞队列,直接将任务提交给线程池执行 |
PriorityBlockingQueue | 支持优先级的无界阻塞队列 |
DelayQueue | 支持延时执行的无界阻塞队列 |
- threadFactory:线程工厂。用于创建线程对象。
- handler:拒绝策略。线程池线程数量达到
maximumPoolSize
且workQueue
已满时的处理策略。
类型 | 作用 |
---|---|
AbortPolicy | 拒绝并抛出异常。默认 |
CallerRunsPolicy | 由提交任务的线程执行任务 |
DiscardOldestPolicy | 抛弃队列头部任务 |
DiscardPolicy | 抛弃该任务 |
- 执行函数:
execute
和submit
,主要分别用于执行Runnable
和Callable
。
// 提交Runnablevoid execute(Runnable command); // 提交Callable并返回FutureFuture submit(Callable task); // 提交Runnable,执行结束后Future.get会返回result Future submit(Runnable task, T result); // 提交Runnable,执行结束后Future.get会返回nullFuture> submit(Runnable task);
- 停止函数:
shutDown
和shutDownNow
。
// 不再接收新任务,等待剩余任务执行完毕后停止线程池void shutdown(); // 不再接收新任务,并尝试中断执行中的任务,返回还在等待队列中的任务列表ListshutdownNow();
内置线程池使用
To be useful across a wide range of contexts, this class provieds many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient {@link Executors} factory methods {@link Executors#newCachedThreadPool} (unbounded thread poll, with automatic thread reclamation), {@link Executors#newFixedThreadPool} (fixed size thread pool) and {@link Executors#newSingleThreadExecutor}(single background thread), that preconfigure settings for the most common usage scenarios.由于
ThreadPoolExecutor
参数复杂,Java提供了三种内置线程池newCachedThreadPool
、newFixedThreadPool
和newSingleThreadExecutor
应对大多数场景。Executors.newCachedThreadPool()
无界线程池,核心线程池大小为0,最大为Integer.MAX_VALUE
,因此严格来讲并不算无界。采用SynchronousQueue
作workQueue
,意味着任务不会被阻塞保存在队列,而是直接递交到线程池,如线程池无可用线程,则创建新线程执行。
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
Executors.newFixedThreadPool(int nThreads)
固定大小线程池,其中coreSize
和maxSize
相等,且过期时间为0,表示经过一定数量任务提交后,线程池将始终维持在nThreads
数量大小,不会新增也不会回收线程。
public static ExecutorService new FixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue()); }
Executors.newSingleThreadExecutor()
单线程池,参数与fixedThreadPool
类似,只是将数量限制在1,单线程池主要避免重复创建销毁线程对象,也可用于串行化执行任务。不同与其他线程池,单线程池采用FinallizableDelegatedExecutorService
对ThreadPoolExecutor
对象进行包装,感兴趣的同学可以看下源码,其方法实现仅仅是对被包装对象方法的直接调用。包装对象主要用于避免用户将线程池强制转换为ThreadPoolExecutor
来修改线程池大小。
public static ExecutorService newSingleThreadExecutor() {return new FinallizableDelegatedExecutorService((new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockQueue()))); }
ThreadPoolExecutor解析
整体设计
文章图片
ThreadPoolExecutor
基于ExecutorService
接口实现提交任务,未采取常规资源池获取/归还资源的形式,整个线程池和线程的生命周期都由ThreadPoolExecutor
进行管理,线程对象不对外暴露;ThreadPoolExecutor
的任务管理机制类似于生产者消费者模型,其内部维护一个任务队列和消费者,一般情况下,任务被提交到队列中,消费线程从队列中拉取任务并将其执行。线程池生命周期
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY= (1 << COUNT_BITS) - 1; private static int runStateOf(int c){ return c & ~CAPACITY; } //计算当前运行状态private static int workerCountOf(int c){ return c & CAPACITY; }//计算当前线程数量private static int ctlOf(int rs, int wc) { return rs | wc; }//通过状态和线程数生成ctl
TreadPoolExecutor
通过ctl
维护线程池的状态和线程数量,其中高3位存储运行状态,低29位存储线程数量。线程池设定了
RUNNING
、SHUTDOWN
、STOP
、TIDYING
和TERMINATED
五种状态,其转移图如下:文章图片
在这5种状态中,只有
RUNNING
时线程池可接收新任务,其余4种状态在调用shutDown
或shutDownNow
后触发转换,且在这4种状态时,线程池均不再接收新任务。任务管理解析
// 用于存放提交任务的队列private final BlockingQueueworkQueue; // 用于保存池内的工作线程,Java将Thread包装成Worker存储private final HashSet workers = new HashSet ();
ThreadPoolExecutor
主要通过workQueue
和workers
两个字段用于管理和执行任务。文章图片
线程池任务执行流程如图,结合
ThreadPoolExecutor.execute
源码,对任务执行流程进行说明:- 当任务提交到线程池时,如果当前线程数量小于核心线程数,则会将为该任务直接创建一个
worker
并将任务交由worker
执行。
if (workerCountOf(c) < corePoolSize) {// 创建新worker执行任务,true表示核心线程if (addWorker(command, true))return; c = ctl.get(); }
- 当已经达到核心线程数后,任务会提交到队列保存;
// 放入workQueue队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get(); // 这里采用double check再次检测线程池状态if (! isRunning(recheck) && remove(command))reject(command); // 避免加入队列后,所有worker都已被回收无可用线程else if (workerCountOf(recheck) == 0)addWorker(null, false); }
- 如果队列已满,则依据最大线程数量创建新
worker
执行。如果新增worker
失败,则依据设定策略拒绝任务。
// 接上,放入队列失败// 添加新worker执行任务,false表示非核心线程else if (!addWorker(command, false))// 如添加失败,执行拒绝策略reject(command);
woker对象
ThreadPoolExecutor
没有直接使用Thread
记录线程,而是定义了worker
用于包装线程对象。private final class Worker extends AbstractQueuedSynchronizer implements Runnable {...final Thread thread; Runnable firstTask; Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }// worker对象被创建后就会执行public void run() {runWorker(this); }}
worker
对象通过addWorker
方法创建,一般会为其指定一个初始任务firstTask
,当worker
执行完毕以后,worker
会从阻塞队列中读取任务,如果没有任务,则该worker
会陷入阻塞状态给出worker
的核心逻辑代码:private boolean addWorker(Runnable firstTask, boolean core) {...// 指定firstTask,可能为nullw = new Worker(firstTask); ...if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException(); workers.add(w); workerAdded = true; }...// 执行新添加的workerif (workerAdded) {t.start(); workerStarted = true; }}final void runWorker(Worker w) {// 等待workQueue的任务while (task != null || (task = getTask()) != null) {...}}private Runnable getTask() {...for (; ; ) {...// 如果是普通工作线程,则根据线程存活时间读取阻塞队列// 如果是核心工作线程,则直接陷入阻塞状态,等待workQueue获取任务Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take(); ...}}
如下图,任务提交后触发
addWorker
创建worker
对象,该对象执行任务完毕后,则循环获取队列中任务等待执行。文章图片
Java线程池实践建议
不建议使用Exectuors
线程池不允许使用
Executors
去创建,而是通过ThreadPoolExecutor
的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。《阿里巴巴开发手册》虽然Java推荐开发者直接使用
Executors
提供的线程池,但实际开发中通常不使用。主要考虑问题有:- 潜在的OOM问题
CachedThreadPool
将最大数量设置为Integer.MAX_VALUE
,如果一直提交任务,可能造成Thread
对象过多引起OOM
。FixedThreadPool
和SingleThreadPoo
的队列LinkedBlockingQueue
无容量限制,阻塞任务过多也可能造成OOM
。- 线程问题定位不便
ThreadFactory
,线程名称默认为pool-poolNumber-thread-thredNumber
,线程出现问题后不便定位具体线程池。- 线程池分散
线程池大小设置
通常按照IO繁忙型和CPU繁忙型任务分别采用以下两个普遍公式。
文章图片
在理论场景中,如一个任务IO耗时40ms,CPU耗时10ms,那么在IO处理期间,CPU是空闲的,此时还可以处理4个任务(40/10),因此理论上可以按照IO和CPU的时间消耗比设定线程池大小。
文章图片
《JAVA并发编程实践》中还考虑数量乘以目标CPU的利用率在实际场景中,我们通常无法准确测算IO和CPU的耗时占比,并且随着流量变化,任务的耗时占比也不能固定。因此可根据业务需求,开设线程池运维接口,根据线上指标动态调整线程池参数。
推荐参考第二篇美团线程池应用
线程池监控
ThreadPoolExecutor
提供以下方法监控线程池:getTaskCount()
返回被调度过的任务数量getCompletedTaskCount()
返回完成的任务数量getPoolSize()
返回当前线程池线程数量getActiveCount()
返回活跃线程数量getQueue()
获取队列,一般用于监控阻塞任务数量和队列空间大小
推荐阅读
- 为什么你的路演总会超时()
- 标签、语法规范、内联框架、超链接、CSS的编写位置、CSS语法、开发工具、块和内联、常用选择器、后代元素选择器、伪类、伪元素。
- 松软可口易消化,无需烤箱超简单,新手麻麻也能轻松成功~
- 原生家庭之痛与超越
- BNC公链|BNC公链 | Eth2.0测试网Topaz已质押超100万枚ETH
- 89㎡挤出小三房,电视墙装的超好看!
- Android超简单实现沉浸式状态栏
- 超级行动力第二次作业-行动大于学习的秘密
- Day10_要想看起来毫不费力,必须付出超乎常人的努力
- 演讲手势