reactive|reactive stream 响应式流
1 初识Reactive Stream
反应式流
2015 年反应式流 (Reactive Stream) 规范诞生,定义了如下四个接口:
- Subscription 接口定义了连接发布者和订阅者的方法 - Publisher
接口定义了发布者的方法 - Subscriber 接口定义了订阅者的方法 - Processor 接口定义了处理器
下图展示了订阅者与发布者交互的典型场景:
文章图片
image
文章图片
image RS 基于流进行处理可以更高效地使用内存,把业务逻辑从模板代码中抽离出来,把代码从并发、同步问题中解脱出来,同时还可以提高代码的可读性。
RS 在某些方面是迭代器模式和观察者模式的结合,同时存在数据的 Pull 和 Push。 订阅者先请求 N 个项目,然后发布者推送最多 N 个项目给订阅者。
文章图片
image
文章图片
image Java 9 中的 Flow 类定义了反应式编程的API。 实际上就是拷贝了 RS 的四个接口定义,然后放在 java.util.concurrent.Flow 类中。 Java 9 提供了
SubmissionPublisher
和 ConsumerSubscriber
两个默认实现。Java 8 引入了 Stream 用于流的操作,Java 9 引入的 Flow 也是数据流的操作。 相比之下
- Stream 更侧重于流的过滤、映射、整合、收集 - 而 Flow 更侧重于流的产生与消费(下面的代码基于JDK11)
Subscription 用于连接 Publisher 和 Subscriber。 Subscriber 只有在请求时才会收到项目,并可以通过 Subscription 取消订阅。 Subscription 主要有两个方法:
- request:订阅者调用此方法请求数据 - cancel:订阅者调用这个方法来取消订阅,解除订阅者与发布者之间的关系
public static interface Flow.Subscription { public void request(long n);
public void cancel();
}
(2) 发布者 Publisher
Publisher 将数据流发布给注册的 Subscriber。 它通常使用 Executor 异步发布项目给订阅者。 Publisher 需要确保每个订阅的 Subscriber 方法严格按顺序调用。
- subscribe:订阅者订阅发布者
@FunctionalInterface public static interface Flow.Publisher { public void subscribe(Subscriber super T> subscriber);
}
(3) 订阅者 Subscriber
Subscriber 订阅 Publisher 的数据流,并接受回调。 如果 Subscriber 没有发出请求,就不会收到数据。 对于给定订阅(Subscription),调用 Subscriber 的方法是严格按顺序的。
- onSubscribe:发布者调用订阅者的这个方法来异步传递订阅 - onNext:发布者调用这个方法传递数据给订阅者 - onError:当 Publisher 或 Subscriber 遇到不可恢复的错误时调用此方法,之后不会再调用其他方法 - onComplete:当数据已经发送完成,且没有错误导致订阅终止时,调用此方法,之后不再调用其他方法
public static interface Subscriber { public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
(4) 处理器 Processor
Processor 位于 Publisher 和 Subscriber 之间,用于做数据转换。可以有多个 Processor 同时使用,组成一个处理链,链中最后一个处理器的处理结果发送给 Subscriber。JDK 没有提供任何具体的处理器。处理器同时是订阅者和发布者,接口的定义也是继承了两者,作为订阅者接收数据,然后进行处理,处理完后作为发布者,再发布出去。
(5) 背压 back pressure
Subscriber 向 Publisher 请求消息,并通过提供的回调方法被激活调用。如果 Publisher 的处理能力比 Subscriber 强得多,需要有一种机制使得 Subscriber 可以通知 Publisher 降低生产速度。Publisher 实现这种功能的机制被称为背压。提供数据生产者和消费者的消息机制,协调它们之间的产销失衡的情况。 Java 9 中的 Flow API 没有提供任何 API 来发信号或者处理背压,需要开发者自行处理背压。jdk 官方建议参考 RxJava 的背压处理方式。
(6) 事件顺序
反应式流中的事件顺序: a.创建发布者和订阅者,分别是 Publisher 和 Subscriber 的实例 b.订阅者调用发布者的 subscribe 进行订阅 c.发布者调用订阅者的 onSubscribe 传递订阅 Subscription d.订阅者调用 Subscription 的 request 方法请求数据 e.发布者调用订阅者的 onNext 方法传递数据给订阅者 f.数据传递完成后发布者调用订阅者的 onComplete 方法通知完成
参考 反应式流 - Reactive Stream
1 初识Reactive Stream 反应式流 2015 年反应式流 (Reactive Stream) 规范诞生,定义了如下四个接口:
- Subscription 接口定义了连接发布者和订阅者的方法
- Publisher 接口定义了发布者的方法
- Subscriber 接口定义了订阅者的方法
- Processor
接口定义了处理器
【reactive|reactive stream 响应式流】
下图展示了订阅者与发布者交互的典型场景:
文章图片
image RS 基于流进行处理可以更高效地使用内存,把业务逻辑从模板代码中抽离出来,把代码从并发、同步问题中解脱出来,同时还可以提高代码的可读性。
RS 在某些方面是迭代器模式和观察者模式的结合,同时存在数据的 Pull 和 Push。
订阅者先请求 N 个项目,然后发布者推送最多 N 个项目给订阅者。
文章图片
image Java 9 中的 Flow 类定义了反应式编程的API。
实际上就是拷贝了 RS 的四个接口定义,然后放在 java.util.concurrent.Flow 类中。
Java 9 提供了
SubmissionPublisher
和 ConsumerSubscriber
两个默认实现。Java 8 引入了 Stream 用于流的操作,Java 9 引入的 Flow 也是数据流的操作。
相比之下
- Stream 更侧重于流的过滤、映射、整合、收集
- 而 Flow 更侧重于流的产生与消费(下面的代码基于JDK11)
Subscription 用于连接 Publisher 和 Subscriber。
Subscriber 只有在请求时才会收到项目,并可以通过 Subscription 取消订阅。
Subscription 主要有两个方法:
- request:订阅者调用此方法请求数据
- cancel:订阅者调用这个方法来取消订阅,解除订阅者与发布者之间的关系
public static interface Flow.Subscription {
public void request(long n);
public void cancel();
}
(2) 发布者 Publisher
Publisher 将数据流发布给注册的 Subscriber。
它通常使用 Executor 异步发布项目给订阅者。
Publisher 需要确保每个订阅的 Subscriber 方法严格按顺序调用。
- subscribe:订阅者订阅发布者
@FunctionalInterface
public static interface Flow.Publisher {
public void subscribe(Subscriber super T> subscriber);
}
(3) 订阅者 Subscriber
Subscriber 订阅 Publisher 的数据流,并接受回调。 如果 Subscriber 没有发出请求,就不会收到数据。
对于给定订阅(Subscription),调用 Subscriber 的方法是严格按顺序的。
- onSubscribe:发布者调用订阅者的这个方法来异步传递订阅
- onNext:发布者调用这个方法传递数据给订阅者
- onError:当 Publisher 或 Subscriber 遇到不可恢复的错误时调用此方法,之后不会再调用其他方法
- onComplete:当数据已经发送完成,且没有错误导致订阅终止时,调用此方法,之后不再调用其他方法
public static interface Subscriber {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
(4) 处理器 Processor
Processor 位于 Publisher 和 Subscriber 之间,用于做数据转换。可以有多个 Processor 同时使用,组成一个处理链,链中最后一个处理器的处理结果发送给 Subscriber。JDK 没有提供任何具体的处理器。处理器同时是订阅者和发布者,接口的定义也是继承了两者,作为订阅者接收数据,然后进行处理,处理完后作为发布者,再发布出去。
(5) 背压 back pressure
Subscriber 向 Publisher 请求消息,并通过提供的回调方法被激活调用。如果 Publisher 的处理能力比 Subscriber 强得多,需要有一种机制使得 Subscriber 可以通知 Publisher 降低生产速度。Publisher 实现这种功能的机制被称为背压。提供数据生产者和消费者的消息机制,协调它们之间的产销失衡的情况。 Java 9 中的 Flow API 没有提供任何 API 来发信号或者处理背压,需要开发者自行处理背压。jdk 官方建议参考 RxJava 的背压处理方式。
(6) 事件顺序
反应式流中的事件顺序: a.创建发布者和订阅者,分别是 Publisher 和 Subscriber 的实例 b.订阅者调用发布者的 subscribe 进行订阅 c.发布者调用订阅者的 onSubscribe 传递订阅 Subscription d.订阅者调用 Subscription 的 request 方法请求数据 e.发布者调用订阅者的 onNext 方法传递数据给订阅者 f.数据传递完成后发布者调用订阅者的 onComplete 方法通知完成
参考 反应式流 - Reactive Stream
推荐阅读
- (七)谈条件
- 关于响应式编程的十个问题
- Vue源码分析—响应式原理(二)
- Stream详解
- http请求与响应
- kylin-stream|kylin-stream source
- 就地过年!浙江这两地发通告后,有企业发双倍工资响应
- iOS|iOS 响应者及响应者链
- springboot3+r2dbc响应式编程实践
- FastAPI传输响应文件