学生视角手把手带你写Java|学生视角手把手带你写Java 线程池改良版

目录

  • Java手写线程池(第二代)
    • 第二代线程池的优化
      • 线程池构造器
      • 线程池拒绝策略
      • execute方法
    • 手写线程池源码
      • MyExecutorService
      • MyRejectedExecutionException
      • MyRejectedExecutionHandle
      • 核心类MyThreadPoolExecutor
      • 线程池测试类

Java手写线程池(第二代)
第二代线程池的优化
1:新增了4种拒绝策略。分别为:MyAbortPolicy、MyDiscardPolicy、MyDiscardOldestPolicy、MyCallerRunsPolicy
2:对线程池MyThreadPoolExecutor的构造方法进行优化,增加了参数校验,防止乱传参数现象。
3:这是最重要的一个优化。
  • 移除线程池的线程预热功能。因为线程预热会极大的耗费内存,当我们不用线程池时也会一直在运行状态。
  • 换来的是在调用execute方法添加任务时通过检查workers线程集合目前的大小与corePoolSize的值去比较,再通过new MyWorker()去创建添加线程到线程池,这样好处就是当我们创建线程池如果不使用的话则对当前内存没有一点影响,当使用了才会创建线程并放入线程池中进行复用。

线程池构造器
public MyThreadPoolExecutor(){this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle); }public MyThreadPoolExecutor(int corePoolSize, BlockingQueue waitingQueue,ThreadFactory threadFactory) {this(corePoolSize,waitingQueue,threadFactory,defaultHandle); }public MyThreadPoolExecutor(int corePoolSize, BlockingQueue waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {this.workers=new HashSet<>(corePoolSize); if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){this.corePoolSize=corePoolSize; this.waitingQueue=waitingQueue; this.threadFactory=threadFactory; this.handle=handle; }else {throw new NullPointerException("线程池参数不合法"); }}


线程池拒绝策略 策略接口:MyRejectedExecutionHandle
package com.springframework.concurrent; /** * 自定义拒绝策略 * @author 游政杰 */public interface MyRejectedExecutionHandle {void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor); }

策略内部实现类
/*** 实现自定义拒绝策略*///抛异常策略(默认)public static class MyAbortPolicy implements MyRejectedExecutionHandle{public MyAbortPolicy(){}@Overridepublic void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝"); }}//默默丢弃策略public static class MyDiscardPolicy implements MyRejectedExecutionHandle{public MyDiscardPolicy() {}@Overridepublic void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {}}//丢弃掉最老的任务策略public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{public MyDiscardOldestPolicy() {}@Overridepublic void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭threadPoolExecutor.getWaitingQueue().poll(); //丢掉最老的任务,此时就有位置当新任务了threadPoolExecutor.execute(runnable); //把新任务加入到队列中}}}//由调用者调用策略public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{public MyCallerRunsPolicy(){}@Overridepublic void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭runnable.run(); }}}

封装拒绝方法
protected final void reject(Runnable runnable){this.handle.rejectedExecution(runnable, this); }protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){this.handle.rejectedExecution(runnable, threadPoolExecutor); }


execute方法
@Overridepublic boolean execute(Runnable runnable){if (!this.waitingQueue.offer(runnable)) {this.reject(runnable); return false; }else {if(this.workers!=null&&this.workers.size()
可以看出只有当往线程池放任务时才会创建线程对象。

手写线程池源码

MyExecutorService
package com.springframework.concurrent; import java.util.concurrent.BlockingQueue; /** * 自定义线程池业务接口 * @author 游政杰 */public interface MyExecutorService {boolean execute(Runnable runnable); void shutdown(); void shutdownNow(); boolean isShutdown(); BlockingQueue getWaitingQueue(); }


MyRejectedExecutionException
package com.springframework.concurrent; /** * 自定义拒绝异常 */public class MyRejectedExecutionException extends RuntimeException {public MyRejectedExecutionException() {}public MyRejectedExecutionException(String message) {super(message); }public MyRejectedExecutionException(String message, Throwable cause) {super(message, cause); }public MyRejectedExecutionException(Throwable cause) {super(cause); }}


MyRejectedExecutionHandle
package com.springframework.concurrent; /** * 自定义拒绝策略 * @author 游政杰 */public interface MyRejectedExecutionHandle {void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor); }


核心类MyThreadPoolExecutor
package com.springframework.concurrent; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** * 纯手撸线程池框架 * @author 游政杰 */public class MyThreadPoolExecutor implements MyExecutorService{private static final AtomicInteger taskcount=new AtomicInteger(0); //执行任务次数private static final AtomicInteger threadNumber=new AtomicInteger(0); //线程编号private static volatile int corePoolSize; //核心线程数private final HashSet workers; //工作线程private final BlockingQueue waitingQueue; //等待队列private static final String THREADPOOL_NAME="MyThread-Pool-"; //线程名称private volatile boolean isRunning=true; //是否运行private volatile boolean STOPNOW=false; //是否立刻停止private volatile ThreadFactory threadFactory; //线程工厂private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy(); //默认拒绝策略private volatile MyRejectedExecutionHandle handle; //拒绝紫略public MyThreadPoolExecutor(){this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle); }public MyThreadPoolExecutor(int corePoolSize, BlockingQueue waitingQueue,ThreadFactory threadFactory) {this(corePoolSize,waitingQueue,threadFactory,defaultHandle); }public MyThreadPoolExecutor(int corePoolSize, BlockingQueue waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {this.workers=new HashSet<>(corePoolSize); if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){this.corePoolSize=corePoolSize; this.waitingQueue=waitingQueue; this.threadFactory=threadFactory; this.handle=handle; }else {throw new NullPointerException("线程池参数不合法"); }}/*** 实现自定义拒绝策略*///抛异常策略(默认)public static class MyAbortPolicy implements MyRejectedExecutionHandle{public MyAbortPolicy(){}@Overridepublic void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝"); }}//默默丢弃策略public static class MyDiscardPolicy implements MyRejectedExecutionHandle{public MyDiscardPolicy() {}@Overridepublic void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {}}//丢弃掉最老的任务策略public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{public MyDiscardOldestPolicy() {}@Overridepublic void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭threadPoolExecutor.getWaitingQueue().poll(); //丢掉最老的任务,此时就有位置当新任务了threadPoolExecutor.execute(runnable); //把新任务加入到队列中}}}//由调用者调用策略public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{public MyCallerRunsPolicy(){}@Overridepublic void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭runnable.run(); }}}//call拒绝方法protected final void reject(Runnable runnable){this.handle.rejectedExecution(runnable, this); }protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){this.handle.rejectedExecution(runnable, threadPoolExecutor); }/*** MyWorker就是我们每一个线程对象*/private final class MyWorker implements Runnable{final Thread thread; //为每个MyWorkerMyWorker(){Thread td = threadFactory.newThread(this); td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement()); this.thread=td; this.thread.start(); workers.add(this); }//执行任务@Overridepublic void run() {//循环接收任务while (true){//循环退出条件://1:当isRunning为false并且waitingQueue的队列大小为0(也就是无任务了),会优雅的退出。//2:当STOPNOW为true,则说明调用了shutdownNow方法进行暴力退出。if((!isRunning&&waitingQueue.size()==0)||STOPNOW){break; }else {//不断取任务,当任务!=null时则调用run方法处理任务Runnable runnable = waitingQueue.poll(); if(runnable!=null){runnable.run(); System.out.println("task==>"+taskcount.incrementAndGet()); }}}}}//往线程池中放任务@Overridepublic boolean execute(Runnable runnable){if (!this.waitingQueue.offer(runnable)) {this.reject(runnable); return false; }else {if(this.workers!=null&&this.workers.size() getWaitingQueue() {return this.waitingQueue; }}


线程池测试类
package com.springframework.test; import com.springframework.concurrent.MyThreadPoolExecutor; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; public class ThreadPoolTest {public static void main(String[] args) {//MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor//(5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyAbortPolicy()); //MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor//(5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardPolicy()); //MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor//(5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardOldestPolicy()); MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor(5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyCallerRunsPolicy()); for(int i=0; i<11; i++){int finalI = i; myThreadPoolExecutor.execute(()->{System.out.println(Thread.currentThread().getName()+">>>>"+ finalI); }); }myThreadPoolExecutor.shutdown(); //myThreadPoolExecutor.shutdownNow(); }}

好了第二代线程池就优化到这了,后面可能还会出第三代,不断进行优化。
【学生视角手把手带你写Java|学生视角手把手带你写Java 线程池改良版】到此这篇关于学生视角手把手带你写Java 线程池改良版的文章就介绍到这了,更多相关Java 线程池内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

    推荐阅读