线程池拒绝策略-RejectedExecutionHandler

【线程池拒绝策略-RejectedExecutionHandler】当线程池中的线程数达到maximumPoolSize且任务缓存队列已满时(或线程池已被关闭后),如果还有新任务提交,就会触发任务拒绝策略。JDK内置了以下四种策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常(默认策略)。 ThreadPoolExecutor.DiscardPolicy:同样丢弃任务,但不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面(最早入队)的任务,然后重新尝试提交当前任务(若仍被拒绝则重复此过程)。 ThreadPoolExecutor.CallerRunsPolicy:由提交任务的调用线程直接执行该任务。

我们可以通过实现RejectedExecutionHandler接口来自定义一种拒绝或降级策略:
package com.example.demo.test.thread; import com.alibaba.fastjson.JSON; import com.example.demo.test.reflect.Student; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import javax.annotation.PostConstruct; import java.lang.reflect.Field; import java.util.concurrent.*; @Slf4j public class ThreadPoolTest {private ThreadPoolTaskExecutor asyncTaskExecutor; @PostConstruct private void init() { asyncTaskExecutor = new ThreadPoolTaskExecutor(); ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("threadName" + "%d").build(); asyncTaskExecutor.setThreadFactory(factory); asyncTaskExecutor.setCorePoolSize(1); asyncTaskExecutor.setMaxPoolSize(1); asyncTaskExecutor.setAllowCoreThreadTimeOut(true); asyncTaskExecutor.setQueueCapacity(10); asyncTaskExecutor.setKeepAliveSeconds(3); asyncTaskExecutor.setDaemon(true); asyncTaskExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { log.error("转MQ处理"); executorFailSendMQ(r); } }); asyncTaskExecutor.initialize(); }private void executorFailSendMQ(Runnable r){ FutureTask task = (FutureTask) r; Student student = null; //从线程池失败中获取student对象 try { Field callableField = task.getClass().getDeclaredField("callable"); callableField.setAccessible(true); Callable callable = (Callable) callableField.get(task); Field taskField = callable.getClass().getDeclaredField("task"); taskField.setAccessible(true); AsyncSaveTask asyncSaveTask = (AsyncSaveTask) taskField.get(callable); student = asyncSaveTask.getStudent(); }catch (Exception e){ log.error("executorFailSendMQ error",e); } if(student != null){ log.info("executorFailSendMQ send ={}",JSON.toJSONString(student)); //重新发送mq }else{ log.error("executorFailSendMQ null"); } }class AsyncSaveTask implements Runnable {private Student student; public 
AsyncSaveTask(Student student) { this.student = student; }public Student getStudent() { return student; }public void setStudent(Student student) { this.student = student; }@Override public void run() { System.out.println("AsyncSaveTask thread ={}" + Thread.currentThread().getName()); } } }

    推荐阅读