博观而约取,厚积而薄发。这篇文章主要讲述大厂面试官:JDK 线程池中如何不超最大线程数快速消费任务?相关的知识,希望能为你提供帮助。
01、线程池参数我们这里讲解以 ThreadPoolExecutor#execute(Runnable runnable) 举例, 这里先说下线程池的一些参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<
Runnable>
workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {...}
corePoolSize
线程池中的核心线程数量, 如果没有全局设置池内线程的过期时间, 池内会维持此数量线程
maximumPoolSize
线程池中的最大线程数量, 当核心线程都在运行任务, 并且阻塞队列中任务数量已满, 此时会创建非核心线程
keepAliveTime & unit
线程池中线程过期时间以及时间单位
workQueue
存放线程池内任务的阻塞队列, 如 ArrayBlockingQueue、LinkedBlockingQueue...
threadFactory
创建线程池中线程的线程工厂, 可以在创建线程时初始化优先级、名称、守护状态...
handler
当线程池中全部线程都在运行, 阻塞队列也满的时候, 会将添加的任务执行拒绝策略, JDK 线程池中实现了四种拒绝策略, 默认 AbortPolicy, 抛出异常
02、线程池任务添加流程相信大家在网上看到过许多类似的线程池执行流程图哈, 这里还是简要赘述下, 源码如下:
public void execute(Runnable command) {
...
int c = ctl.get();
if (workerCountOf(c) <
corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) &
&
workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) &
&
remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
} else if (!addWorker(command, false))
reject(command);
}
1、线程池提交任务首先判断当前线程数是否大于核心线程数, 否则创建核心线程执行任务
2、如果当前线程超过了核心线程数, 判断阻塞队列是否已满, 否则将任务添加到队列中
3、如果阻塞队列已满, 判断当前线程是否大于最大线程数, 否则创建非核心线程执行任务
4、如果当前线程大于或等于最大线程数, 执行拒绝策略
文章图片
线程池任务提交流程
这道问题的意图就是要将第二步进行改写
如果当前线程大于核心线程数, 不将任务放入阻塞队列, 而是创建非核心线程执行任务
举例说明一下:
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(1, 3, 60,
TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
for (int i = 0;
i <
7;
i++) {
threadPoolExecutor.execute(() ->
{
System.out.println(Thread.currentThread().getName() + "-执行任务");
LockSupport.park();
});
}
threadPoolExecutor.shutdown();
/**
* pool-1-thread-1执行任务
*/
}
看到这段代码, 正常情况下只会有一个任务会被执行, 其余任务会被放置阻塞队列中
而我们需要做的就是, 发现池内线程大于核心线程数, 不放入阻塞队列, 而是创建非核心线程进行消费任务
本地代码实现参考 Dubbo 源码中 EagerThreadPoolExecutor, 确实能实现对应效果, 这里就不演示了, 一起看一下 Dubbo 如何做的
03、Dubbo 中实现的快速消费Dubbo 中涉及到的类有两个, EagerThreadPoolExecutor[2]、TaskQueue[3]
这里贴一下重点代码
3.1 TaskQueue
public class TaskQueue<
R extends Runnable>
extends LinkedBlockingQueue<
Runnable>
{
...
// 队列中持有线程池的引用
private EagerThreadPoolExecutor executor;
public TaskQueue(int capacity) {
super(capacity);
}public void setExecutor(EagerThreadPoolExecutor exec) {
executor = exec;
}@Override
public boolean offer(Runnable runnable) {
...
// 获取线程池中线程数
int currentPoolThreadSize = executor.getPoolSize();
// 如果有核心线程正在空闲, 将任务加入阻塞队列, 由核心线程进行处理任务
if (executor.getSubmittedTaskCount() <
currentPoolThreadSize) {
return super.offer(runnable);
}/**
*【重点】当前线程池线程数量小于最大线程数
* 返回false, 根据线程池源码, 会创建非核心线程
*/
if (currentPoolThreadSize <
executor.getMaximumPoolSize()) {
return false;
}// 如果当前线程池数量大于最大线程数, 任务加入阻塞队列
return super.offer(runnable);
}
}
存在一个疑点, getSubmittedTaskCount() 是如何获取提交任务数量的?
这里就需要看一下 EagerThreadPoolExecutor 实现了, 也比较简单, 只是 重写了线程池的两个方法: afterExecute()、execute()
3.2 EagerThreadPoolExecutor
public class EagerThreadPoolExecutor extends ThreadPoolExecutor {/**
* task count
*/
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
/**
* @return current tasks which are executed
*/
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}@Override
public void execute(Runnable command) {
if (command == null) {
throw new NullPointerException();
}
// do not increment in method beforeExecute!
submittedTaskCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// retry to offer the task into queue.
final TaskQueue queue = (TaskQueue) super.getQueue();
try {
if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.", rx);
}
} catch (InterruptedException x) {
submittedTaskCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} catch (Throwable t) {
// decrease any way
submittedTaskCount.decrementAndGet();
throw t;
}
}
}
EagerThreadPoolExecutor 继承了 ThreadPoolExecutor, 在 execute() 上做了个性化设计
并在线程池内新增了一个任务数量的字段, 是一个原子类, 添加任务时自增, 任务异常及结束时递减
【大厂面试官(JDK 线程池中如何不超最大线程数快速消费任务())】这样就能保证 TaskQueue#offer(Runnable runnable) 做出逻辑处理
推荐阅读
- Office365混合部署实战系列教程二(制定混合部署的迁移方案)
- 递归问题闲究——汉诺塔问题
- 用vue+eslint+vscode实现代码规范化
- 看动画学算法之:栈stack
- 网络基础之重TCP/UDP三次握手和四次挥手
- 首发Citrix/view虚拟桌面禁用普通用户更改或删除网卡权限
- 深入剖析 Spring WebFlux
- 信息流推荐系统智能交付解决方案探索
- 计算机网络基础