Thread|线程安全——线程池核心——ThreadPoolExecutor自定义线程池

【Thread|线程安全——线程池核心——ThreadPoolExecutor自定义线程池】若Executors工厂无法满足我们的需求,可以自己创建自定义线程池,其实Executors工厂类里面的创建线程方法其内部实现均是用了ThreadPoolExecutor这个类,这个类可以自定义线程。构造方法如下:
Thread|线程安全——线程池核心——ThreadPoolExecutor自定义线程池
文章图片

队列类型参数选择:
在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,若大于corePoolSize,则会将任务加入队列,若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
在使用无界队列时:LinkedBlockingQueue。与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新任务到来,系统的线程数小于corePoolSize时,则新建线程执行任务。当达到corePoolSize后,就不会继续增加。若后续仍有新的任务加入,而有没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。

package com.bfxy.thread.core.pool; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class UseThreadPoolExecutor { public static void main(String[] args) {ThreadPoolExecutor pool = new ThreadPoolExecutor(1, // corePoolSize: 核心线程数,线程池初始化的时候就会被创建 3, // maximumPoolSize: 线程池的最大上限 //在使用无界队列的时候, 此参数 不起作用 60, //线程的存活时间 TimeUnit.SECONDS, //workQueue:BlockingQueue接口下面的实现类 //new ArrayBlockingQueue<>(2), //使用有界队列: ArrayBlockingQueue new LinkedBlockingQueue<>(), //使用无界队列: LinkedBlockingQueue new ThreadFactory() { //threadFactory 线程工厂, 用于获取一个新的线程, 然后把该线程 投递到我们的线程池中去 @Override public Thread newThread(Runnable r) { Thread th = new Thread(r, "order-thread"); if(th.getPriority() != Thread.NORM_PRIORITY) { th.setPriority(Thread.NORM_PRIORITY); } if(th.isDaemon()) { th.setDaemon(false); } return th; } }, //使用无界队列时, 拒绝策略不起到作用 new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.err.println("当前的任务已经被拒绝: " + r.toString()); } }); Task t1 = new Task(1); Task t2 = new Task(2); Task t3 = new Task(3); Task t4 = new Task(4); Task t5 = new Task(5); Task t6 = new Task(6); /** //线程池提交任务的方法: pool.execute(t1); //execute: 如果你的任务没有返回值, 则使用该方法提交任务 pool.submit(t1); //submit: 如果你的任务有返回值, 则使用该方法提交任务, 返回一个Future对象(Future模式) *//** * * 在使用有界队列时: * 1 若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程 * 2 若大于corePoolSize,则会将任务加入队列 * 3 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程 * 4 若线程数大于maximumPoolSize,则执行拒绝策略。 */// 1 若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程 pool.execute(t1); //core size = 1t1任务会被核心线程执行 // 2 若大于corePoolSize,则会将任务加入队列 pool.execute(t2); // 有界队列容量为: 2 pool.execute(t3); // 3 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程, 并执行该任务 pool.execute(t4); // 线程池中的总线程数 2, maximumPoolSize = 3 pool.execute(t5); // 线程池中的总线程数 3, maximumPoolSize = 3 // 4 若线程数大于maximumPoolSize,则执行拒绝策略。 pool.execute(t6); pool.shutdown(); } }

//线程池提交任务的方法: pool.execute(t1); //execute: 如果你的任务没有返回值, 则使用该方法提交任务 pool.submit(t1); //submit: 如果你的任务有返回值, 则使用该方法提交任务, 返回一个Future对象(Future模式)

任务:
package com.bfxy.thread.core.pool; public class Task implements Runnable { private int taskId; public Task(int taskId) { this.taskId = taskId; } public int getTaskId() { return taskId; } public void setTaskId(int taskId) { this.taskId = taskId; } @Override public void run() { System.err.println("run task id : " + this.taskId); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } public String toString(){ return "当前线程DI: " + this.taskId; }}

队列类型参数选择: 在使用有界队列时, 若有新的任务需要执行,如果线程池实际线程数小于corePoolSize, 则优先创建线程, 若大于corePoolSize,则会将任务加入队列, 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程, 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。

队列类型参数选择: 在使用无界队列时:LinkedBlockingQueue。 与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。 当有新任务到来,系统的线程数小于corePoolSize时,则新建线程执行任务。 当达到corePoolSize后,就不会继续增加。 若后续仍有新的任务加入,而有没有空闲的线程资源,则任务直接进入队列等待。 若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。

拒绝策略:
JDK支持线程池的拒绝策略: AbortPolicy:直接抛出异常阻止系统正常工作。 CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。 DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务。 DiscardPolicy:丢弃无法处理的任务,不给予任何处理。 如果需要自定义拒绝策略可以实现RejectedExecutionHandler接口。

Thread|线程安全——线程池核心——ThreadPoolExecutor自定义线程池
文章图片

IO密集型:
线程个数大小的设置 就很好理解了,我们现在做的开发大部分都是WEB应用,涉及到大量的网络传输, 不仅如此,与数据库,与缓存间的交互也涉及到IO,一旦发生IO,线程就会处于 等待状态,当IO结束,数据准备好后,线程才会继续执行。因此从这里可以发现, 对于IO密集型的应用,我们可以多设置一些线程池中线程的数量,这样就能让在 等待IO的这段时间内,线程可以去做其它事,提高并发处理效率。那么这个线程 池的数据量是不是可以随便设置呢?当然不是的,请一定要记得,线程上下文切 换是有代价的。目前总结了一套公式,对于IO密集型应用: 线程数 = CPU核心数/(1-阻塞系数) 这个阻塞系数一般为0.8~0.9之间,也可以 取0.8或者0.9。 套用公式,对于双核CPU来说,它比较理想的线程数就是20,当然这都不是绝对的, 需要根据实际情况以及实际业务来调整:final int poolSize = (int)(cpuCore/(1-0.9))

计算密集型:
首先,我们要知道 计算机密集型和IO密集型这两个概念,小哥哥小姐姐,了解一下 ! 计算密集型: 顾名思义就是应用需要非常多的CPU计算资源,在多核CPU时代,我们要让每一个CPU核 心都参与计算,将CPU的性能充分利用起来,这样才算是没有浪费服务器配置,如果在 非常好的服务器配置上还运行着单线程程序那将是多么重大的浪费。对于计算密集型的 应用,完全是靠CPU的核数来工作,所以为了让它的优势完全发挥出来,避免过多的线程 上下文切换,比较理想方案是: 线程数 = CPU核数+1,也可以设置成CPU核数*2,但还要看JDK的版本以及CPU配置(服务 器的CPU有超线程)。

Hook:
package com.bfxy.thread.core.pool; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class UseCustomThreadPoolExecutor extends ThreadPoolExecutor { public UseCustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { System.err.println("-------线程执行之前----------"); } @Override protected void afterExecute(Runnable r, Throwable t) { System.err.println("-------线程执行之后----------"); } public static void main(String[] args) {UseCustomThreadPoolExecutor uctpe = new UseCustomThreadPoolExecutor(1, 2, 10L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100)); uctpe.execute(new Task(1)); }}

beforeExecute:可以初始化threadlocal(); 记录日志

afterExecute:记录日志,统计

关闭线程池
内容当线程池不在被引用并且工作线程数为0的时候,线程池将被终止。我们也可以调用shutdown来手动终止线程池。如果我们忘记调用shutdown,为了让线程资源被释放,我们还可以使用keepAliveTime和allowCoreThreadTimeOut来达到目的! 当然,稳妥的方式是使用虚拟机Runtime.getRuntime().addShutdownHook方法,手工去调用线程池的关闭方法!

    推荐阅读