JDK9新特性|JDK9新特性 Reactive Stream 响应式流

JDK9新特性 Reactive Stream 响应式流 ?本篇主要讲解 JDK9特性 Reactive Stream 响应式流,介绍 Reactive Stream是什么 背压是什么,以及JDK9中提供的关于Reactive Stream的接口和 2个使用案例包括如何使用Processor。
?1.Reactive Stream 概念
?Reactive Stream (响应式流/反应流) 是JDK9引入的一套标准,是一套基于发布/订阅模式的数据处理规范。响应式流从2013年开始,作为提供非阻塞背压的异步流处理标准的倡议。 它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。更确切地说,Reactive流目的是“找到最小的一组接口,方法和协议,用来描述必要的操作和实体以实现这样的目标:以非阻塞背压方式实现数据的异步流”。
?反应式流 (Reactive Stream) 规范诞生,定义了如下四个接口:

Subscription 接口定义了连接发布者和订阅者的方法 Publisher 接口定义了发布者的方法 Subscriber 接口定义了订阅者的方法 Processor 接口定义了处理器

?Reactive Stream 规范诞生后,RxJava 从 RxJava 2 开始实现 Reactive Stream 规范 , 同时 Spring提供的Reactor 框架(WebFlux的基础) 等也相继实现了 Reactive Stream 规范
?下图展示了订阅者和发布者之间的交互
JDK9新特性|JDK9新特性 Reactive Stream 响应式流
文章图片
Xnip20200225_131010.png ?2.背压(back pressure)概念
?如果生产者发出的信息比消费者能够处理消息最大量还要多,消费者可能会被迫一直在抓消息,耗费越来越多的资源,埋下潜在的崩溃风险。为了防止这一点,需要有一种机制使消费者可以通知生产者,降低消息的生成速度。生产者可以采用多种策略来实现这一要求,这种机制称为背压。
?简单来说就是
  • 背压指的发布者和订阅者之间的互动
  • 订阅者可以告诉发布者自己需要多少数据,可以调节数据流量,不会导致发布者发布数据过多导致数据浪费或压垮订阅者
?3.JDK9中 Reactive Stream规范的实现
?JDK9中Reactive Stream的实现规范 通常被称为 Flow API ,通过java.util.concurrent.Flow 和java.util.concurrent.SubmissionPublisher 类来实现响应式流
?在JDK9里Reactive Stream的主要接口声明在Flow类里,Flow 类中定义了四个嵌套的静态接口,用于建立流量控制的组件,发布者在其中生成一个或多个供订阅者使用的数据项:
  • Publisher:数据项发布者、生产者
  • Subscriber:数据项订阅者、消费者
  • Subscription:发布者与订阅者之间的关系纽带,订阅令牌
  • Processor:数据处理器


? JDK9新特性|JDK9新特性 Reactive Stream 响应式流
文章图片
Xnip20200225_132212.png ??3.1 发布者 Publisher ??Publisher 将数据流发布给注册的 Subscriber。 它通常使用 Executor 异步发布项目给订阅者。 Publisher 需要确保每个订阅的 Subscriber 方法严格按顺序调用。
  • subscribe:订阅者订阅发布者
@FunctionalInterface public static interface Flow.Publisher { public void subscribe(Subscriber subscriber); }

??3.2 订阅者 Subscriber ??Subscriber 订阅 Publisher 的数据流,并接受回调。 如果 Subscriber 没有发出请求,就不会收到数据。对于给定 订阅合同(Subscription),调用 Subscriber 的方法是严格按顺序的。
  • onSubscribe:发布者调用订阅者的这个方法来异步传递订阅 , 这个方法在 publisher.subscribe方法调用后被执行
  • onNext:发布者调用这个方法传递数据给订阅者
  • onError:当 Publisher 或 Subscriber 遇到不可恢复的错误时调用此方法,之后不会再调用其他方法
  • onComplete:当数据已经发送完成,且没有错误导致订阅终止时,调用此方法,之后不再调用其他方法
??3.3 订阅合同 Subscription ??Subscription 用于连接 Publisher 和 Subscriber。Subscriber 只有在请求时才会收到项目,并可以通过 Subscription 取消订阅。Subscription 主要有两个方法:
  • request:订阅者调用此方法请求数据
  • cancel:订阅者调用这个方法来取消订阅,解除订阅者与发布者之间的关系
    public static interface Flow.Subscription { public void request(long n); public void cancel(); }

??3.4 处理器 Processor ??Processor 位于 Publisher 和 Subscriber 之间,用于做数据转换。可以有多个 Processor 同时使用,组成一个处理链,链中最后一个处理器的处理结果发送给 Subscriber。JDK 没有提供任何具体的处理器。处理器同时是订阅者和发布者,接口的定义也是继承了两者 即作为订阅者也作为发布者 ,作为订阅者接收数据,然后进行处理,处理完后作为发布者,再发布出去。
/** * A component that acts as both a Subscriber and Publisher. * * @param the subscribed item type * @param the published item type */ public static interface Processor extends Subscriber, Publisher { }

JDK9新特性|JDK9新特性 Reactive Stream 响应式流
文章图片
Xnip20200225_133449.png ?4.JDK9 中Reactive Stream(Flow API )规范调用流程
?Publisher是能够发出元素的发布者,Subscriber是接收元素并做出响应的订阅者。当执行Publisher里的subscribe方法时,发布者会回调订阅者的onSubscribe方法,这个方法中,通常订阅者会借助传入的Subscription向发布者请求n个数据。然后发布者通过不断调用订阅者的onNext方法向订阅者发出最多n个数据。如果数据全部发完,则会调用onComplete告知订阅者流已经发完;如果有错误发生,则通过onError发出错误数据,同样也会终止流。
?其中,Subscription相当于是连接Publisher和Subscriber的“纽带(合同)”。因为当发布者调用subscribe方法注册订阅者时,会通过订阅者的回调方法onSubscribe传入Subscription对象,之后订阅者就可以使用这个Subscription对象的request方法向发布者“要”数据了。背压机制正是基于此来实现的。
JDK9新特性|JDK9新特性 Reactive Stream 响应式流
文章图片
Xnip20200225_133733.png ?5.案例一 响应式基础使用案例
??5.1 以下代码简单演示了SubmissionPublisher 和这套发布-订阅框架的基本使用方式: ??注意要使用JDK9以上的版本
/** * @author johnny * @create 2020-02-24 下午5:44 **/ @Slf4j public class ReactiveStreamTest {public static void main(String[] args) throws InterruptedException {//1.创建 生产者Publisher JDK9自带的 实现了Publisher接口 SubmissionPublisher publisher = new SubmissionPublisher<>(); //2.创建 订阅者 Subscriber,需要自己去实现内部方法Flow.Subscriber subscriber = new Flow.Subscriber<>() {private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; System.out.println("订阅成功。。"); subscription.request(1); System.out.println("订阅方法里请求一个数据"); }@Override public void onNext(Integer item) { log.info("【onNext 接受到数据 item : {}】 ", item); subscription.request(1); }@Override public void onError(Throwable throwable) { log.info("【onError 出现异常】"); subscription.cancel(); }@Override public void onComplete() { log.info("【onComplete 所有数据接收完成】"); } }; //3。发布者和订阅者 建立订阅关系 就是回调订阅者的onSubscribe方法传入订阅合同 publisher.subscribe(subscriber); //4.发布者 生成数据 for (int i = 1; i <= 5; i++) { log.info("【生产数据 {} 】", i ); //submit是一个阻塞方法,此时会调用订阅者的onNext方法 publisher.submit(i); }//5.发布者 数据都已发布完成后,关闭发送,此时会回调订阅者的onComplete方法 publisher.close(); //主线程睡一会 Thread.currentThread().join(100000); } }

?打印输出结果
JDK9新特性|JDK9新特性 Reactive Stream 响应式流
文章图片
Xnip20200225_134439.png ?看结果好像我们看不出来Reactive Stream有什么用 ,其实关键点在 publisher.submit(i); submit它是一个阻塞方法
让我们把代码修改一点
1.将onNext添加耗时操作,模拟业务耗时逻辑
2.增加发布者发布数据的数量,模拟真实场景 无限数据
@Override public void onNext(Integer item) { log.info("【onNext 接受到数据 item : {}】 ", item); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } subscription.request(1); }//发布者 生成数据 for (int i = 1; i <= 1000; i++) { log.info("【生产数据 {} 】", i ); //submit是一个阻塞方法,此时会调用订阅者的onNext方法 publisher.submit(i); }

?直接看打印
?会发现发布者 生成数据到256后就会停止生产,这是因为publisher.submit(i)方法是阻塞的,
内部有个缓冲数组最大容量就是256,只有当订阅者发送 subscription.request(1); 请求后,才会从缓冲数组里拿按照顺序拿出数据传给 onNext方法 供订阅者处理,当subscription.request(1)这个方法被调用后,发布者发现数组里没有满才会再生产数据,这样就防止了生产者一次生成过多的数据把订阅者压垮,从而实现了背压机制
JDK9新特性|JDK9新特性 Reactive Stream 响应式流
文章图片
Xnip20200225_135111.png ?6.案例二 响应式带 Processor 使用案例
??6.1创建自定义Processor
package com.johnny.webflux.webfluxlearn.reactivestream; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; /** * 自定义 Processor * * @author johnny * @create 2020-02-25 下午1:56 **/ @Slf4j public class MyProcessor extends SubmissionPublisher implements Flow.Processor {private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { log.info("【Processor 收到订阅请求】"); //保存订阅关系,需要用它来给发布者 相应 this.subscription = subscription; this.subscription.request(1); }@Override public void onNext(Integer item) { log.info("【onNext 收到发布者数据: {} 】", item); //做业务处理。。 if (item % 2 == 0) { //筛选偶数 发送给 订阅者 this.submit(item); } this.subscription.request(1); }@Override public void onError(Throwable throwable) { // 我们可以告诉发布者, 后面不接受数据了 this.subscription.cancel(); }@Override public void onComplete() { log.info("【处理器处理完毕】"); this.close(); } }

??6.2 运行demo 关联publisher 和 Processor 和 subscriber
package com.johnny.webflux.webfluxlearn.reactivestream; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Flow; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.TimeUnit; /** * 带Processor的案例 * * @author johnny * @create 2020-02-25 下午2:17 **/ @Slf4j public class ProcessorDemo {public static void main(String[] args) throws InterruptedException {//创建发布者 SubmissionPublisher publisher = new SubmissionPublisher<>(); //创建 Processor 即是发布者也是订阅者 MyProcessor myProcessor = new MyProcessor(); //创建最终订阅者 Flow.Subscriber subscriber = new Flow.Subscriber<>() {private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; this.subscription.request(1); }@Override public void onNext(Integer item) { log.info("【onNext 从Processor 接受到过滤后的 数据 item : {}】 ", item); this.subscription.request(1); }@Override public void onError(Throwable throwable) { log.info("【onError 出现异常】"); subscription.cancel(); }@Override public void onComplete() { log.info("【onComplete 所有数据接收完成】"); } }; //建立关系 发布者和处理器, 此时处理器扮演 订阅者 publisher.subscribe(myProcessor); //建立关系 处理器和订阅者此时处理器扮演 myProcessor.subscribe(subscriber); //发布者发布数据publisher.submit(1); publisher.submit(2); publisher.submit(3); publisher.submit(4); publisher.close(); TimeUnit.SECONDS.sleep(2); } }

JDK9新特性|JDK9新特性 Reactive Stream 响应式流
文章图片
Xnip20200225_143039.png ?7.总结
?本篇主要讲解 JDK9特性 Reactive Stream 响应式流,介绍 Reactive Stream是什么 背压是什么,以及JDK9中提供的关于Reactive Stream的接口和 2个使用案例包括如何使用Processor。
?只需要关注JDK9提供的 4个接口,以及内部的方法,对着案例敲一遍代码 其实流程还是很简单的 加油吧!!!
【JDK9新特性|JDK9新特性 Reactive Stream 响应式流】个人博客网站 https://www.askajohnny.com 欢迎来访问!
本文由博客一文多发平台 OpenWrite 发布!

    推荐阅读