Future和Promise

第四章 Future和Promise
Netty是一个异步网络处理框架,在实现中大量使用了Future机制,并在Java自带Future的基础上,增加了Promise机制。这两者的目的都是使异步编程更加方便使用。在阅读源码之前,我们需要对Future的机制有很清楚的认识。
4.1 异步编程模型
4.1.1 Future
使用Future机制时,我们调用耗时任务会立刻返回一个Future实例,使用该实例能够以阻塞的方式或者在未来某刻获得耗时任务的执行结果,还可以添加监听事件设置后续程序。
function Future asynchronousFunction(String arg){
Future future = new Future(new Callable(){

public Object call(){ return null; }

});
return future;
}
ReturnHandler handler = asynchronousFunction(); // 耗时函数,但会立即返回一个句柄
handler.getResult(); // 通过句柄可以等待结果
handler.addListener(); //通过句柄可以添加完成后执行的事件
handler.cancel(); // 通过句柄取消耗时任务
4.1.2 Promise
在Future机制中,业务逻辑所在任务执行的状态(成功或失败)是在Future中实现的,而在Promise中,可以在业务逻辑控制任务的执行结果,相比Future,更加灵活。
// 异步的耗时任务接收一个promise
function Promise asynchronousFunction(String arg){
Promisepromise = new PromiseImpl(); Object result = null; result = search()//业务逻辑, if(success){ promise.setSuccess(result); // 通知promise当前异步任务成功了,并传入结果 }else if(failed){ promise.setFailure(reason); //// 通知promise当前异步任务失败了 }else if(error){ promise.setFailure(error); //// 通知promise当前异步任务发生了异常 }

}
// 调用异步的耗时任务
Promise promise = asynchronousFunction(promise) ;//会立即返回promise
//添加成功处理/失败处理/异常处理等事件
promise.addListener(); // 例如,可以添加成功后执行的事件
doOtherThings() ; // 继续做其他事件,不需要理会asynchronousFunction何时结束
在Netty中,Promise继承了Future,包含了这两者的功能。
Java的Future机制 4.2.1 Java的Future机制
Future顾名思义,是一个未来完成的异步操作,可以获得未来返回的值。常用的场景如:调用一个耗时的方法search()(根据产品名称在全网查询价格,假设需要3s左右才能返回),该方法会立即返回Future对象,调使用Future.get()可以同步等待耗时方法的返回,也可以调用future的cancel()取消Future任务。如下面的程序,search方法逻辑会根据名字在全网查找价格,假设需要耗时3s,该方法会立即返回一个Future对象供用户线程使用;在主方法中可以使用get()等待获取到价格,也可以使用cancel()取消查询。
public Future search(String prodName) {
FutureTask future = new FutureTask(new Callable() { @Override public String call(){ try { System.out.println(String.format(">>search price of %s from internet!",prodName)); Thread.sleep(3000); return "$99.99"; }catch(InterruptedException e){ System.out.println("search function is Interrupted!"); } return null; } }); new Thread(future).start(); //交给线程去执行 return future; // 立刻返回future对象 }

JavaFuture jf = new JavaFuture();
Future future = jf.search("Netty权威指南"); // 返回future
System.out.println("Begin search,get future!");
// 测试1-【获取结果】等待3s后会返回
String prods = future.get(); //获取prods
System.out.println("get result:"+prods);
// 测试2-【取消任务】1s后取消任务
Thread.sleep(1000);
future.cancel(false); //true时会中断线程,false不会
System.out.println("Future is canceled? " + (future.isCancelled()?"yes":"no"));
Thread.sleep(4000); //等待4s检查一下future所在线程是否还在执行
4.2.2 Future的实现
假如我们需要实现一个Future,考虑一下需要实现哪些功能:
Future future = jf.search("Netty权威指南");
Future search(){
//启动线程或者在线程池中执行业务逻辑
return future; //立刻返回future
}
search方法需要立即返回一个Future对象,并且需要启动一个线程(或线程池)执行业务逻辑;
由于Future对象可以等待线程执行结束或者取消线程,Future内部需要能够管理业务逻辑的执行状态。
业务逻辑结束或异常时需要告诉Future对象,有两种方式:在Future中启动线程执行业务逻辑;或者业务逻辑单独执行,通过创建的Future实例的方法如setSuccess(result)方法通知Future。Java的FutureTask采用了第一种方法,其本身继承了Runnable,在run方法中执行传入的业务逻辑。而Netty的Promise中采用了第二种方法。
get()方法中,如果业务逻辑还未执行完毕,需要等待,可以用锁机制实现。
Java中的Future是一个接口,内部有如下方法:
boolean cancel(boolean mayInterruptIfRunning) 试图取消对此任务的执行。
V get() 如有必要,等待计算完成,然后获取其结果。
V get(long timeout, TimeUnit unit) 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
boolean isCancelled() 如果在任务正常完成前将其取消,则返回 true。
boolean isDone() 如果任务已完成,则返回 true。
下面,我们自己实现一个Future加深理解,下面定义了一个继承Future的MyFutureTask,初始化时传递一个Callable作为业务逻辑,实现Future接口是为了控制业务逻辑线程,实现Runnable接口是为了业务线程执行时能够修改Future的内部状态。
public class MyFutureTask implements Future,Runnable {
Callable callable; //业务逻辑 boolean running = false ,done = false,cancel = false; // 业务逻辑执行状态 ReentrantLock lock ; //锁 V outcome; //结果public MyFutureTask(Callable callable) { if(callable == null) { throw new NullPointerException("callable cannot be null!"); } this.callable = callable; this.done = false; this.lock = new ReentrantLock(); }@Override public boolean cancel(boolean mayInterruptIfRunning) { callable = null; cancel = true; return true; }@Override public boolean isCancelled() { return cancel; }@Override public boolean isDone() { return done; }@Override public V get() throws InterruptedException, ExecutionException { try { this.lock.lock(); //先获取锁,获得后说明业务逻辑已经执行完毕 return outcome; }finally{ this.lock.unlock(); } }@Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { try { this.lock.tryLock(timeout, unit); return outcome; }catch (InterruptedException e) { return null; }finally{ this.lock.unlock(); } } @Override public void run() { try { this.lock.lock(); // 启动线程,先上锁,防止get时直接返回 running = true; try { outcome = callable.call(); // 业务逻辑 } catch (Exception e) { e.printStackTrace(); } done = true; running = false; }finally { this.lock.unlock(); // 解锁后get可获取 } }

}
测试程序如下:
public Future search(String prodName) {
MyFutureTask future = new MyFutureTask(new Callable() { @Override public String call(){ try { System.out.println(String.format(">>search price of %s from internet!",prodName)); Thread.sleep(3000); return "$99.99"; }catch(InterruptedException e){ System.out.println("search function is Interrupted!"); } return null; } }); new Thread(future).start(); // 或提交到线程池中 return future; }

4.2.3 Java的Future实现
当然,上面是自己实现的FutureTask,Java自带的FutureTask要比上面的更加复杂和健壮。下面我们进行一些分析。
FutureTask内部维护了state,表示运行状态,只能通过set,setException, 和 cancel来修改。
private static final int NEW= 0; //初始状态, private static final int COMPLETING= 1; // 业务逻辑已经结束 private static final int NORMAL= 2; // 正常结束 private static final int EXCEPTIONAL= 3; // 异常结束 private static final int CANCELLED= 4; // 已经取消 private static final int INTERRUPTING = 5; // 中断中 private static final int INTERRUPTED= 6; // 已经中断

private volatile WaitNode waiters; 维护了等待的线程,get()方法时,如果业务逻辑还未执行完毕,则创建WaitNode q,将其q.next设置为waiters,waiters设置为q;这样组成了一个等待链表。在业务逻辑执行完毕(正常或异常结束)时,
run方法
run方法用来执行业务逻辑,在此过程中需要维护好业务逻辑的运行状态
【Future和Promise】public void run() {
// 1. 如果state不为初始状态或者runner不为null,说明已经在运行了,直接返回 // 如果为空,使用CAS将runner设置为当前线程,防止并发进入 //runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner")); if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable c = callable; if (c != null && state == NEW) { // 2.业务逻辑不为空并且state为NEW时才运行 V result; boolean ran; try { result = c.call(); // 3. 执行业务逻辑 ran = true; // ran为true表示正常返回 } catch (Throwable ex) { result = null; // 发生异常,结果为null ran = false; // 非正常结束 setException(ex); // 设置异常 } if (ran) set(result); // 正常结束,设置结果 } } finally { // 为例防止并发调用run()方法,进入run时使用cas将runner设置为非空,结束时设为null runner = null; int s = state; // 当前状态为INTERRUPTING或者INTERRUPTED 说明要取消 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); // 如果在中断进行中,则一直等待 } }

执行run方法时,要判断Future状态是否正确,必须为NEW;使用CAS将runner对象设置为当前线程,若runner不为null,说明其他线程已经执行了run方法,则直接return;
状态为NEW,执行传入的业务逻辑,正常结束时,将结果保存到result,ran设置为true;若发生异常,设置result为空,ran为false,并设置异常setException(ex);
正常结束,调用set(result); 设置结果
业务逻辑执行结束,讲runner设置为null,若线程在INTERRUPTING或者INTERRUPTED 说明要取消;如果在中断进行中,则一直等待。
setException(ex); 业务逻辑异常时调用
protected void setException(Throwable t) {
// 若状态为NEW,将其设置为COMPLETING-完成 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; // 结果为抛出的异常 UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最终状态为EXCEPTIONAL-异常 finishCompletion(); } }

set(V v) 业务逻辑正常结束时设置结果
protected void set(V v) {
// 若状态为NEW,将其设置为COMPLETING-完成 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终状态为NORMAL-正常结束 finishCompletion(); } }

finishCompletion做了一些收尾性工作,根据waiters链表,唤醒等待的线程。
private void finishCompletion() {
// assert state > COMPLETING; for (WaitNode q; (q = waiters) != null; ) { // 遍历链表 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (; ; ) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); // 唤醒线程 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }

get方法
get时,如果业务逻辑尚未结束,需要使用LockSupport.park(this); 将休眠等待的线程,在业务逻辑完成后,finishCompletion()会唤醒线程,之后返回业务逻辑的处理结果。
public V get() throws InterruptedException, ExecutionException {
int s = state; // 如果状态为NEW或者COMPLETING,说明还未结束,加入等待链表waiters if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); // 返回结果 }

cancel方法
cancel方法会取消执行业务逻辑的,主要逻辑如下:
public boolean cancel(boolean mayInterruptIfRunning) {
// mayInterruptIfRunning表示以中断取消 // 如果状态为NEW,说明还未执行,无需取消;讲状态设置为打断或取消 if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try {// in case call to interrupt throws exception if (mayInterruptIfRunning) { // 以中断取消 try { Thread t = runner; if (t != null) t.interrupt(); // 执行线程的interrupt方法 } finally { // 中断完成,修改状态为INTERRUPTED-已中断 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); // 唤醒等待线程 } return true; }

复制
通过分析,可以看到,java的FutureTask通过state记录业务逻辑的执行状态;多线程时使用CAS防止重复进入;业务逻辑未执行完成时,会将线程加入到waiter链表,使用LockSupport.park()阻塞业务线程;业务逻辑执行完毕或发生异常或被取消时,唤醒等待列表的线程。
与我们实现时使用的ReentrantLock在原理上是一样的,ReentrantLock的lock在获取不到锁时,也会维护一个链表保存等待列表,释放锁时,唤醒等待列表上的线程。区别在与,Java的实现会同时唤醒所有的等待线程,而unlock时等线程表会依次获得锁。
Netty的Future机制 4.3.1 Netty的Future
Netty的Future在concurrent包的Future基础上,增加了更多的功能。在Java的Future中,主要是任务的运行/取消,而Netty的Future增加了更多的功能。
public interface Future extends java.util.concurrent.Future
boolean isSuccess(); 只有IO操作完成时才返回true
boolean isCancellable(); 只有当cancel(boolean)成功取消时才返回true
Throwable cause(); IO操作发生异常时,返回导致IO操作以此的原因,如果没有异常,返回null
// 向Future添加事件,future完成时,会执行这些事件,如果add时future已经完成,会立即执行监听事件
Future addListener(GenericFutureListener> listener);
// 移除监听事件,future完成时,不会触发
Future removeListener(GenericFutureListener> listener);
Future sync() throws InterruptedException; //等待future done
Future syncUninterruptibly(); // 等待future done,不可打断
Future await() throws InterruptedException; // 等待future完成
Future awaitUninterruptibly(); // 等待future 完成,不可打断
V getNow(); // 立刻获得结果,如果没有完成,返回null
boolean cancel(boolean mayInterruptIfRunning); // 如果成功取消,future会失败,导致CancellationException
Netty为Future加入的功能主要是添加/删除监听事件,在Promise中会有实例演示。其他的方法是为get()方法服务的,get()方法可以通过调用await/getNow等方法实现。
4.3.2 Netty的Promise机制
Netty的Future与Java自带到Future略有不同,其引入了Promise机制。在Java的Future中,业务逻辑为一个Callable或Runnable实现类,该类的call()或run()执行完毕意味着业务逻辑的完结;而在Promise机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败。
Netty中Promise接口的定义如下:
public interface Promise extends Future {
// 设置future执行结果为成功 Promise setSuccess(V result); // 尝试设置future执行结果为成功,返回是否设置成功

boolean trySuccess(V result);
// 设置失败
Promise setFailure(Throwable cause); boolean tryFailure(Throwable cause); // 设置为不能取消 boolean setUncancellable(); //一下省略了覆盖Future的一些方法

}
下面以一个例子来说明Promise的使用方法,还是以seach()查询产品报价为例:
// main 方法
NettyFuture4Promise test = new NettyFuture4Promise();
Promise promise = test.search("Netty In Action");
String result = promise.get();
System.out.println("price is " + result);
//
private Promise search(String prod) {
NioEventLoopGroup loop = new NioEventLoopGroup(); // 创建一个DefaultPromise并返回 DefaultPromise promise = new DefaultPromise(loop.next()); loop.schedule(new Runnable() { @Override public void run() { try { System.out.println(String.format(">>search price of %s from internet!",prod)); Thread.sleep(5000); promise.setSuccess("$99.99"); // 等待5S后设置future为成功, // promise.setFailure(new NullPointerException()); //当然,也可以设置失败 } catch (InterruptedException e) { e.printStackTrace(); } } },0,TimeUnit.SECONDS); return promise; }

可以看到,Promise能够在业务逻辑线程中通知Future成功或失败,由于Promise继承了Netty的Future,因此可以加入监听事件。
// main方法中,查询结束后获取promise,加入两个监听事件,分别给小Hong发通知和Email
Promise promise = test.search("Netty In Action");
promise.addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { System.out.println("Listener 1, make a notifice to Hong,price is " + future.get()); }}); promise.addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { System.out.println("Listener 2, send a email to Hong,price is " + future.get()); }});

Future和Promise的好处在于,获取到Promise对象后可以为其设置异步调用完成后的操作,然后立即继续去做其他任务。
4.3.3 Netty常用的Promise类
Netty常用的纯Future机制的类,有SucceededFuture和FailedFuture,他们不需要设置业务逻辑代码,会立刻完成,只需要设置成功后的返回和抛出的异常。
Netty的常用Promise类有DefalutPromise类,这是Promise实现的基础,后续会对这个类的实现进行解读;DefaultChannelPromise是DefalutPromise的子类,加入了channel这个属性。
下面对DefaultChannelPromise进行分析,其类图如下:
NettyFuture类图
DefaultPromise的使用
Netty中涉及到异步操做的地方都使用了promise,例如,下面是服务器/客户端启动时的注册任务,最终会调用unsafe的register,调用过程中会传入一个promise,unsafe进行事件的注册时调用promise可以设置成功/失败。
// SingleThreadEventLoop.java
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }

// AbstractChannel.AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; }

......
}
DefaultPromise的实现
DefaultChannelPromise提供的功能可以分为两个部分:一方面是为调用者提供get()和addListener()用于获取Future任务执行结果和添加监听事件;另一方面是为业务处理任务提供setSuccess()等方法设置任务的成功或失败。
get方法
DefaultPromise的get方法有两个,无参数的get会阻塞等待;有参数的get会等待指定事件,若未结束抛出超时异常。这两个get()是在其父类AbstractFuture中实现的,通过调用下面四个方法实现:
await(); // 等待Future任务结束
await(timeout, unit) // 等待Future任务结束,超过事件则抛出异常
cause(); // 返回Future任务的异常
getNow() // /返回Future任务的执行结果
// 先等待,如果有异常则抛出,无异常返回getNow()
public V get() throws InterruptedException, ExecutionException {
await(); Throwable cause = cause(); if (cause == null) { return getNow(); } if (cause instanceof CancellationException) { throw (CancellationException) cause; } throw new ExecutionException(cause); }

await
await()方法判断Future任务是否结束,之后获取this锁,如果任务未完成,则调用Object的wait()等待
public Promise await() throws InterruptedException {
// 判断Future任务是否结束,内部根据result是否为null判断,setSuccess或setFailure时会通过CAS修改result if (isDone()) { return this; }if (Thread.interrupted()) { // 线程是否被中断 throw new InterruptedException(toString()); }checkDeadLock(); // 检查当前线程是否与线程池运行的线程是一个synchronized (this) { while (!isDone()) { incWaiters(); // waiters计数加1 try { wait(); // Object的方法,让出cpu,加入等待队列 } finally { decWaiters(); //waiters计数减1 } } } return this; }

await(long timeout, TimeUnit unit)与awite类似,只是调用了Object对象的wait(long timeout, int nanos)方法
awaitUninterruptibly()方法在内部catch住了等待线程的中断异常,因此不会抛出中断异常。
监听事件相关方法
add/remove方法
addListener方法被调用时,将传入的回调类传入到listeners对象中,如果监听多于1个,会创建DefaultFutureListeners对象将回调方法保存在一个数组中。removeListener会将listeners设置为null(只有一个时)或从数组中移除(多个回调时)。
private void addListener0(GenericFutureListener> listener) {
if (listeners == null) { listeners = listener; } else if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).add(listener); } else { listeners = new DefaultFutureListeners((GenericFutureListener>) listeners, listener); } }

private void removeListener0(GenericFutureListener> listener) {
if (listeners instanceof DefaultFutureListeners) { ((DefaultFutureListeners) listeners).remove(listener); } else if (listeners == listener) { listeners = null; } }

notifyListeners()
在添加监听器的过程中,如果任务刚好执行完毕done(),则立即触发监听事件。触发监听通过notifyListeners()实现。主要逻辑为:如果当前addListener的线程(准确来说应该是调用notifyListeners的线程,因为addListener和setSuccess都会调用notifyListeners()和Promise内的线程池当前执行的线程是同一个线程,则放在线程池中执行,否则提交到线程池去执行;例如,main线程中调用addListener时任务完成,notifyListeners()执行回调,会提交到线程池中执行;而如果是执行Future任务的线程池中setSuccess()时调用notifyListeners(),会放在当前线程中执行。
内部维护了notifyingListeners用来记录是否已经触发过监听事件,只有未触发过且监听列表不为空,才会依次便利并调用operationComplete
private static void notifyListener0(Future future, GenericFutureListener l) {
try { l.operationComplete(future); } catch (Throwable t) { logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t); } }

setSuccess()方法
Future任务在执行完成后调用setSuccess()或setFailure()通知Future执行结果;主要逻辑是:修改result的值,若有等待线程则唤醒,通知监听事件。
if (setSuccess0(result)) { // 设置成功后唤醒等待线程
notifyListeners(); // 通知 return this;

}
// 通知成功时将结果保存在变量result,通知失败时,使用CauseHolder包装Throwable赋值给result
// RESULT_UPDATER 是一个使用CAS更新内部属性result的类,
// 如果result为null或UNCANCELLABLE,更新为成功/失败结果;UNCANCELLABLE是不可取消状态
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { checkNotifyWaiters(); // 调用Object的notifyAll(); 通知等待线程 return true; } return false; }

cancel()方法
cancel用来取消任务,根据result判断,如果可以取消,则唤醒等待线程,通知监听事件。
public boolean cancel(boolean mayInterruptIfRunning) {
//如果result为null,说明未setUncancellable()/setSuccess/setFailure if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE_HOLDER)) { checkNotifyWaiters(); // 唤醒等待线程 notifyListeners(); // 触发监听事件 return true; } return false; }

通过上面的分析,我们可以看到DefaultPromise内部通过result记录Future任务的执行状态:
null - 未完成
CANCELLATION_CAUSE_HOLDER -被取消
UNCANCELLABLE - 不可取消
业务处理调用setSuccess时传入的结果
业务处理调用setFailure时包装Throws的CauseHolder
DefaultPromise内部维护了一个监听列表保存监听事件,在任务完成或取消时通知监听事件(提交到线程池中执行);任务的等待与唤醒通过Object的wait()和notifyAll()完成
DefaultChannelPromise实现
DefaultChannelPromise是DefaultPromise的子类,内部维护了一个通道变量Channel channel; Promise机制相关的方法都是调用父类方法。
除此之外,还实现了FlushCheckpoint接口,供ChannelFlushPromiseNotifier使用,我们可以将ChannelFuture注册到ChannelFlushPromiseNotifier类,当有数据写入或到达checkpoint时使用。
interface FlushCheckpoint { long flushCheckpoint(); void flushCheckpoint(long checkpoint); ChannelPromise promise(); }

    推荐阅读