笛里谁知壮士心,沙头空照征人骨。这篇文章主要讲述Java多线程与线程池技术相关的知识,希望能为你提供帮助。
一、序言
java多线程编程线程池被广泛使用,甚至成为了标配。
线程池本质是池化技术
的应用,和连接池类似,创建连接与关闭连接属于耗时操作,创建线程与销毁线程也属于重操作,为了提高效率,先提前创建好一批线程,当有需要使用线程时从线程池取出,用完后放回线程池,这样避免了频繁创建与销毁线程。
// 任务
Runnable runnable = () ->
System.out.println(Thread.currentThread().getId());
在应用中优先选用线程池执行异步任务,根据不同的场景选用不同的线程池,提高异步任务执行效率。
1、普通执行
new Thread(runnable).start();
2、线程池执行
Executors.newSingleThreadExecutor().execute(runnable)
二、线程池基础
(一)核心参数 1、核心参数线程池的核心参数决定了池的类型,进而决定了池的特性。
参数 | 解释 | 行为 |
---|---|---|
corePoolSize | 核心线程数 | 池中长期维护的线程数量,不主动回收 |
maximumPoolSize | 最大线程数 | 最大线程数大于等于核心线程数 |
keepAliveTime | 线程最大空闲时间 | 非核心线程最大空闲时间,超时回收线程 |
workQueue | 工作队列 | 工作队列直接决定线程池的类型 |
线程池 | corePoolSize | maximumPoolSize | keepAliveTime | workQueue |
---|---|---|---|---|
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60 | SynchronousQueue |
newSingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue |
newFixedThreadPool | N | N | 0 | LinkedBlockingQueue |
newScheduledThreadPool | N | Integer.MAX_VALUE | 0 | DelayedWorkQueue |
1、通用对比
线程池 | 特点 | 适用场景 |
---|---|---|
newCachedThreadPool | 超时未使用的线程回自动销毁,有新任务时自动创建 | 适用于低频、轻量级的任务。回收线程的目的是节约线程长时间空闲而占有的资源。 |
newSingleThreadExecutor | 线程池中有且只有一个线程 | 顺序执行任务 |
newFixedThreadPool | 线程池中有固定数量的线程,且一直存在 | 适用于高频的任务,即线程在大多数时间里都处于工作状态。 |
newScheduledThreadPool | 定时线程池 | 与定时调度相关联 |
private final static ExecutorService executor = Executors.newSingleThreadExecutor();
private final static ExecutorService executor = Executors.newFixedThreadPool(1);
(三)线程池原理
文章图片
线程池主要处理流程,任务提交之后是怎么执行的。大致如下:
- 判断核心线程池是否已满,如果不是,则创建线程执行任务
- 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中
- 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务
- 如果线程池也满了,则按照拒绝策略对任务进行处理
1、无返回值任务
execute
用于提交不需要返回结果的任务。public static void main(String[] args)
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(() ->
System.out.println("hello"));
2、有返回值任务
submit()
用于提交一个需要返回果的任务。该方法返回一个
Future
对象,通过调用这个对象的get()
方法,我们就能获得返回结果。get()
方法会一直阻塞,直到返回结果返回。我们也可以使用它的重载方法
get(long timeout, TimeUnit unit)
,这个方法也会阻塞,但是在超时时间内仍然没有返回结果时,将抛出异常TimeoutException
。public static void main(String[] args) throws Exception
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<
Long>
future = executor.submit(() ->
System.out.println("task is executed");
return System.currentTimeMillis();
);
System.out.println("task execute time is: " + future.get());
(无)关闭线程池在线程池使用完成之后,我们需要对线程池中的资源进行释放操作,这就涉及到关闭功能。我们可以调用线程池对象的
shutdown()
和shutdownNow()
方法来关闭线程池。这两个方法都是关闭操作,又有什么不同呢?
shutdown()
会将线程池状态置为SHUTDOWN
,不再接受新的任务,同时会等待线程池中已有的任务执行完成再结束。shutdownNow()
会将线程池状态置为SHUTDOWN
,对所有线程执行interrupt()
操作,清空队列,并将队列中的任务返回回来。
isShutdown()
和isTerminated
,分别表示是否关闭和是否终止。三、Executors
Executors
是一个线程池工厂,提供了很多的工厂方法,我们来看看它大概能创建哪些线程池。// 创建单一线程的线程池
public static ExecutorService newSingleThreadExecutor();
// 创建固定数量的线程池
public static ExecutorService newFixedThreadPool(int nThreads);
// 创建带缓存的线程池
public static ExecutorService newCachedThreadPool();
// 创建定时调度的线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize);
// 创建流式(fork-join)线程池
public static ExecutorService newWorkStealingPool();
1、创建单一线程的线程池任何时候线程池中至多只有一个线程,当线程执行异常终止时会自动创建一个新线程替换。如果既有异步执行任务的需求又希望任务得以顺序执行,那么此类型线程池是首选。
若多个任务被提交到此线程池,那么会被缓存到队列。当线程空闲的时候,按照FIFO的方式进行处理。
2、创建固定数量的线程池创建核心线程与最大线程数相等的固定线程数的线程池,任何时刻至多有固定数目的线程,当线程因异常而终止时则会自动创建线程替换。
当有新任务加入时,如果池内线程均处于活跃状态,则任务进入等待队列中,直到有空闲线程,队列中的任务才会被顺序执行;如果池内有非活跃线程,则任务可以立刻得以执行。
- 如果线程的数量未达到指定数量,则创建线程来执行任务
- 如果线程池的数量达到了指定数量,并且有线程是空闲的,则取出空闲线程执行任务
- 如果没有线程是空闲的,则将任务缓存到队列(队列长度为
Integer.MAX_VALUE
)。当线程空闲的时候,按照FIFO的方式进行处理
Integer.MAX_VALUE
。由于本身使用SynchronousQueue
作为等待队列的缘故,导致往队列里面每插入一个元素,必须等待另一个线程从这个队列删除一个元素。- 线程池可维护0到Integer.MAX_VALUE个线程资源,空闲线程默认情况下超过60秒未使用则会被销毁,长期闲置的池占用较少的资源。
- 当有新任务加入时,如果池中有空闲且尚未销毁的线程,则将任务交给此线程执行;如果没有可用的线程,则创建一个新线程执行任务并添加到池中。
ScheduledThreadPoolExecutor
类型的线程池。平时我们实现定时调度功能的时候,可能更多的是使用第三方类库,比如:quartz等。但是对于更底层的功能,我们仍然需要了解。四、手动创建线程池
理论上,我们可以通过
Executors
来创建线程池,这种方式非常简单。但正是因为简单,所以限制了线程池的功能。比如:无长度限制的队列,可能因为任务堆积导致OOM,这是非常严重的bug,应尽可能地避免。怎么避免?归根结底,还是需要我们通过更底层的方式来创建线程池。抛开定时调度的线程池不管,我们看看
ThreadPoolExecutor
。它提供了好几个构造方法,但是最底层的构造方法却只有一个。那么,我们就从这个构造方法着手分析。public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<
Runnable>
workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler);
这个构造方法有7个参数,我们逐一来进行分析。
corePoolSize
,线程池中的核心线程数maximumPoolSize
,线程池中的最大线程数keepAliveTime
,空闲时间,当线程池数量超过核心线程数时,多余的空闲线程存活的时间,即:这些线程多久被销毁。unit
,空闲时间的单位,可以是毫秒、秒、分钟、小时和天,等等workQueue
,等待队列,线程池中的线程数超过核心线程数时,任务将放在等待队列,它是一个BlockingQueue
类型的对象threadFactory
,线程工厂,我们可以使用它来创建一个线程handler
,拒绝策略,当线程池和等待队列都满了之后,需要通过该对象的回调函数进行回调处理
workQueue
、threadFactory
和handler
,接下来我们将进一步分析。(一)等待队列-workQueue等待队列是
BlockingQueue
类型的,理论上只要是它的子类,我们都可以用来作为等待队列。同时,jdk内部自带一些阻塞队列,我们来看看大概有哪些。
ArrayBlockingQueue
,队列是有界的,基于数组实现的阻塞队列LinkedBlockingQueue
,队列可以有界,也可以无界。基于链表实现的阻塞队列SynchronousQueue
,不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作将一直处于阻塞状态。该队列也是Executors.newCachedThreadPool()
的默认队列PriorityBlockingQueue
,带优先级的无界阻塞队列
(二)线程工厂-threadFactory
ThreadFactory
是一个接口,只有一个方法。既然是线程工厂,那么我们就可以用它生产一个线程对象。来看看这个接口的定义。public interface ThreadFactory /**
* Constructs a new @code Thread.Implementations may also initialize
* priority, name, daemon status, @code ThreadGroup, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or @code null if the request to
*create a thread is rejected
*/
Thread newThread(Runnable r);
【Java多线程与线程池技术】
Executors
的实现使用了默认的线程工厂-DefaultThreadFactory
。它的实现主要用于创建一个线程,线程的名字为pool-poolNum-thread-threadNum
。static class DefaultThreadFactory implements ThreadFactory
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory()
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
public Thread newThread(Runnable r)
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
很多时候,我们需要自定义线程名字。我们只需要自己实现
ThreadFactory
,用于创建特定场景的线程即可。(三)拒绝策略-handler所谓拒绝策略,就是当线程池满了、队列也满了的时候,我们对任务采取的措施。或者丢弃、或者执行、或者其他...
jdk自带4种拒绝策略,我们来看看。
CallerRunsPolicy
// 在调用者线程执行AbortPolicy
// 直接抛出RejectedExecutionException
异常DiscardPolicy
// 任务直接丢弃,不做任何处理DiscardOldestPolicy
// 丢弃队列里最旧的那个任务,再尝试执行当前任务
DiscardPolicy
,但是这种策略有一个弊端就是任务执行的轨迹不会被记录下来。所以,我们往往需要实现自定义的拒绝策略, 通过实现RejectedExecutionHandler
接口的方式。五、其它
配置线程池的参数前面我们讲到了手动创建线程池涉及到的几个参数,那么我们要如何设置这些参数才算是正确的应用呢?实际上,需要根据任务的特性来分析。
- 任务的性质:CPU密集型、IO密集型和混杂型
- 任务的优先级:高中低
- 任务执行的时间:长中短
- 任务的依赖性:是否依赖数据库或者其他系统资源
通常来说,如果任务属于CPU密集型,那么我们可以将线程池数量设置成CPU的个数,以减少线程切换带来的开销。如果任务属于IO密集型,我们可以将线程池数量设置得更多一些,比如CPU个数*2。
线程池监控如果系统中大量用到了线程池,那么我们有必要对线程池进行监控。利用监控,我们能在问题出现前提前感知到,也可以根据监控信息来定位可能出现的问题。
那么我们可以监控哪些信息?又有哪些方法可用于我们的扩展支持呢?
首先,
ThreadPoolExecutor
自带了一些方法。long getTaskCount()
,获取已经执行或正在执行的任务数long getCompletedTaskCount()
,获取已经执行的任务数int getLargestPoolSize()
,获取线程池曾经创建过的最大线程数,根据这个参数,我们可以知道线程池是否满过int getPoolSize()
,获取线程池线程数int getActiveCount()
,获取活跃线程数(正在执行任务的线程数)
ThreadPoolExecutor
留给我们自行处理的方法有3个,它在ThreadPoolExecutor
中为空实现(也就是什么都不做)。protected void beforeExecute(Thread t, Runnable r)
// 任务执行前被调用protected void afterExecute(Runnable r, Throwable t)
// 任务执行后被调用protected void terminated()
// 线程池结束后被调用
- 尽量使用手动的方式创建线程池,避免使用
Executors
工厂类 - 根据场景,合理设置线程池的各个参数,包括线程池数量、队列、线程工厂和拒绝策略
推荐阅读
- Flink处理函数实战之四(窗口处理)
- MapReduce编程模型和计算框架
- 三高Mysql - Mysql索引和查询优化(偏实战部分)
- 浅析$nextTick和$forceUpdate
- 听说你在写Python爬虫,你对浏览器的开发者工具了解多少(多图预警)
- Spring认证指南(了解如何以最少的配置构建应用程序)
- MySQL优化的5个维度
- Prometheus监控Harbor实战(全网首发)
- 负载均衡四层跟七层的区别