场景回顾
今天回到寝室小叶唱着:快把我删了吧~,吓得我赶快了解情况,我还以为和女朋友闹变扭呢,结果说今天面试又被面试官虐了,要把面试官删了。
面试官:我看简历上写了精通并发编程,那么线程池在平时的工作中肯定需要用到吧,你一般是用在什么场景下呢?
小叶:嗯嗯,线程池平时用过的,我一般用在爬虫的场景,可以将多个网络请求通过线程池的方式去并行处理,这样子可以提高系统的吞吐量。
面试官:嗯嗯,线程池用在爬虫的场景是很常见的一种,那么你是怎么创建线程池的呢?
小叶:我一般用的是JDK自带一个工厂方法里面提供的直接创建线程池的方法,具体名字我想不大起来了。
面试官:你说的是Executors类吧,一般我们会用到里面单线程的线程池、固定线程的线程池等,那么它底层是如何去通过参数去创建不同的线程池呢?
小叶:emm...您说的是ThreadPoolExecutor吧,它好像由7个参数组成,好像有核心线程数、最大线程数、空闲时间长度、空闲时间单位、任务队列、线程工厂、拒绝策略这些参数。
面试官:嗯嗯,看来你对线程池还是颇有一些了解,假如我现在设置核心线程数10个,最大线程数20个,任务队列为无界队列,现在我一次性来了30个任务,那么当前线程池中线程数为多少,任务是否会被拒绝呢?
小叶:这个简单,最大线程数为20且一次性来了30个任务,当然线程数就是20,因为任务队列为无界的,所以任务不会被拒绝。
面试官:嗯嗯,那假如我把任务队列改为大小为20的队列,那么现在最多可以接收多少请求?
小叶:emm...应该是核心线程数+最大线程数+任务队列大小=50吧。
面试官:今天的面试就先到这里了。
我们通过小叶的面试惨状,不难发现在面试中线程池是一块经常会被面试官问到的问题,线程池几个简单的参数可能并不是我们预想的结果。以及我们在平时编码的过程中可能为了图方便直接调用Executors类提供的简便方法,但是其中可能有一些问题需要我们去考虑,在Executors类中提供的线程池方法中所有任务队列都是无界的,这可能会导致内存溢出。还有我如果我们想要采用其他的任务拒绝策略,我们要怎么实现呢?带着这些问题,下面小张带你由浅入深的解析源码。
整体结构
文章图片
从上图可以看出ThreadPoolExecutor的整体继承关系,这里可以看出设计线程池作者,根据职责分离设计了两个接口,并通过模板方法添加钩子函数。
Executor接口
public interface Executor {
// 将任务执行 具体怎么执行需要看子类如何实现
void execute(Runnable command);
}
ExecutorService接口
public interface ExecutorService extends Executor {
// 停止线程池,直到池中所有任务执行完毕
void shutdown();
// 立即停止线程池,并返回未执行的任务
List shutdownNow();
// 是否停止状态
boolean isShutdown();
// 是否终结
boolean isTerminated();
// 等待线程池停止
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
// 提交带返回结果的任务,返回一个结果的代理对象
Future submit(Callable task);
// 提交无返回的任务,任务执行成功返回result
Future submit(Runnable task, T result);
// 提交无返回的任务
Future> submit(Runnable task);
// 执行完毕tasks集合中所有的任务,并返回结果
List> invokeAll(Collection extends Callable> tasks)
throws InterruptedException;
// 在上一个方法的基础上添加超时时间
List> invokeAll(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
// tasks中任一任务执行完毕就返回
T invokeAny(Collection extends Callable> tasks)
throws InterruptedException, ExecutionException;
// 在上一步添加了超时时间
T invokeAny(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService描述了一个线程池应该具备关闭线程池、提交任务、任务执行策略等功能。
接下来我们通过AbstractExecutorService抽象类来看线程池是怎么通过模版方法来减少代码的。
文章图片
【面试官:给我讲讲线程池(上)】通过类的结构图可以看出抽象类实现了submit、invokeAll、invokeAny的方法。
submit
public Future> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// newTaskFor该方法这里先不赘述,这里可以理解为将task进行了包装
RunnableFuture ftask = newTaskFor(task, null);
// 调用Executor类中的方法
execute(ftask);
return ftask;
}
通过源码可以看出该方法将任务包装之后直接进行调用execute方法。
invokeAll
public List> invokeAll(Collection extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
// 计算出超时时间
long nanos = unit.toNanos(timeout);
// 用于存放task的集合
ArrayList> futures = new ArrayList>(tasks.size());
boolean done = false;
try {
for (Callable t : tasks)
futures.add(newTaskFor(t));
// 计算任务执行截止时间
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
// 循环的执行集合中的task
for (int i = 0;
i < size;
i++) {
execute((Runnable)futures.get(i));
// 判断是否到达截止时间
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
// 代码运行到这里,说明所有任务在规定时间内都已经被执行,但是处理结果完成还是未知
for (int i = 0;
i < size;
i++) {
Future f = futures.get(i);
// 获取任务的返回值代理,判断是否完成
if (!f.isDone()) {
// 如已到达截止时间返回future集合
if (nanos <= 0L)
return futures;
try {
// 阻塞等待future返回
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
// 如果超时返回future集合
return futures;
}
// 更新剩余时间
nanos = deadline - System.nanoTime();
}
}
// 代码运行到这里说明所有任务都已被执行完成,并设置完成标志
done = true;
return futures;
} finally {
/** 如果done为false,将所有正在执行的任务取消
只有提交任务过程中超时、获取某次执行结果超时done才会为false
**/
if (!done)
for (int i = 0, size = futures.size();
i < size;
i++)
futures.get(i).cancel(true);
}
}
invokeAny
private T doInvokeAny(Collection extends Callable> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
// 空检查
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList> futures = new ArrayList>(ntasks);
/**
该类非本文重点具体源码解析到JUC包中阅读,这里将会概述该类提供了什么功能。
这里将线程池实例传递给该类,目的是它可以调用线程池的execute方法,不需要自己再去维护池。
该类中实现了submit提交任务并返回furure,并且在内部维护了一个阻塞队列,当任务执行成功会将future对象放入该阻塞队列
take方法是队列阻塞的获取元素
poll方法是队列非阻塞的获取元素
**/
ExecutorCompletionService ecs =
new ExecutorCompletionService(this);
try {
// 记录异常
ExecutionException ee = null;
// 如果开启超时,计算超时时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator extends Callable> it = tasks.iterator();
// 将第一个任务
futures.add(ecs.submit(it.next()));
// 任务数-1
--ntasks;
int active = 1;
for (;
;
) {
// 非阻塞的获取元素
Future f = ecs.poll();
if (f == null) {
// 如果此时还有任务可执行,继续将任务提交,并将活跃数+1
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
// 活跃数为0退出循环,活跃数为0的情况为从future.get()获取抛出异常才会执行到这里
else if (active == 0)
break;
else if (timed) {
// 如果超时开启,则在响应队列中等待获取元素,超时则报错
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
// 未开启超时,则进入无限等待的阻塞获取响应
f = ecs.take();
}
// 如果响应队列非空,说明此时有任务已经完成
if (f != null) {
// 活跃数-1
--active;
/**
通过future获取响应结果,如果成功直接返回
如果获取结果失败记录异常,继续遍历从响应队列中拿下一个执行成功的结果进行此过程
**/
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}// 执行到这里说明所有的任务都执行完毕,并且所有的future.get都失败了,将会把异常抛出去
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// 最后将所有任务执行一遍撤销
for (int i = 0, size = futures.size();
i < size;
i++)
futures.get(i).cancel(true);
}
}
最后结语 能够读到这里的读者,相信你已经掌握了线程池中基类中的非常重要的方法,你至少明白了如何通过设计基类增加代码复用,以及invokeAny逻辑执行非常值得耐人寻味,阅读源码是枯燥的过程,但在这个过程中你的思维会跟着大佬一步一步进化,这也是为什么互联网大厂都要求阅读过源码,在下一篇我将会正式进入ThreadPoolExecutor类中,解开它神秘的面纱,上面面试官的提问也会迎刃而解。
本文由博客一文多发平台 OpenWrite 发布!
推荐阅读
- Java|Java基础——数组
- 人工智能|干货!人体姿态估计与运动预测
- java简介|Java是什么(Java能用来干什么?)
- Java|规范的打印日志
- Linux|109 个实用 shell 脚本
- 程序员|【高级Java架构师系统学习】毕业一年萌新的Java大厂面经,最新整理
- Spring注解驱动第十讲--@Autowired使用
- SqlServer|sql server的UPDLOCK、HOLDLOCK试验
- jvm|【JVM】JVM08(java内存模型解析[JMM])
- 技术|为参加2021年蓝桥杯Java软件开发大学B组细心整理常见基础知识、搜索和常用算法解析例题(持续更新...)