java.util.concurrent学习(四) FutureTask

FutureTask是一个实现了RunnableFuture的任务类,而RunnableFuture又同时继承了Runable,Future接口,那么这就说明,FutureTask同时具备了Runable,Future的属性。
java.util.concurrent学习(四) FutureTask
文章图片

FutureTask生命周期 FutureTask内部维护了Callable callable 需要执行的任务,Object outcome 返回的结果或者抛出的异常,volatile Thread runner 一个内存可见的线程用来执行callable ,volatile WaitNode waiters 一个内存可见的正在等待执行的线程的Treiber stack (表示不会翻译),volatile int state 内存可见的任务执行状态。FutureTask有七种状态,通过UNSAFE的cas操作来改变状态,这七种状态在类内部用了七个常量来定义,分别是:

private static final int NEW= 0; 新建 private static final int COMPLETING= 1; 正在完成 private static final int NORMAL= 2; 已完成 private static final int EXCEPTIONAL= 3; 异常 private static final int CANCELLED= 4; 已取消 private static final int INTERRUPTING = 5; 正在打断 private static final int INTERRUPTED= 6; 已打断

NEW
从构造方法中可以看到,FutureTask通过Executors.callable(runnable, result)的方法实现了可以构造Callable,Runable两种任务的构造方法。构造方法调用以后,将FutureTask状态置为NEW,表示一个任务已建立,等待执行。
public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }

COMPLETING
通过执行run方法来进入该状态。
public void run() { //不处于new状态 或者 通过UnSafe的CAS操作无法将本对象的runnerOffset从null指向当前线程 则不执行 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; if (c != null && state == NEW) { V result; boolean ran; try { //执行任务 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; //任务异常 setException(ex); } if (ran) //结束任务 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; //如果处于正要打断或已打断的状态 则让出cpu if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }

NORMAL
protected void set(V v) { //将任务从NEW更新为COMPLETING状态 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //改为NORMAL UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //完成 finishCompletion(); } }

EXCEPTIONAL
protected void setException(Throwable t) { //任务从NEW更新为COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; //改为EXCEPTIONAL UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state //完成 finishCompletion(); } }

CANCELLED/ INTERRUPTING / INTERRUPTED
public boolean cancel(boolean mayInterruptIfRunning) { //如果状态处于NEW 则根据mayInterruptIfRunning来决定原子更新为INTERRUPTING或者CANCELLED //如果状态不为NEW,或者无法从NEW转为INTERRUPTING或CANCELLED,则返回false。 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception //如果mayInterruptIfRunning为true,则正在执行的线程也会打断 if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state //最后更新为INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }

从这几个状态更新的方法我们可以看出FutureTask可能有的生命周期:
【java.util.concurrent学习(四) FutureTask】java.util.concurrent学习(四) FutureTask
文章图片

几个额外的方法 runAndReset();该方法会执行任务但不会更新任务状态并返回结果,除非发生异常,如果任务执行并且处于NEW的状态则返回ture。
awaitDone(boolean timed, long nanos);限时等待任务执行完成。
get();通过report(s)获取返回值,从上面的run方法可以看出,该方法获取到的可能是异常。
get(long timeout, TimeUnit unit); 限时获取返回值。通过awaitDone(boolean timed, long nanos)来限时。

    推荐阅读