面试官:给我讲讲线程池(上)

场景回顾 今天回到寝室小叶唱着:快把我删了吧~,吓得我赶快了解情况,我还以为和女朋友闹变扭呢,结果说今天面试又被面试官虐了,要把面试官删了。
面试官:我看简历上写了精通并发编程,那么线程池在平时的工作中肯定需要用到吧,你一般是用在什么场景下呢?
小叶:嗯嗯,线程池平时用过的,我一般用在爬虫的场景,可以将多个网络请求通过线程池的方式去并行处理,这样子可以提高系统的吞吐量。
面试官:嗯嗯,线程池用在爬虫的场景是很常见的一种,那么你是怎么创建线程池的呢?
小叶:我一般用的是JDK自带一个工厂方法里面提供的直接创建线程池的方法,具体名字我想不大起来了。
面试官:你说的是Executors类吧,一般我们会用到里面单线程的线程池、固定线程的线程池等,那么它底层是如何去通过参数去创建不同的线程池呢?
小叶:emm...您说的是ThreadPoolExecutor吧,它好像由7个参数组成,好像有核心线程数、最大线程数、空闲时间长度、空闲时间单位、任务队列、线程工厂、拒绝策略这些参数。
面试官:嗯嗯,看来你对线程池还是颇有一些了解,假如我现在设置核心线程数10个,最大线程数20个,任务队列为无界队列,现在我一次性来了30个任务,那么当前线程池中线程数为多少,任务是否会被拒绝呢?
小叶:这个简单,最大线程数为20且一次性来了30个任务,当然线程数就是20,因为任务队列为无界的,所以任务不会被拒绝。
面试官:嗯嗯,那假如我把任务队列改为大小为20的队列,那么现在最多可以接收多少请求?
小叶:emm...应该是核心线程数+最大线程数+任务队列大小=50吧。
面试官:今天的面试就先到这里了。
我们通过小叶的面试惨状,不难发现在面试中线程池是一块经常会被面试官问到的问题,线程池几个简单的参数可能并不是我们预想的结果。以及我们在平时编码的过程中可能为了图方便直接调用Executors类提供的简便方法,但是其中可能有一些问题需要我们去考虑,在Executors类中提供的线程池方法中所有任务队列都是无界的,这可能会导致内存溢出。还有我如果我们想要采用其他的任务拒绝策略,我们要怎么实现呢?带着这些问题,下面小张带你由浅入深的解析源码。
整体结构 面试官:给我讲讲线程池(上)
文章图片

从上图可以看出ThreadPoolExecutor的整体继承关系,这里可以看出设计线程池作者,根据职责分离设计了两个接口,并通过模板方法添加钩子函数。
Executor接口

public interface Executor { // 将任务执行 具体怎么执行需要看子类如何实现 void execute(Runnable command); }

ExecutorService接口
public interface ExecutorService extends Executor { // 停止线程池,直到池中所有任务执行完毕 void shutdown(); // 立即停止线程池,并返回未执行的任务 List shutdownNow(); // 是否停止状态 boolean isShutdown(); // 是否终结 boolean isTerminated(); // 等待线程池停止 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交带返回结果的任务,返回一个结果的代理对象 Future submit(Callable task); // 提交无返回的任务,任务执行成功返回result Future submit(Runnable task, T result); // 提交无返回的任务 Future submit(Runnable task); // 执行完毕tasks集合中所有的任务,并返回结果 List> invokeAll(Collection> tasks) throws InterruptedException; // 在上一个方法的基础上添加超时时间 List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException; // tasks中任一任务执行完毕就返回 T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException; // 在上一步添加了超时时间 T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

ExecutorService描述了一个线程池应该具备关闭线程池、提交任务、任务执行策略等功能。
接下来我们通过AbstractExecutorService抽象类来看线程池是怎么通过模版方法来减少代码的。
面试官:给我讲讲线程池(上)
文章图片

【面试官:给我讲讲线程池(上)】通过类的结构图可以看出抽象类实现了submit、invokeAll、invokeAny的方法。
submit
public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); // newTaskFor该方法这里先不赘述,这里可以理解为将task进行了包装 RunnableFuture ftask = newTaskFor(task, null); // 调用Executor类中的方法 execute(ftask); return ftask; }

通过源码可以看出该方法将任务包装之后直接进行调用execute方法。
invokeAll
public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null) throw new NullPointerException(); // 计算出超时时间 long nanos = unit.toNanos(timeout); // 用于存放task的集合 ArrayList> futures = new ArrayList>(tasks.size()); boolean done = false; try { for (Callable t : tasks) futures.add(newTaskFor(t)); // 计算任务执行截止时间 final long deadline = System.nanoTime() + nanos; final int size = futures.size(); // 循环的执行集合中的task for (int i = 0; i < size; i++) { execute((Runnable)futures.get(i)); // 判断是否到达截止时间 nanos = deadline - System.nanoTime(); if (nanos <= 0L) return futures; } // 代码运行到这里,说明所有任务在规定时间内都已经被执行,但是处理结果完成还是未知 for (int i = 0; i < size; i++) { Future f = futures.get(i); // 获取任务的返回值代理,判断是否完成 if (!f.isDone()) { // 如已到达截止时间返回future集合 if (nanos <= 0L) return futures; try { // 阻塞等待future返回 f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { // 如果超时返回future集合 return futures; } // 更新剩余时间 nanos = deadline - System.nanoTime(); } } // 代码运行到这里说明所有任务都已被执行完成,并设置完成标志 done = true; return futures; } finally { /** 如果done为false,将所有正在执行的任务取消 只有提交任务过程中超时、获取某次执行结果超时done才会为false **/ if (!done) for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }

invokeAny
private T doInvokeAny(Collection> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { // 空检查 if (tasks == null) throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); ArrayList> futures = new ArrayList>(ntasks); /** 该类非本文重点具体源码解析到JUC包中阅读,这里将会概述该类提供了什么功能。 这里将线程池实例传递给该类,目的是它可以调用线程池的execute方法,不需要自己再去维护池。 该类中实现了submit提交任务并返回furure,并且在内部维护了一个阻塞队列,当任务执行成功会将future对象放入该阻塞队列 take方法是队列阻塞的获取元素 poll方法是队列非阻塞的获取元素 **/ ExecutorCompletionService ecs = new ExecutorCompletionService(this); try { // 记录异常 ExecutionException ee = null; // 如果开启超时,计算超时时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator> it = tasks.iterator(); // 将第一个任务 futures.add(ecs.submit(it.next())); // 任务数-1 --ntasks; int active = 1; for (; ; ) { // 非阻塞的获取元素 Future f = ecs.poll(); if (f == null) { // 如果此时还有任务可执行,继续将任务提交,并将活跃数+1 if (ntasks > 0) { --ntasks; futures.add(ecs.submit(it.next())); ++active; } // 活跃数为0退出循环,活跃数为0的情况为从future.get()获取抛出异常才会执行到这里 else if (active == 0) break; else if (timed) { // 如果超时开启,则在响应队列中等待获取元素,超时则报错 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); if (f == null) throw new TimeoutException(); nanos = deadline - System.nanoTime(); } else // 未开启超时,则进入无限等待的阻塞获取响应 f = ecs.take(); } // 如果响应队列非空,说明此时有任务已经完成 if (f != null) { // 活跃数-1 --active; /** 通过future获取响应结果,如果成功直接返回 如果获取结果失败记录异常,继续遍历从响应队列中拿下一个执行成功的结果进行此过程 **/ try { return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } }// 执行到这里说明所有的任务都执行完毕,并且所有的future.get都失败了,将会把异常抛出去 if (ee == null) ee = new ExecutionException(); throw ee; } finally { // 最后将所有任务执行一遍撤销 for (int i = 0, size = futures.size(); i < size; i++) futures.get(i).cancel(true); } }

最后结语 能够读到这里的读者,相信你已经掌握了线程池中基类中的非常重要的方法,你至少明白了如何通过设计基类增加代码复用,以及invokeAny逻辑执行非常值得耐人寻味,阅读源码是枯燥的过程,但在这个过程中你的思维会跟着大佬一步一步进化,这也是为什么互联网大厂都要求阅读过源码,在下一篇我将会正式进入ThreadPoolExecutor类中,解开它神秘的面纱,上面面试官的提问也会迎刃而解。
本文由博客一文多发平台 OpenWrite 发布!

    推荐阅读