Reactor|Reactor 之 多任务并发执行,结果按顺序返回第一个

1 场景 调用多个平级服务,按照服务优先级返回第一个有效数据。
具体case:一个页面可能有很多的弹窗,弹窗之间又有优先级。每次只需要返回第一个有数据的弹窗。但是又希望所有弹窗之间的数据获取是异步的。这种场景使用 Reactor 怎么实现呢?
2 创建 service 2.1 创建基本接口和实体类

public interface TestServiceI { Mono request(); }

提供一个 request 方法,返回一个 Mono 对象。
@Data @ToString @AllArgsConstructor @NoArgsConstructor public class TestUser { private String name; }

2.2 创建 service 实现
@Slf4j public class TestServiceImpl1 implements TestServiceI { @Override public Mono request() { log.info("execute.test.service1"); return Mono.fromSupplier(() -> { try { System.out.println("service1.threadName=" + Thread.currentThread().getName()); Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return ""; }) .map(name -> { return new TestUser(name); }); } }

【Reactor|Reactor 之 多任务并发执行,结果按顺序返回第一个】第一个 service 执行耗时 500ms。返回空对象;
创建第二个 service 执行耗时 1000ms。返回空对象;代码如上,改一下sleep时间即可。
继续创建第三个 service 执行耗时 1000ms。返回 name3。代码如上,改一下 sleep 时间,以及返回为 name3。
3 主体方法
public static void main(String[] args) { long startTime = System.currentTimeMillis(); TestServiceI testServiceImpl4 = new TestServiceImpl4(); TestServiceI testServiceImpl5 = new TestServiceImpl5(); TestServiceI testServiceImpl6 = new TestServiceImpl6(); List serviceIList = new ArrayList<>(); serviceIList.add(testServiceImpl4); serviceIList.add(testServiceImpl5); serviceIList.add(testServiceImpl6); // 执行 service 列表,这样有多少个 service 都可以 Flux> monoFlux = Flux.fromIterable(serviceIList) .map(service -> { return service.request(); }); // flatMap(或者flatMapSequential) + map 实现异常继续下一个执行 Flux flux = monoFlux.flatMapSequential(mono -> { return mono.map(user -> { TestUser testUser = JsonUtil.parseJson(JsonUtil.toJson(user), TestUser.class); if (Objects.nonNull(testUser) && StringUtils.isNotBlank(testUser.getName())) { return testUser; } // null 在 reactor 中是异常数据。 return null; }) .onErrorContinue((err, i) -> { log.info("onErrorContinue={}", i); }); }); Mono mono = flux.elementAt(0, Mono.just("")); Object block = mono.block(); System.out.println(block + "blockFirst 执行耗时ms:" + (System.currentTimeMillis() - startTime)); }

1、Flux.fromIterable 执行 service 列表,可以随意增删 service 服务。
2、flatMap(或者flatMapSequential) + map + onErrorContinue 实现异常继续下一个执行。具体参考:
Reactor 之 onErrorContinue 和 onErrorResume
3、Mono mono = flux.elementAt(0, Mono.just("")); 返回第一个正常数据。
执行输出:
20:54:26.512 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 20:54:26.553 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1 service1.threadName=main 20:54:27.237 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=) 20:54:27.237 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2 service5.threadName=main 20:54:28.246 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=) 20:54:28.246 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3 service6.threadName=main TestUser(name=name3)blockFirst 执行耗时ms:2895

1、service1 和 service2 因为返回空,所以继续下一个,最终返回 name3。
2、查看总耗时:2895ms。service1 耗时 500,service2 耗时1000,service3 耗时 1000。发现耗时基本上等于 service1 + service2 + service3 。这是怎么回事呢?查看返回执行的线程,都是 main。
总结:这样实现按照顺序返回第一个正常数据。但是执行并没有异步。下一步:如何实现异步呢?
4 实现异步 4.1 subcribeOn 实现异步
修改 service 实现。增加 .subscribeOn(Schedulers.boundedElastic())
如下:
@Slf4j public class TestServiceImpl1 implements TestServiceI { @Override public Mono request() { log.info("execute.test.service1"); return Mono.fromSupplier(() -> { try { System.out.println("service1.threadName=" + Thread.currentThread().getName()); Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return ""; }) //增加subscribeOn .subscribeOn(Schedulers.boundedElastic()) .map(name -> { return new TestUser(name); }); } }

再次执行输出如下:
21:02:04.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 21:02:04.265 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1 service4.threadName=boundedElastic-1 21:02:04.300 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2 21:02:04.302 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3 service2.threadName=boundedElastic-2 service3.threadName=boundedElastic-3 21:02:04.987 [boundedElastic-1] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=) 21:02:05.307 [boundedElastic-2] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=) TestUser(name=name6)blockFirst 执行耗时ms:1242

1、发现具体实现 sleep 的线程都不是 main 线程,而是 boundedElastic
2、最终执行耗时 1242ms,只比执行时间最长的 service2 和 service3 耗时 1000ms,多一些。证明是异步了。
4.2 CompletableFuture 实现异步
修改 service 实现,使用 CompletableFuture 执行耗时操作(这里是sleep,具体到项目中可能是外部接口调用,DB 操作等);然后使用 Mono.fromFuture 返回 Mono 对象。
@Slf4j public class TestServiceImpl1 implements TestServiceI{ @Override public Mono request() { log.info("execute.test.service1"); CompletableFuture uCompletableFuture = CompletableFuture.supplyAsync(() -> { try { System.out.println("service1.threadName=" + Thread.currentThread().getName()); Thread.sleep(500); } catch (InterruptedException e) { throw new RuntimeException(e); } return "testname1"; }); return Mono.fromFuture(uCompletableFuture).map(name -> { return new TestUser(name); }); } }

执行返回如下:
21:09:59.465 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework 21:09:59.510 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2 service2.threadName=ForkJoinPool.commonPool-worker-1 21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3 service3.threadName=ForkJoinPool.commonPool-worker-2 21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1 service1.threadName=ForkJoinPool.commonPool-worker-3 21:10:00.526 [ForkJoinPool.commonPool-worker-1] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=) 21:10:00.538 [ForkJoinPool.commonPool-worker-2] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=) TestUser(name=testname1)blockFirst 执行耗时ms:1238

1、耗时操作都是使用 ForkJoinPool 线程池中的线程执行。
2、最终耗时和方法1基本差不多。
大家都去试试吧~
相关链接:
Reactor 之 onErrorContinue 和 onErrorResume
Reactor 之 flatMap vs map 详解

    推荐阅读