异步编程|异步编程 CompletableFuture

1. 前言 刚工作时,对于异步编程的观念,可能停留在线程、线程池的概念。创建一个新线程去执行,只执行用 Runnable,想要有返回值就用 Callable 返回 Future。再有经验一点的,用线程池来管理线程。但这些都停留在基础阶段,到实际开发时会遇到各种复杂的应用场景,虽然并发包里面也提供了一些像 SemaphoreCountDownLatchCyclicBarrier 的协作类,但还是不够,这里给大家介绍 jdk8 推出的 CompletableFuture
先列出来几个实际开发中遇到的问题吧:

  • 代码中有一个异步调用方法A,还有另外一个方法B需要依赖方法A调用完成后,处理方法A返回的结果。如果通过 future.get()等待方法A执行完成,无疑又回到了同步阻塞。
  • 代码中有多个异步调用方法A、B、C、D,但最终的结果需要等所有异步方法都执行完才能返回。这种场景也比较常见,微服务项目中,某个接口要同时获取不同业务服务的数据。
这些问题,在学会了 CompletableFuture之后都不是问题,它的针对传统异步编程方式的提升,主要体现在支持了异步回调。CompletableFuture 之于传统异步编程的提升,我觉得有点像 AIO(异步非阻塞)之于 NIO(同步非阻塞)。
下面是关于 CompletableFuture的用法介绍,它的方法很多很杂,不少功能性都有重叠。我是每个方法都试过之后,按照自己的理解做了分类,每个分类按照下面的单个章节来说明。
2. 基础 CompletableFuture 类实现了 Future 和 CompletionStage 接口,因此依然可以使用原有 Future 提供的方法,但新的特性都在 CompletionStage 里面。
创建 CompletableFuture
CompletableFuture 提供了几个 static 方法,它们使用任务来实例化一个 CompletableFuture 实例。
CompletableFuture CompletableFuture.runAsync(Runnable runnable); CompletableFuture CompletableFuture.runAsync(Runnable runnable, Executor executor); CompletableFuture CompletableFuture.supplyAsync(Supplier supplier); CompletableFuture CompletableFuture.supplyAsync(Supplier supplier, Executor executor)

这里对比下几个区别:
  • runAsync 类似 Runnable,注重执行任务,无返回值,因此返回 CompletableFuture。而 supplyAsync 类似 Callable,有返回值 CompletableFuture
  • 这两个方法的带 executor 的重载方法,表示让任务在指定的线程池中执行,不指定的话,通常任务是在 ForkJoinPool.commonPool() 线程池中执行的。后续介绍的其他 CompletableFuture 方法基本都有带 executor 的重载方法。
Future 方法
因为 实现了 Future 接口,所以依然可以使用原有 Future 提供的方法。
public Tget() public Tget(long timeout, TimeUnit unit) public TgetNow(T valueIfAbsent) public Tjoin()

  • getNow有点特殊,如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent值。
  • join返回计算的结果或者抛出一个unchecked异常(CompletionException),它和get对抛出的异常的处理有些细微的区别。
显示完成
  • boolean complete(T value):如果尚未完成,则将get()和相关方法返回的值设置为给定值。如果完成了,则正常返回 get()值。
  • boolean completeExceptionally(Throwable ex):如果尚未完成,则导致调用get()和相关方法抛出给定的异常。如果完成了,则正常返回 get()值。
有个例子:
public static CompletableFuture divideNumber(int a, int b) { return CompletableFuture.supplyAsync(() -> { try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } return a / b; }); }public static void main(String[] args) throws ExecutionException, InterruptedException { log.debug("begin"); CompletableFuture intFuture = divideNumber(10, 2); new Thread(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } intFuture.complete(0); }).start(); CompletableFuture exceptionFuture = divideNumber(10, 2); new Thread(()->{ try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } exceptionFuture.completeExceptionally(new RuntimeException("exceptionFuture 没有执行完")); }).start(); log.debug("(没执行完就返回0) result:{}", intFuture.get()); log.debug("(没执行完就报错) result:{}",exceptionFuture.get()); }

返回结果:
18:06:56.179 [main] DEBUG pers.kerry.aservice.service.CFComplete - begin 18:06:59.245 [main] DEBUG pers.kerry.aservice.service.CFComplete - (没执行完就返回0) result:0 Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: exceptionFuture 没有执行完 at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at pers.kerry.aservice.service.CFComplete.main(CFComplete.java:50) Caused by: java.lang.RuntimeException: exceptionFuture 没有执行完 at pers.kerry.aservice.service.CFComplete.lambda$main$2(CFComplete.java:46) at java.lang.Thread.run(Thread.java:748)

3. Function方法 Function 方法,是指下面的这类方法的入参都是函数式接口 FunctionBiFunction(Bi是 Bidirectional-双向 的缩写,是指有两个入参的Function)。那么功能嘛,就和 Stream 里的 mapflatMap方法类似,返回当前内部数据经过映射转换的 CompletableFuture。因此它们都是等待上一个 CompletableFuture 完成后执行的。
下面先一一介绍功能,然后再做对比。
3.1. thenApply
thenApply 方法定义为:
public CompletableFuture thenApply( Function fn)

特点为:
  • 入参 Function T -> U,如果前者 CompletableFuture 不报错,正常完成后,会执行 thenApply 方法。
  • 入参 Function 中没有 Throwable,故没有异常处理。如果前者 CompletableFuture 报错,会直接抛出异常,并不会执行该方法。
示例代码:
public static CompletableFuture divideNumber(int a, int b) { return CompletableFuture.supplyAsync(() -> a / b); }public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture intFuture = divideNumber(10, 2) .thenApply(param -> param * 10); CompletableFuture stringFuture = divideNumber(10, 2) .thenApply(param -> "这是字符串-" + param); CompletableFuture exceptionFuture = divideNumber(10, 0) .thenApply(param -> param * 10); log.debug("intFuture result:{}", intFuture.get()); log.debug("stringFuture result:{}", stringFuture.get()); log.debug("exceptionFuture result:{}", exceptionFuture.get()); }

打印日志为:
22:39:23.478 [main] DEBUG pers.kerry.aservice.service.CFApply - intFuture result:50 22:39:23.485 [main] DEBUG pers.kerry.aservice.service.CFApply - stringFuture result:这是字符串-5 Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at pers.kerry.aservice.service.CFApply.main(CFApply.java:36) Caused by: java.lang.ArithmeticException: / by zero at pers.kerry.aservice.service.CFApply.lambda$divideNumber$0(CFApply.java:21) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

3.2. exceptionally
exceptionally 方法定义为:
public CompletableFuture exceptionally( Function fn)

特点为:
  • 入参 Function Throwable -> T,如果前者 CompletableFuture 不报错,就不会触发 exceptionally 方法。
  • 入参 Function 中有 Throwable,因此可以处理异常,前者 CompletableFuture 如果报错,会触发该方法,并且返回一个自定义应对错误的值。
示例代码:
public static CompletableFuture divideNumber(int a, int b) { return CompletableFuture.supplyAsync(() -> a / b); }public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture normalFuture = divideNumber(10, 2) .exceptionally((throwable -> { return 0; })); CompletableFuture exceptionFuture = divideNumber(10, 0) .exceptionally((throwable -> { return 0; })); log.debug("normalFuture result:{}", normalFuture.get()); log.debug("exceptionFuture result:{}", exceptionFuture.get()); }

打印日志为:
22:53:34.335 [main] DEBUG pers.kerry.aservice.service.CFExceptionally - normalFuture result:5 22:53:34.342 [main] DEBUG pers.kerry.aservice.service.CFExceptionally - exceptionFuture result:0

3.3. handle
handle 方法定义为:
public CompletableFuture handle( BiFunction fn)

特点为:
  • 入参 BiFunction T,Throwable -> U,无论是否报错,在执行完成后都会触发 handle 方法。
  • 入参 Function 中有 Throwable,因此可以处理异常,前者 CompletableFuture 如果报错,会触发该方法,并且返回一个自定义应对错误的值。
示例代码:
public static CompletableFuture divideNumber(int a, int b) { return CompletableFuture.supplyAsync(() -> a / b); }public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture intFuture = divideNumber(10, 2) .handle((value, throwable) -> value * 10); CompletableFuture stringFuture = divideNumber(10, 2) .handle((value, throwable) -> "这是字符串-" + value); CompletableFuture exceptionFuture = divideNumber(10, 0) .handle((value, throwable) -> { if (throwable != null) { return 0; } return value * 10; }); log.debug("intFuture result:{}", intFuture.get()); log.debug("stringFuture result:{}", stringFuture.get()); log.debug("exceptionFuture result:{}", exceptionFuture.get()); }

打印日志为:
23:03:08.771 [main] DEBUG pers.kerry.aservice.service.CFHandle - intFuture result:50 23:03:08.782 [main] DEBUG pers.kerry.aservice.service.CFHandle - stringFuture result:这是字符串-5 23:03:08.782 [main] DEBUG pers.kerry.aservice.service.CFHandle - exceptionFuture result:0

3.4. thenCompose
thenCompose 方法定义为:
public CompletableFuture thenCompose( Function> fn)

特点为:
  • 入参 Function T -> CompletionStage,如果前者 CompletableFuture 不报错,正常完成后,会执行 thenCompose 方法。
  • 入参 Function 中没有 Throwable,故没有异常处理。如果前者 CompletableFuture 报错,都会直接抛出异常,并不会执行该方法。
示例代码:
public static CompletableFuture divideNumber(int a, int b) { return CompletableFuture.supplyAsync(() -> a / b); }public static CompletableFuture multiplyTen(int a) { return CompletableFuture.supplyAsync(() -> a * 10); }public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture intFuture = divideNumber(10, 2) .thenCompose(CFCompose::multiplyTen); CompletableFuture exceptionFuture = divideNumber(10, 0) .thenCompose(CFCompose::multiplyTen); log.debug("intFuture result:{}", intFuture.get()); log.debug("exceptionFuture result:{}", exceptionFuture.get()); }

打印日志为:
23:47:18.967 [main] DEBUG pers.kerry.aservice.service.CFCompose - intFuture result:50 Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at pers.kerry.aservice.service.CFCompose.main(CFCompose.java:48) Caused by: java.lang.ArithmeticException: / by zero at pers.kerry.aservice.service.CFCompose.lambda$divideNumber$0(CFCompose.java:35) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

3.5. thenCombine
thenCombine 方法定义为:
public CompletableFuture thenCombine( CompletionStage other, BiFunction fn)

特点为:
  • 入参 Function T -> U,如果前者 CompletableFuture 和 CompletionStage other 不报错,都正常完成后,会执行 thenCombine 方法。
  • 入参 Function 中没有 Throwable,故没有异常处理。如果前者 CompletableFuture 或 CompletionStage other 报错,都会直接抛出异常,并不会执行该方法。
示例代码:
public static CompletableFuture divideNumber(int a, int b) { return CompletableFuture.supplyAsync(() -> a / b); }public static void main(String[] args) throws ExecutionException, InterruptedException {CompletableFuture intFuture = divideNumber(10, 2) .thenCombine(divideNumber(10, 5), (v1, v2) -> { return v1 + v2; }); CompletableFuture exceptionFuture = divideNumber(10, 2) .thenCombine(divideNumber(10, 0), (v1, v2) -> { return v1 + v2; }); log.debug("intFuture result:{}", intFuture.get()); log.debug("exceptionFuture result:{}", exceptionFuture.get()); }

打印日志为:
23:41:52.889 [main] DEBUG pers.kerry.aservice.service.CFCombine - intFuture result:7 Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at pers.kerry.aservice.service.CFCombine.main(CFCombine.java:30) Caused by: java.lang.ArithmeticException: / by zero at pers.kerry.aservice.service.CFCombine.lambda$divideNumber$0(CFCombine.java:16) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

3.6. 总结和对比
将上面介绍的几种方法各自组合起来横行对比。
1. thenApply、exceptionally、handle 对比
它们的区别在于 Function/BiFunction 的传入参数上:
  • thenApply: T -> U,只有在不报错的时候才会执行。
  • exceptionally: Throwable -> T,只有在报错的时候才会执行。
  • handle: T,Throwable -> U,无论报错或不报错的时候,都会执行。
handle 有点像是把 thenApply、exceptionally 结合起来了。
2. thenApply、thenCompose 对比
它们都是只有在不报错的时候才会执行,最终也是返回 CompletableFuture,但区别在于 Function 中的返回参数类型:
  • thenApply: Function fn)
  • thenCompose: Function> fn)
因此可以类比于 Stream 中 map 和 flatMap 方法的区别,它们 Function 中的返回参数类型:
  • map: Function mapper
  • flatMap: Function
3. thenCombine
很遗憾,thenCombine 在这个分类里面没有同行可以对比,它的功能在于结合两个 ComletableFuture 的值,最终返回一个新的 ComletableFuture。它的横向对比应该在 allOf。只不过因为属于 Function 的范畴,就放进来了。
4. Consumer方法 Consumer 方法,是指下面的这类方法的入参都是函数式接口 ConsumerBiConsumer。也都是等待上一个 CompletableFuture 完成后执行的。
4.1. thenAccept
thenAccept 方法定义为:
public CompletableFuture thenAccept(Consumer action)

特点为:
  • 入参 Consumer T,如果前者 CompletableFuture 不报错,正常完成后,会执行 thenAccept 方法。
  • 入参 Consumer 中没有 Throwable,故没有异常处理。如果前者 CompletableFuture 报错,会直接抛出异常,并不会执行该方法。
示例代码:
public static CompletableFuture divideNumber(int a, int b) { return CompletableFuture.supplyAsync(() -> a / b); }public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture intFuture = divideNumber(10, 2) .thenAccept((value) -> { log.debug("intFuture result:{}", value); }); CompletableFuture exceptionFuture = divideNumber(10, 0) .thenAccept((value) -> { log.debug("exceptionFuture result:{}", value); }); Thread.sleep(1000); }

打印日志为:
00:15:21.228 [main] DEBUG pers.kerry.aservice.service.CFAccept - intFuture result:5

4.2. whenComplete
whenComplete 方法定义为:
public CompletableFuture whenComplete( BiConsumer action)

【异步编程|异步编程 CompletableFuture】特点为:
  • 入参 BiConsumer T,Throwable -> U,无论是否报错,在执行完成后都会触发 whenComplete 方法。
  • 入参 BiConsumer 中有 Throwable,因此可以处理异常,前者 CompletableFuture 如果报错,会触发该方法。
示例代码:
public static CompletableFuture divideNumber(int a, int b) { return CompletableFuture.supplyAsync(() -> a / b); }public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture intFuture = divideNumber(10, 2) .whenComplete((value, throwable) -> { log.debug("whenComplete value:{}", value); }); CompletableFuture exceptionFuture = divideNumber(10, 0) .whenComplete((value, throwable) -> { if (throwable != null) { log.error("whenComplete 出错啦"); } }); log.debug("intFuture result:{}", intFuture.get()); log.debug("exceptionFuture result:{}", exceptionFuture.get()); }

打印日志为:
00:19:27.738 [main] DEBUG pers.kerry.aservice.service.CFWhen - whenComplete value:5 00:19:27.746 [main] ERROR pers.kerry.aservice.service.CFWhen - whenComplete 出错啦 00:19:27.746 [main] DEBUG pers.kerry.aservice.service.CFWhen - intFuture result:5 Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at pers.kerry.aservice.service.CFWhen.main(CFWhen.java:37) Caused by: java.lang.ArithmeticException: / by zero at pers.kerry.aservice.service.CFWhen.lambda$divideNumber$0(CFWhen.java:20) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

4.3. 总结对比
1. thenAccept、whenComplete 对比
它们的区别包含 Function 的入参,和方法的返回值:
  • thenAccept: 入参:T,只有在不报错的时候才会执行。返回 CompletableFuture,如果再用 get() 只有 null。
  • whenComplete: 入参:Throwable,T,无论报错或不报错的时候,都会执行。返回 CompletableFuture,即 执行 whenComplete 方法前的原 CompletableFuture
在 Function 入参上,它们的区别和前面的 thenApply、exceptionally、handle 类似。有人说,那这里怎么就没有入参只为 Throwable 的方法呢?仔细想想 whenComplete 就够用了。
其他 thenRun
前面都是有入参数的,如果后续的方法依然依赖前者,而且不需要入餐,可以试试 thenRun
5. 并行处理方法 前面说的 Function、Consumer 都是有先后执行关系,因为后面的任务依赖于前面的任务的结果。这章来聊聊并行任务的处理方法。
5.1. 两个并行
5.1.1. both 两个任务,如果我们想让它们都执行完成,如果使用传统的 future的话,我们通常是这么写的:
Future futureA = executorService.submit(() -> "resultA"); Future futureB = executorService.submit(() -> "resultB"); String resultA = futureA.get(); String resultB = futureB.get();

但其实就在上述介绍的方法里,除了已经说过的 thenCombine,还有很多 both 方法的变种能实现:
CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture cfB = CompletableFuture.supplyAsync(() -> "resultB"); cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {}); cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B"); cfA.runAfterBoth(cfB, () -> {});

5.1.2. either 上述的问题,两个任务,如果我们想让它们只要有任意一个执行完成就可以了,怎么实现呢?
cfA.acceptEither(cfB, result -> {}); cfA.acceptEitherAsync(cfB, result -> {}); cfA.acceptEitherAsync(cfB, result -> {}, executorService); cfA.applyToEither(cfB, result -> {return result; }); cfA.applyToEitherAsync(cfB, result -> {return result; }); cfA.applyToEitherAsync(cfB, result -> {return result; }, executorService); cfA.runAfterEither(cfA, () -> {}); cfA.runAfterEitherAsync(cfB, () -> {}); cfA.runAfterEitherAsync(cfB, () -> {}, executorService);

上面的各个带 either 的方法,表达的都是一个意思,指的是两个任务中的其中一个执行完成,就执行指定的操作。它们几组的区别也很明显,分别用于表达是否需要任务 A 和任务 B 的执行结果,是否需要返回值。
5.2. 多个并行
如果只考虑两个任务的并行,太局限了,这里来考虑任意多个任务的并行情况,allOfanyOf 方法:
public static CompletableFuture allOf(CompletableFuture... cfs){...} public static CompletableFuture anyOf(CompletableFuture... cfs) {...}
这两个方法都非常简单,下面是 allOf 的例子:
CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123); CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC"); CompletableFuture future = CompletableFuture.allOf(cfA, cfB, cfC); // 所以这里的 join() 将阻塞,直到所有的任务执行结束 future.join();

由于 allOf 聚合了多个 CompletableFuture 实例,所以它是没有返回值的。这也是它的一个缺点。
anyOf 也非常容易理解,就是只要有任意一个 CompletableFuture 实例执行完成就可以了,看下面的例子:
CompletableFuture cfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture cfB = CompletableFuture.supplyAsync(() -> 123); CompletableFuture cfC = CompletableFuture.supplyAsync(() -> "resultC"); CompletableFuture future = CompletableFuture.anyOf(cfA, cfB, cfC); Object result = future.join();
最后一行的 join() 方法会返回最先完成的任务的结果,所以它的泛型用的是 Object,因为每个任务可能返回的类型不同。

    推荐阅读