Java|Java 8 学习笔记11——CompletableFuture(组合式异步编程)

Future接口 Future接口在Java 5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。
打个比方,你可以把它想象成这样的场景:你拿了一袋子衣服到你中意的干洗店去洗。干洗店的员工会给你张发票,告诉你什么时候你的衣服会洗好(这就是一个Future事件)。衣服干洗的同时,你可以去做其他的事情。
Future的另一个优点是它比更底层的Thread更易用。要使用Future,通常你只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService,就万事大吉了。下面这段代码展示了Java 8之前使用Future的一个例子。

//创建Executor-Service,通过它你可以向线程池提交任务 ExecutorService executor = Executors.newCachedThreadPool(); //向Executor-Service提交一个Callable对象 Future future = executor.submit(new Callable() { public Double call() { //以异步方式在新的线程中执行耗时的操作 return doSomeLongComputation(); }}); //异步操作进行的同时,你可以做其他的事情 doSomethingElse(); try { //获取异步操作的结果,如果最终被阻塞,无法得到结果,那么在最多等待1秒钟之后退出 Double result = future.get(1, TimeUnit.SECONDS); }catch (ExecutionException ee) { //计算抛出一个异常 }catch (InterruptedException ie) { //当前线程在等待过程中被中断 } catch (TimeoutException te) { //在Future对象完成之前超过已过期 }

这种编程方式让你的线程可以在ExecutorService以并发方式调用另一个线程执行耗时操作的同时,去执行一些其他的任务。接着,如果你已经运行到没有异步操作的结果就无法继续任何有意义的工作时,可以调用它的get方法去获取操作的结果。如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回相应的结果。
那如果该长时间运行的操作永远不返回了会怎样?为了处理这种可能性,虽然Future提供了一个无需任何参数的get方法,但还是推荐使用重载版本的get方法,它接受一个超时的参数,通过它,你可以定义你的线程等待Future结果的最长时间,而不是像上面代码中那样永无止境地等待下去。
使用Future以异步方式执行长时间的操作的示意图如下
Java|Java 8 学习笔记11——CompletableFuture(组合式异步编程)
文章图片

Future接口的局限性 Future接口提供了方法来检测异步计算是否已经结束(使用isDone方法),等待异步操作结束,以及获取计算的结果。但是这些特性还不足以让你编写简洁的并发代码。比如,我们很难表述Future结果之间的依赖性;从文字描述上这很简单,“当长时间计算任务完成时,请将该计算的结果通知到另一个长时间运行的计算任务,这两个计算任务都完成后,将计算的结果与另一个查询操作结果合并”。但是,使用Future中提供的方法完成这样的操作又是另外一回事。这也是我们需要更具描述能力的特性的原因,比如下面这些。
  • 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
  • 等待Future集合中的所有任务都完成。
  • 仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。
  • 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
  • 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。
StreamCompletableFuture的设计都遵循了类似的模式:它们都使用了Lambda表达式以及流水线的思想。从这个角度,你可以说CompletableFutureFuture的关系就跟StreamCollection的关系一样。
使用CompletableFuture构建异步应用 为了展示CompletableFuture的强大特性,我们会创建一个名为“最佳价格查询器”(best-price-finder)的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格。这个过程中,你会学到几个重要的技能。
  • 首先,你会学到如何为你的客户提供异步API(如果你拥有一间在线商店的话,这是非常有帮助的)。
  • 其次,你会掌握如何让你使用了同步API的代码变为非阻塞代码。你会了解如何使用流水线将两个接续的异步操作合并为一个异步计算操作。这种情况肯定会出现,比如,在线商店返回了你想要购买商品的原始价格,并附带着一个折扣代码——最终,要计算出该商品的实际价格,你不得不访问第二个远程折扣服务,查询该折扣代码对应的折扣比率。
  • 你还会学到如何以响应式的方式处理异步操作的完成事件,以及随着各个商店返回它的商品价格,最佳价格查询器如何持续地更新每种商品的最佳推荐,而不是等待所有的商店都返回他们各自的价格(这种方式存在着一定的风险,一旦某家商店的服务中断,用户可能遭遇白屏)。
同步API其实只是对传统方法调用的另一种称呼:你调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方在不同的线程中运行,调用方还是需要等待被调用方结束运行,这就是阻塞式调用这个名词的由来。
与此相反,异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方是异步的——这就是非阻塞式调用的由来。执行剩余计算任务的线程会将它的计算结果返回给调用方。返回的方式要么是通过回调函数,要么是由调用方再次执行一个“等待,直到计算完成”的方法调用。这种方式的计算在I/O系统程序设计中非常常见:你发起了一次磁盘访问,这次访问和你的其他计算操作是异步的,你完成其他的任务时,磁盘块的数据可能还没载入到内存,你只需要等待数据的载入完成。
实现异步API 为了实现最佳价格查询器应用,让我们从每个商店都应该提供的API定义入手。首先,商店应该声明依据指定产品名称返回价格的方法:
public class Shop { public double getPrice(String product) { //待实现 } }

该方法的内部实现会查询商店的数据库,但也有可能执行一些其他耗时的任务,比如联系其他外部服务(比如,商店的供应商,或者跟制造商相关的推广折扣)。接下来将会采用delay方法模拟这些长期运行的方法的执行,它会人为地引入1秒钟的延迟,方法声明如下。
public static void delay() { try { Thread.sleep(1000L); }catch(InterruptedException e) { throw new RuntimeException(e); } }

getPrice方法会调用delay方法,并返回一个随机计算的值,代码如下所示。返回随机计算的价格这段代码看起来有些取巧。它使用charAt,依据产品的名称,生成一个随机值作为价格。
public double getPrice(String product) { return calculatePrice(product); }private double calculatePrice(String product) { delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); }

很明显,这个API的使用者(这个例子中为最佳价格查询器)调用该方法时,它依旧会被阻塞。为等待同步事件完成而等待1秒钟,这是无法接受的,尤其是考虑到最佳价格查询器对网络中的所有商店都要重复这种操作。你是一个睿智的商店店主,你已经意识到了这种同步API会为你的用户带来多么痛苦的体验,你希望以异步API的方式重写这段代码,让用户更流畅地访问你的网站。
将同步方法转换为异步方法 为了实现这个目标,你首先需要将getPrice转换为getPriceAsync方法,并修改它的返回值:
public Future getPriceAsync(String product) { ... }

Java 5引入了java.util.concurrent.Future接口表示一个异步计算(即调用线程可以继续运行,不会因为调用方法而阻塞)的结果。这意味着Future是一个暂时还不可知值的处理器,这个值在计算完成后,可以通过调用它的get方法取得。因为这样的设计,getPriceAsync方法才能立刻返回,给调用线程一个机会,能在同一时间去执行其他有价值的计算任务。新的CompletableFuture类提供了大量的方法,让我们有机会以多种可能的方式轻松地实现这个方法,比如下面就是这样一段实现代码。
public Future getPriceAsync(String product) { CompletableFuture futurePrice = new CompletableFuture<>(); //创建CompletableFuture对象,它会包含计算的结果 new Thread(() -> { //在另一个线程中以异步方式执行计算 double price = calculatePrice(product); //需长时间计算的任务结束并得出结果时,设置Future的返回值 futurePrice.complete(price); }).start(); //无需等待还没结束的计算,直接返回Future对象 return futurePrice; }

在这段代码中,你创建了一个代表异步计算的CompletableFuture对象实例,它在计算完成时会包含计算的结果。接着,你调用fork创建了另一个线程去执行实际的价格计算工作,不等该耗时计算任务结束,直接返回一个Future实例。当请求的产品价格最终计算得出时,你可以使用它的complete方法,结束completableFuture对象的运行,并设置变量的值。很显然,这个新版Future的名称也解释了它所具有的特性。使用这个API的客户端,可以通过下面的这段代码对其进行调用。
Shop shop = new Shop("BestShop"); long start = System.nanoTime(); //查询商店,试图取得商品的价格 Future futurePrice = shop.getPriceAsync("my favorite product"); long invocationTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Invocation returned after "+invocationTime +" msecs"); //执行更多任务,比如查询其他商店 doSomethingElse(); //在计算商品价格的同时 try{ //从Future对象中读取价格,如果价格未知,会发生阻塞 double price = futurePrice.get(); System.out.printf(" Price is %.2f%n", price); } catch(Exception e){ throw new RuntimeException(e); }long retrievalTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Price returned after "+retrievalTime +" msecs");

我们看到这段代码中,客户向商店查询了某种商品的价格。由于商店提供了异步API,该次调用立刻返回了一个Future对象,通过该对象客户可以在将来的某个时刻取得商品的价格。这种方式下,客户在进行商品价格查询的同时,还能执行一些其他的任务,比如查询其他家商店中商品的价格,不会呆呆地阻塞在那里等待第一家商店返回请求的结果。最后,如果所有有意义的工作都已经完成,客户所有要执行的工作都依赖于商品价格时,再调用Future的get方法。执行了这个操作后,客户要么获得Future中封装的值(如果异步任务已经完成),要么发生阻塞,直到该异步任务完成,期望的值能够访问。上面代码产生的输出可能是下面这样:
Invocation returned after 43 msecs Price is 123. 26 Price returned after 1045 msecs

getPriceAsync方法的调用返回远远早于最终价格计算完成的时间。现在我们要解决的问题:如何正确地管理异步任务执行过程中可能出现的错误。
错误处理 如果没有意外,我们目前开发的代码工作得很正常。但是,如果价格计算过程中产生了错误会怎样呢?非常不幸,这种情况下你会得到一个相当糟糕的结果:用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结果的客户端永久地被阻塞。
客户端可以使用重载版本的get方法,它使用一个超时参数来避免发生这样的情况。这是一种值得推荐的做法,你应该尽量在你的代码中添加超时判断的逻辑,避免发生类似的问题。使用这种方法至少能防止程序永久地等待下去,超时发生时,程序会得到通知发生了TimeoutException。不过,也因为如此,你不会有机会发现计算商品价格的线程内到底发生了什么问题才引发了这样的失效。为了让客户端能了解商店无法提供请求商品价格的原因,你需要使用CompletableFuturecompleteExceptionally方法将导致CompletableFuture内发生问题的异常抛出。对之前getPriceAsync方法的实现代码优化后的结果如下所示。
public Future getPriceAsync(String product) { CompletableFuture futurePrice = new CompletableFuture<>(); new Thread(() -> { try { double price = calculatePrice(product); //如果价格计算正常结束,完成Future操作并设置商品价格 futurePrice.complete(price); } catch (Exception ex) { //否则就抛出导致失败的异常,完成这次Future操作 futurePrice.completeExceptionally(ex); } }).start(); return futurePrice; }

客户端现在会收到一个ExecutionException异常,该异常接收了一个包含失败原因的Exception参数,即价格计算方法最初抛出的异常。所以,举例来说,如果该方法抛出了一个运行时异常“product not available”,客户端就会得到像下面这样一段ExecutionException
Java|Java 8 学习笔记11——CompletableFuture(组合式异步编程)
文章图片

使用工厂方法supplyAsync创建CompletableFuture
目前为止我们已经了解了如何通过编程创建CompletableFuture对象以及如何获取返回值,虽然看起来这些操作已经比较方便,但还有进一步提升的空间,CompletableFuture类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担心实现的细节。比如,采用supplyAsync方法后,可以用一行语句重写最开始代码中的getPriceAsync方法,如下所示。
public Future getPriceAsync(String product) { return CompletableFuture.supplyAsync(() -> calculatePrice( product)); }

supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行线程执行生产者方法。一般而言,向CompletableFuture的工厂方法传递可选参数,指定生产者方法的执行线程是可行的,后面会介绍如何使用适合你应用特性的执行线程改善程序的性能。
此外,上面代码中getPriceAsync方法返回的CompletableFuture对象和再上面代码中你手工创建和完成的CompletableFuture对象是完全等价的,这意味着它提供了同样的错误管理机制,而前者你花费了大量的精力才得以构建。
接下来,我们会假设你非常不幸,无法控制Shop类提供API的具体实现,最终提供给你的API都是同步阻塞式的方法。这也是当你试图使用服务提供的HTTP API时最常发生的情况。你会学到如何以异步的方式查询多个商店,避免被单一的请求所阻塞,并由此提升你的“最佳价格查询器”的性能和吞吐量。
让你的代码免受阻塞之苦 你有一个商家的列表,如下所示:
List> shops = Arrays.asList(new Shop("BestPrice"), new Shop("LetsSaveBig"), new Shop("MyFavoriteShop"), new Shop(" BuyItAll"));

你需要使用下面这样的签名实现一个方法,它接受产品名作为参数,返回一个字符串列表,这个字符串列表中包括商店的名称、该商店中指定商品的价格:
public List> findPrices(String product);

首先,试着使用Stream特性。
采用顺序查询所有商店的方式实现的findPrices方法如下
public List> findPrices(String product) { return shops.stream() .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))) .collect(toList()); }

现在试着用该方法去查询你最近这些天疯狂着迷的唯一产品。此外,也记录下方法的执行时间,通过这些数据,我们可以比较优化之后的方法会带来多大的性能提升。
验证findPrices的正确性和执行性能如下
long start = System.nanoTime(); System.out.println(findPrices(“myPhone27S")); long duration = (System.nanoTime() - start) /1_000_000; System.out.println(“Done in “+ duration + “msecs");

上面代码的运行结果输出如下
Java|Java 8 学习笔记11——CompletableFuture(组合式异步编程)
文章图片

这里findPrices方法的执行时间仅比4秒钟多了那么几毫秒,因为对这4个商店的查询是顺序进行的,并且一个查询操作会阻塞另一个,每一个操作都要花费大约1秒钟左右的时间计算请求商品的价格。下面对这个结果进行改进。
使用并行流对请求进行并行操作 对findPrices进行并行操作如下
public List> findPrices(String product) { return shops.parallelStream() //使用并行流并行地从不同的商店获取价格 .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product))) .collect(toList()); }

【Java|Java 8 学习笔记11——CompletableFuture(组合式异步编程)】运行代码,与之前的执行结果相比较,有所改进
Java|Java 8 学习笔记11——CompletableFuture(组合式异步编程)
文章图片

现在对四个不同商店的查询实现了并行,所以完成所有操作的总耗时只有1秒多一点儿,但还可以做得更好,接下来尝试使用刚学过得CompletableFuture,将findPrices方法中对不同商店的同步调用替换为异步调用。

    推荐阅读