Thread|线程安全——线程池核心——ThreadPoolExecutor自定义线程池
【Thread|线程安全——线程池核心——ThreadPoolExecutor自定义线程池】若Executors工厂无法满足我们的需求,可以自己创建自定义线程池,其实Executors工厂类里面的创建线程方法其内部实现均是用了ThreadPoolExecutor这个类,这个类可以自定义线程。构造方法如下:
文章图片
队列类型参数选择:
在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,若大于corePoolSize,则会将任务加入队列,若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
在使用无界队列时:LinkedBlockingQueue。与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新任务到来,系统的线程数小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加。若后续仍有新的任务加入,而有没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。
package com.bfxy.thread.core.pool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class UseThreadPoolExecutor {
public static void main(String[] args) {ThreadPoolExecutor pool = new ThreadPoolExecutor(1, // corePoolSize: 核心线程数,线程池初始化的时候就会被创建
3, // maximumPoolSize: 线程池的最大上限 //在使用无界队列的时候, 此参数 不起作用
60, //线程的存活时间
TimeUnit.SECONDS,
//workQueue:BlockingQueue接口下面的实现类
//new ArrayBlockingQueue<>(2), //使用有界队列: ArrayBlockingQueue
new LinkedBlockingQueue<>(), //使用无界队列: LinkedBlockingQueue
new ThreadFactory() { //threadFactory 线程工厂, 用于获取一个新的线程, 然后把该线程 投递到我们的线程池中去
@Override
public Thread newThread(Runnable r) {
Thread th = new Thread(r, "order-thread");
if(th.getPriority() != Thread.NORM_PRIORITY) {
th.setPriority(Thread.NORM_PRIORITY);
}
if(th.isDaemon()) {
th.setDaemon(false);
}
return th;
}
}, //使用无界队列时, 拒绝策略不起到作用
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.err.println("当前的任务已经被拒绝: " + r.toString());
}
});
Task t1 = new Task(1);
Task t2 = new Task(2);
Task t3 = new Task(3);
Task t4 = new Task(4);
Task t5 = new Task(5);
Task t6 = new Task(6);
/**
//线程池提交任务的方法:
pool.execute(t1);
//execute: 如果你的任务没有返回值, 则使用该方法提交任务
pool.submit(t1);
//submit: 如果你的任务有返回值, 则使用该方法提交任务, 返回一个Future对象(Future模式)
*//**
*
* 在使用有界队列时:
* 1 若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程
* 2 若大于corePoolSize,则会将任务加入队列
* 3 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程
* 4 若线程数大于maximumPoolSize,则执行拒绝策略。
*/// 1 若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程
pool.execute(t1);
//core size = 1t1任务会被核心线程执行
// 2 若大于corePoolSize,则会将任务加入队列
pool.execute(t2);
// 有界队列容量为: 2
pool.execute(t3);
// 3 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程, 并执行该任务
pool.execute(t4);
// 线程池中的总线程数 2, maximumPoolSize = 3
pool.execute(t5);
// 线程池中的总线程数 3, maximumPoolSize = 3
// 4 若线程数大于maximumPoolSize,则执行拒绝策略。
pool.execute(t6);
pool.shutdown();
}
}
//线程池提交任务的方法:
pool.execute(t1);
//execute: 如果你的任务没有返回值, 则使用该方法提交任务
pool.submit(t1);
//submit: 如果你的任务有返回值, 则使用该方法提交任务, 返回一个Future对象(Future模式)
任务:
package com.bfxy.thread.core.pool;
public class Task implements Runnable { private int taskId;
public Task(int taskId) {
this.taskId = taskId;
} public int getTaskId() {
return taskId;
} public void setTaskId(int taskId) {
this.taskId = taskId;
} @Override
public void run() {
System.err.println("run task id : " + this.taskId);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public String toString(){
return "当前线程DI: " + this.taskId;
}}
队列类型参数选择:
在使用有界队列时,
若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,
则优先创建线程,
若大于corePoolSize,则会将任务加入队列,
若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
队列类型参数选择:
在使用无界队列时:LinkedBlockingQueue。
与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。
当有新任务到来,系统的线程数小于corePoolSize时,则新建线程执行任务。
当达到corePoolSize后,就不会继续增加。
若后续仍有新的任务加入,而有没有空闲的线程资源,则任务直接进入队列等待。
若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。
拒绝策略:
JDK支持线程池的拒绝策略:
AbortPolicy:直接抛出异常阻止系统正常工作。
CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。
DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务。
DiscardPolicy:丢弃无法处理的任务,不给予任何处理。
如果需要自定义拒绝策略可以实现RejectedExecutionHandler接口。
文章图片
IO密集型:
线程个数大小的设置
就很好理解了,我们现在做的开发大部分都是WEB应用,涉及到大量的网络传输,
不仅如此,与数据库,与缓存间的交互也涉及到IO,一旦发生IO,线程就会处于
等待状态,当IO结束,数据准备好后,线程才会继续执行。因此从这里可以发现,
对于IO密集型的应用,我们可以多设置一些线程池中线程的数量,这样就能让在
等待IO的这段时间内,线程可以去做其它事,提高并发处理效率。那么这个线程
池的数据量是不是可以随便设置呢?当然不是的,请一定要记得,线程上下文切
换是有代价的。目前总结了一套公式,对于IO密集型应用:
线程数 = CPU核心数/(1-阻塞系数) 这个阻塞系数一般为0.8~0.9之间,也可以
取0.8或者0.9。
套用公式,对于双核CPU来说,它比较理想的线程数就是20,当然这都不是绝对的,
需要根据实际情况以及实际业务来调整:final int poolSize = (int)(cpuCore/(1-0.9))
计算密集型:
首先,我们要知道 计算机密集型和IO密集型这两个概念,小哥哥小姐姐,了解一下 !
计算密集型:
顾名思义就是应用需要非常多的CPU计算资源,在多核CPU时代,我们要让每一个CPU核
心都参与计算,将CPU的性能充分利用起来,这样才算是没有浪费服务器配置,如果在
非常好的服务器配置上还运行着单线程程序那将是多么重大的浪费。对于计算密集型的
应用,完全是靠CPU的核数来工作,所以为了让它的优势完全发挥出来,避免过多的线程
上下文切换,比较理想方案是:
线程数 = CPU核数+1,也可以设置成CPU核数*2,但还要看JDK的版本以及CPU配置(服务
器的CPU有超线程)。
Hook:
package com.bfxy.thread.core.pool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class UseCustomThreadPoolExecutor extends ThreadPoolExecutor { public UseCustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.err.println("-------线程执行之前----------");
} @Override
protected void afterExecute(Runnable r, Throwable t) {
System.err.println("-------线程执行之后----------");
}
public static void main(String[] args) {UseCustomThreadPoolExecutor uctpe = new UseCustomThreadPoolExecutor(1, 2, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
uctpe.execute(new Task(1));
}}
beforeExecute:可以初始化threadlocal(); 记录日志
afterExecute:记录日志,统计
关闭线程池
内容当线程池不在被引用并且工作线程数为0的时候,线程池将被终止。我们也可以调用shutdown来手动终止线程池。如果我们忘记调用shutdown,为了让线程资源被释放,我们还可以使用keepAliveTime和allowCoreThreadTimeOut来达到目的! 当然,稳妥的方式是使用虚拟机Runtime.getRuntime().addShutdownHook方法,手工去调用线程池的关闭方法!
推荐阅读
- Linux下面如何查看tomcat已经使用多少线程
- NeuVector 会是下一个爆款云原生安全神器吗()
- 多线程NSOperation
- Java内存泄漏分析系列之二(jstack生成的Thread|Java内存泄漏分析系列之二:jstack生成的Thread Dump日志结构解析)
- 2019-08-16day20总结
- performSelectorOnMainThread:withObject:waitUntilDone:参数设置为NO或YES的区别
- 何以解忧,企业信息化、数字化选型焦虑之五·系统安全隐患大
- spring|spring boot中设置异步请求默认使用的线程池
- K8S|K8S 生态周报| Istio 即将发布重大安全更新,多个版本受影响
- Android中非UI主线程能不能操作UI()