Java8中AbstractExecutorService与FutureTask源码详解
目录
- 前言
- 一、AbstractExecutorService
- 1、定义
- 2、submit
- 3、invokeAll
- 4、invokeAny
- 二、FutureTask
- 1、定义
- 2、构造方法
- 3、get
- 4、run/ runAndReset
- 5、 cancel
- 三、ExecutorCompletionService
- 1、定义
- 2、submit
- 3、take/ poll
- 总结
前言 本篇博客重点讲解ThreadPoolExecutor的三个基础设施类AbstractExecutorService、FutureTask和ExecutorCompletionService的实现细节,AbstractExecutorService实现了ExecutorService的大部分接口,子类只需实现excute方法和shutdown相关方法即可;FutureTask是RunnableFuture接口的主要实现,该接口是Runnable和Future的包装类接口,会执行Runnable对应的run方法,调用方可以通过Future接口获取任务的执行状态和结果;ExecutorCompletionService是帮助获取多个RunnableFuture任务的执行结果的工具类,基于FutureTask执行完成时的回调方法done实现的。
一、AbstractExecutorService
1、定义
ThreadPoolExecutor的类继承关系如下:
文章图片
其中ExecutorService的子类如下:
文章图片
右上角带S的表示内部类,我们重点关注ThreadPoolExecutor,ScheduledThreadPoolExecutor和ForkJoinPool三个类的实现,后面两个类会在后面的博客中逐一探讨。
Executor包含的方法如下:
文章图片
ExecutorService包含的方法如下:
文章图片
上述接口方法中涉及的Callable接口的定义如下:
文章图片
该接口也是表示一个执行任务,跟常见的Runnable接口的区别在于call方法有返回值而run方法没有返回值。
Future表示某个任务的执行结果,其定义的方法如下:
文章图片
其子类比较多,如下:
文章图片
后面会将涉及的子类逐一探讨的。 AbstractExecutorService基于Executor接口的excute方法实现了大部分的ExecutorService的接口,子类只需要重点实现excute方法和shutdown相关方法即可,下面来分析其具体的实现。
2、submit
//Runnable接口方法没有返回值,但是可以通过Future判断任务是否执行完成public Future> submit(Runnable task) {if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task, null); execute(ftask); return ftask; } //因为Runnable的run方法没有返回值,所以如果run方法正常执行完成,其结果就是resultpublic Future submit(Runnable task, T result) {if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, result); execute(ftask); return ftask; } public Future submit(Callable task) {if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask; } //都是返回FutureTaskprotected RunnableFuture newTaskFor(Runnable runnable, T value) {return new FutureTask (runnable, value); } protected RunnableFuture newTaskFor(Callable callable) {return new FutureTask (callable); }
3、invokeAll
//执行完成tasks中所有的任务,如果有一个抛出异常,则取消掉剩余的任务publicList > invokeAll(Collection extends Callable > tasks)throws InterruptedException {if (tasks == null)throw new NullPointerException(); ArrayList > futures = new ArrayList >(tasks.size()); boolean done = false; try {//遍历tasks中的任务将其转换成RunnableFuture,然后提交到线程池执行for (Callable t : tasks) {RunnableFuture f = newTaskFor(t); futures.add(f); execute(f); }//遍历Future列表for (int i = 0, size = futures.size(); i < size; i++) {Future f = futures.get(i); if (!f.isDone()) { //如果未执行完成try {//等待任务执行完成f.get(); } catch (CancellationException ignore) {} catch (ExecutionException ignore) {}}}//所有任务都执行完了done = true; return futures; } finally {if (!done)//出现异常,将所有的任务都取消掉for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true); }} //逻辑同上,不过加了等待时间限制,所有的任务的累计时间不能超过指定值,如果超时直接返回Future列表public List > invokeAll(Collection extends Callable > tasks,long timeout, TimeUnit unit)throws InterruptedException {if (tasks == null)throw new NullPointerException(); //转换成纳秒long nanos = unit.toNanos(timeout); ArrayList > futures = new ArrayList >(tasks.size()); boolean done = false; try {//转换成Futurefor (Callable t : tasks)futures.add(newTaskFor(t)); //计算终止时间final long deadline = System.nanoTime() + nanos; final int size = futures.size(); for (int i = 0; i < size; i++) {execute((Runnable)futures.get(i)); nanos = deadline - System.nanoTime(); //计算剩余时间if (nanos <= 0L) //如果超时了则直接返回return futures; } for (int i = 0; i < size; i++) {Future f = futures.get(i); if (!f.isDone()) { //任务未执行if (nanos <= 0L)return futures; //等待超时try {//等待任务执行完成f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) {} catch (ExecutionException ignore) {} catch (TimeoutException toe) {return futures; }nanos = deadline - System.nanoTime(); }}done = true; return futures; } finally {if (!done) //出现异常,取消掉剩余未执行的任务for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true); }}
4、invokeAny
//多个任务只要有一个执行成功就返回,并把剩余的已提交未执行的任务给取消掉publicT invokeAny(Collection extends Callable > tasks)throws InterruptedException, ExecutionException {try {return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) {assert false; return null; }} //多个任务只要有一个执行成功就返回,并把剩余的已提交未执行的任务给取消掉//如果指定时间内没有执行成功的,则抛出TimeoutException 异常public T invokeAny(Collection extends Callable > tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {return doInvokeAny(tasks, true, unit.toNanos(timeout)); } private T doInvokeAny(Collection extends Callable > tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {//参数校验if (tasks == null)throw new NullPointerException(); int ntasks = tasks.size(); if (ntasks == 0)throw new IllegalArgumentException(); ArrayList > futures = new ArrayList >(ntasks); ExecutorCompletionService ecs =new ExecutorCompletionService (this); try {ExecutionException ee = null; final long deadline = timed ? System.nanoTime() + nanos : 0L; Iterator extends Callable > it = tasks.iterator(); //提交一个任务futures.add(ecs.submit(it.next())); --ntasks; int active = 1; for (; ; ) {//获取最新的已完成任务Future f = ecs.poll(); if (f == null) {//没有执行完的if (ntasks > 0) {--ntasks; //继续添加下一个任务futures.add(ecs.submit(it.next())); ++active; }else if (active == 0) //所有任务都执行失败了,没有执行成功的break; else if (timed) { //等待超时f = ecs.poll(nanos, TimeUnit.NANOSECONDS); if (f == null)throw new TimeoutException(); //计算剩余等待时间nanos = deadline - System.nanoTime(); }else//所有任务都提交了,阻塞等待某个任务执行完成f = ecs.take(); }if (f != null) {--active; try {//某个任务已执行完成,如果抛出异常则执行下一个任务return f.get(); } catch (ExecutionException eex) {ee = eex; } catch (RuntimeException rex) {ee = new ExecutionException(rex); }}} //for循环终止//所有任务都执行失败了if (ee == null)ee = new ExecutionException(); throw ee; } finally {//返回前,将未执行完成的任务都取消掉for (int i = 0, size = futures.size(); i < size; i++)futures.get(i).cancel(true); }}
二、FutureTask 1、定义
FutureTask的类继承关系如下:
文章图片
RunnableFuture接口没有新增方法,将Runnable的run方法由public改成包级访问了,如下:
文章图片
该类包含的实例属性如下:
/** 执行的任务*/private Callablecallable; /** 任务执行的结果或者执行过程中抛出的异常 */private Object outcome; // non-volatile, protected by state reads/writes/** 执行任务的线程 */private volatile Thread runner; /** 等待线程的链表*/private volatile WaitNode waiters; //状态private volatile int state;
其中WaitNode是一个简单的内部类,其定义如下:
文章图片
该类包含的静态属性都是字段偏移量,通过static代码块初始化,如下:
文章图片
FutureTask定义了多个表示状态的常量,如下:
//初始状态private static final int NEW= 0; //是一个很短暂的中间状态,表示任务已执行完成,保存完执行结果后就流转成NORMAL或者EXCEPTIONALprivate static final int COMPLETING= 1; //正常执行完成private static final int NORMAL= 2; //异常终止private static final int EXCEPTIONAL= 3; //任务被取消了private static final int CANCELLED= 4; //是一个很短暂的中间状态,调用interrupt方法后,会将状态流转成INTERRUPTEDprivate static final int INTERRUPTING = 5; //任务执行已中断private static final int INTERRUPTED= 6;
可能的状态流转如下图:
文章图片
2、构造方法
public FutureTask(Callablecallable) {if (callable == null)throw new NullPointerException(); this.callable = callable; this.state = NEW; //初始状态是NEW} public FutureTask(Runnable runnable, V result) {//将Runnable适配成Callablethis.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable} //Executors方法public static Callable callable(Runnable task, T result) {if (task == null)throw new NullPointerException(); return new RunnableAdapter (task, result); }
其中RunnableAdapter是Executors的一个静态内部类,其实现如下:
文章图片
3、get
get方法用于阻塞当前线程直到任务执行完成,如果阻塞的过程中被中断则抛出异常InterruptedException,可以限制阻塞的时间,如果等待超时还是未完成则抛出异常TimeoutException。
//阻塞当前线程等待任务执行完成public V get() throws InterruptedException, ExecutionException {int s = state; if (s <= COMPLETING) //如果未完成s = awaitDone(false, 0L); return report(s); } //同上,可以限制等待的时间public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException(); int s = state; if (s <= COMPLETING &&//阻塞当前线程,如果返回值还是未完成说明是等待超时了,则抛出异常(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } //timed为true表示等待指定的时间,否则是无期限等待//该方法返回退出此方法时的状态private int awaitDone(boolean timed, long nanos)throws InterruptedException {//计算等待的终止时间final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (; ; ) {if (Thread.interrupted()) { //如果当前线程被中断了,则从等待链表中移除,并抛出异常removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { //如果任务已执行完if (q != null)q.thread = null; return s; }else if (s == COMPLETING) //正在状态流转的过程中,让出当前CPU时间片Thread.yield(); //未开始执行else if (q == null)q = new WaitNode(); else if (!queued)//修改waiters属性,插入到链表头queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q); //已插入到链表中else if (timed) {nanos = deadline - System.nanoTime(); if (nanos <= 0L) { //等待超时,从链表中移除removeWaiter(q); return state; }//让当前线程休眠LockSupport.parkNanos(this, nanos); }else//让当前线程休眠LockSupport.park(this); }} private void removeWaiter(WaitNode node) {if (node != null) {node.thread = null; //将thread置为nullretry:for (; ; ) {// restart on removeWaiter racefor (WaitNode pred = null, q = waiters, s; q != null; q = s) {s = q.next; if (q.thread != null)pred = q; //q.thread为null,需要被移除else if (pred != null) {pred.next = s; //将q从链表移除if (pred.thread == null) //如果为null,则从头开始遍历continue retry; }//q.thread为null,pred为null,之前没有有效节点,修改waiters,修改失败重试else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,q, s))continue retry; }break; }}} //awaitDone正常返回后调用此方法,此时状态应该是COMPLETING之后了private V report(int s) throws ExecutionException {Object x = outcome; if (s == NORMAL) //如果是正常结束return (V)x; if (s >= CANCELLED) //如果被取消了throw new CancellationException(); throw new ExecutionException((Throwable)x); //如果出现异常了}
4、run / runAndReset
run方法是有线程池调用的,会执行Callable任务,保存执行的结果,如果出现异常则保存异常对象,并完成状态流转,最后将等待任务完成的阻塞中的线程唤醒。runAndReset和run类似,区别在于runAndReset正常执行完成后不会保存执行的结果,不会改变状态,状态还是NEW,如果是正常执行则返回true,该方法是子类使用的,其调用链如下:
文章图片
这两方法的实现如下:
//由线程池中的某个线程调用此方法public void run() {//如果不等于NEW,说明其他某个线程正在执行任务//如果等于NEW,则cas修改runner属性,修改失败说明其他某个线程也准备执行这个任务if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return; //cas成功表示这个任务由当前线程抢占成功try {Callablec = callable; if (c != null && state == NEW) {V result; boolean ran; try {//执行任务result = c.call(); ran = true; } catch (Throwable ex) {//出现异常result = null; ran = false; setException(ex); //保存异常对象}if (ran)//执行成功保存结果set(result); }} finally {//如果任务被cancel了,则上述setException和set方法因为状态不是NEW了会直接返回runner = null; int s = state; if (s >= INTERRUPTING) //如果被中断,自旋等待中断完成handlePossibleCancellationInterrupt(s); }} //跟run方法相比区别就是正常执行完成不会保存结果,不会流转状态protected boolean runAndReset() {//如果state不是NEW或者cas修改runner失败if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false; boolean ran = false; int s = state; try {Callable c = callable; if (c != null && s == NEW) {try {//执行任务,但是不保存结果,状态就不会从NEW流转成NORMALc.call(); // don't set resultran = true; } catch (Throwable ex) {setException(ex); //保存异常实例}}} finally {runner = null; s = state; if (s >= INTERRUPTING) //任务被中断了,自旋等待中断完成handlePossibleCancellationInterrupt(s); }//返回任务是否正常完成return ran && s == NEW; } //保存异常对象并修改状态protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//只有原来的状态是NEW才进入下面的逻辑outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state//任务执行完成,唤醒阻塞的线程finishCompletion(); }} //保存执行结果并修改状态protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//只有原来的状态是NEW才进入下面的逻辑outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state//任务执行完成,唤醒阻塞的线程finishCompletion(); }} private void handlePossibleCancellationInterrupt(int s) {//正在中断的过程中if (s == INTERRUPTING)while (state == INTERRUPTING)Thread.yield(); //自旋等待中断完成} private void finishCompletion() {// assert state > COMPLETING; for (WaitNode q; (q = waiters) != null; ) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {//cas将waiters置为nullfor (; ; ) {Thread t = q.thread; if (t != null) {//唤醒阻塞的新线程q.thread = null; LockSupport.unpark(t); }//遍历下一个节点WaitNode next = q.next; if (next == null) //遍历结束,终止循环break; q.next = null; // unlink to help gcq = next; }break; //终止外层循环}}//执行完成的回调方法,默认是空实现,子类可改写此方法done(); callable = null; // to reduce footprint}
【Java8中AbstractExecutorService与FutureTask源码详解】
5、 cancel
cancel方法的参数为true,则会将当前状态由NEW改成INTERRUPTING,如果此任务已经开始执行了,则将正在执行任务的线程标记为已中断,如果该线程响应中断则可能抛出异常,如果不响应中断则继续执行,最后再将状态改成INTERRUPTED;如果方法的参数为false,则将当前状态由NEW改成CANCELLED,如果此任务已经开始执行了则会继续执行。上述两种情形下,状态流转完成后都会唤醒还在阻塞中的等待线程,如果任务已经开始执行并且继续执行,因为状态已经不是NEW了,直接结果不会保存下来。
//如果mayInterruptIfRunning为true,则会将正在执行任务的线程标记为已中断,线程有可能继续执行,也有可能响应中断抛出异常//如果为false,则标记为CANCELLED,如果任务已经开始执行了则会继续执行//如果未执行,则标记为CANCELLED或者INTERRUPTING都会让这任务不会被执行了public boolean cancel(boolean mayInterruptIfRunning) {//如果state不是NEW 或者cas修改失败,则返回falseif (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false; try {// in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t = runner; if (t != null)t.interrupt(); //将正在执行任务的线程标记为已中断} finally { //修改状态为已中断 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); }}} finally {//唤醒等待的线程finishCompletion(); }return true; }
三、ExecutorCompletionService 1、定义
ExecutorCompletionService是一个帮助获取多个Future执行结果的工具类,其类继承关系如下:
文章图片
CompletionService包含的方法如下:
文章图片
后面会讲解各方法的用途,该类包含的属性如下:
//执行任务的线程池实现private final Executor executor; //调用其newTaskFor方法private final AbstractExecutorService aes; //已执行完成的Future阻塞队列private final BlockingQueue> completionQueue;
其构造方法实现如下:
public ExecutorCompletionService(Executor executor) {if (executor == null)throw new NullPointerException(); this.executor = executor; //如果executor继承自AbstractExecutorService,则aes为executor,否则为nullthis.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null; //没有指定队列,默认使用基于链表的无固定容量的LinkedBlockingQueuethis.completionQueue = new LinkedBlockingQueue>(); } public ExecutorCompletionService(Executor executor,BlockingQueue > completionQueue) {if (executor == null || completionQueue == null)throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
2、submit
submit方法将Callable或者Runnable任务包装成一个RunnableFuture,然后提交到线程池中,返回RunnableFuture实例。
public Futuresubmit(Callable task) {if (task == null) throw new NullPointerException(); //将其包装成RunnableFuture实现类RunnableFuture f = newTaskFor(task); //提交任务到线程池executor.execute(new QueueingFuture(f)); return f; } public Future submit(Runnable task, V result) {if (task == null) throw new NullPointerException(); RunnableFuture f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; } private RunnableFuture newTaskFor(Callable task) {if (aes == null)return new FutureTask (task); //默认使用FutureTask作为RunnableFuture的实现elsereturn aes.newTaskFor(task); //如果aes不为null,则使用该类的特定实现} private RunnableFuture newTaskFor(Runnable task, V result) {if (aes == null)return new FutureTask (task, result); elsereturn aes.newTaskFor(task, result); }
其中QueueingFuture是一个内部类,继承自FutureTask,其实现如下:
文章图片
重点改写了done方法的实现,如果任务已经执行完成,则会将该Future实例添加到阻塞队列中。
3、take / poll
这三方法就是从已完成的Future阻塞队列中获取并移除Future实例,如果队列为空,take方法会无期限阻塞阻塞,不带时间参数的poll方法不会阻塞返回null,带时间参数的poll方法会阻塞指定的时间,如果超时则返回null,其实现都是直接调用阻塞队列的方法,如下:
文章图片
总结 到此这篇关于Java8中AbstractExecutorService与FutureTask源码详解的文章就介绍到这了,更多相关Java8 AbstractExecutorService与FutureTask内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
推荐阅读
- response文件流输出文件名中文不显示的解决
- 如何在mapper文件中使用in("str1","str2")
- 浏览器动态显示服务器日志,基于 websocket 实现远程实时日志 在浏览器中查看设备的运行日志...
- 凯迪拉克故障u1510案例_凯迪拉克ATSL行驶中仪表板偶尔提示“请速检修车辆”
- YYC蜻蜓系统系列|点发送短信无反应是很多用户在搭建优雅草蜻蜓q系统中常见问题-解决点击发送短信无反应
- testflight 公开版本中应用90天到期失效了该怎么处理-testflight 到期如何续期
- 深度学习|yolov5中使用denselayer替换focus,fpn结构改为bi-fpn
- 数据库|TiDB Online DDL 在 TiCDC 中的应用丨TiDB 工具分享
- 数据库开发|DM 中 relay log 性能优化实践丨TiDB 工具分享
- Java|几种常见的注册中心以及区别