【线程池拒绝策略-RejectedExecutionHandler】当线程池中的线程数目达到maximumPoolSize,且任务缓存队列已满时,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
我们可以通过重写RejectedExecutionHandler来自定义一种拒绝或降级策略:
package com.example.demo.test.thread;
import com.alibaba.fastjson.JSON;
import com.example.demo.test.reflect.Student;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.PostConstruct;
import java.lang.reflect.Field;
import java.util.concurrent.*;
@Slf4j
public class ThreadPoolTest {private ThreadPoolTaskExecutor asyncTaskExecutor;
@PostConstruct
private void init() {
asyncTaskExecutor = new ThreadPoolTaskExecutor();
ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("threadName" + "%d").build();
asyncTaskExecutor.setThreadFactory(factory);
asyncTaskExecutor.setCorePoolSize(1);
asyncTaskExecutor.setMaxPoolSize(1);
asyncTaskExecutor.setAllowCoreThreadTimeOut(true);
asyncTaskExecutor.setQueueCapacity(10);
asyncTaskExecutor.setKeepAliveSeconds(3);
asyncTaskExecutor.setDaemon(true);
asyncTaskExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.error("转MQ处理");
executorFailSendMQ(r);
}
});
asyncTaskExecutor.initialize();
}private void executorFailSendMQ(Runnable r){
FutureTask task = (FutureTask) r;
Student student = null;
//从线程池失败中获取student对象
try {
Field callableField = task.getClass().getDeclaredField("callable");
callableField.setAccessible(true);
Callable callable = (Callable) callableField.get(task);
Field taskField = callable.getClass().getDeclaredField("task");
taskField.setAccessible(true);
AsyncSaveTask asyncSaveTask = (AsyncSaveTask) taskField.get(callable);
student = asyncSaveTask.getStudent();
}catch (Exception e){
log.error("executorFailSendMQ error",e);
}
if(student != null){
log.info("executorFailSendMQ send ={}",JSON.toJSONString(student));
//重新发送mq
}else{
log.error("executorFailSendMQ null");
}
}class AsyncSaveTask implements Runnable {private Student student;
public AsyncSaveTask(Student student) {
this.student = student;
}public Student getStudent() {
return student;
}public void setStudent(Student student) {
this.student = student;
}@Override
public void run() {
System.out.println("AsyncSaveTask thread ={}" + Thread.currentThread().getName());
}
}
}
推荐阅读
- 使用CompletableFuture进行多任务并行处理
- 芯片|异构集成 与 异构计算
- mybatis|(附源码)计算机毕业设计ssm电影票购票系统
- mybatis|Mybatis—MappedStatement
- java|卧槽!迅雷的代码竟然被扒了精光!
- java|Java 内存泄漏的排查
- 程序员|29岁vivo员工吐槽(看完阿里P9大牛的“Java成长笔记”我悟了)
- ------【Java进阶】|小心踩雷,一次Java内存泄漏排查实战
- java|40道Java基础常见面试题及详细答案