RxJava使用(三)之Backpressure背压

所谓的Backpressure其实就是为了控制流量, 水缸存储的能力毕竟有限。

Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) {//无限循环发事件 emitter.onNext(i); } } }).subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.d(TAG, "" + integer); } });

这段代码很简单, 上游同样无限循环的发送事件, 在下游每次接收事件前延时2秒. 上下游工作在同一个线程里, 来看下运行结果:

RxJava使用(三)之Backpressure背压
文章图片
image.png
怎么如此平静, 感觉像是走错了片场.
为什么呢, 因为上下游工作在同一个线程呀骚年们! 这个时候上游每次调用emitter.onNext(i)其实就相当于直接调用了
Consumerpublic void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.d(TAG, "" + integer); }

那我们加个线程呢, 改成这样:
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) {//无限循环发事件 emitter.onNext(i); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(2000); Log.d(TAG, "" + integer); } });

【RxJava使用(三)之Backpressure背压】这个时候把上游切换到了IO线程中去, 下游到主线程去接收, 来看看运行结果呢:

RxJava使用(三)之Backpressure背压
文章图片
image.png
可以看到, 给上游加了个线程之后, 它就像脱缰的野马一样, 内存又爆掉了.
当上下游工作在同一个线程中时, 这时候是一个同步的订阅关系, 也就是说上游每发送一个事件必须等到下游接收处理完了以后才能接着发送下一个事件.
当上下游工作在不同的线程中时, 这时候是一个异步的订阅关系, 这个时候上游发送数据不需要等待下游接收, 为什么呢, 因为两个线程并不能直接进行通信, 因此上游发送的事件并不能直接到下游里去, 这个时候就需要一个田螺姑娘来帮助它们俩, 这个田螺姑娘就是我们刚才说的水缸 ! 上游把事件发送到水缸里去, 下游从水缸里取出事件来处理, 因此, 当上游发事件的速度太快, 下游取事件的速度太慢, 水缸就会迅速装满, 然后溢出来, 最后就OOM了.
我们找到了上下游流速不均衡的源头 , 在这一节里我们将学习如何去治理它 . 可能很多看过其他人写的文章的朋友都会觉得只有Flowable才能解决 , 所以大家对这个Flowable都抱有很大的期许 , 其实呐 , 你们毕竟图样图森破 , 今天我们先抛开Flowable, 仅仅依靠我们自己的双手和智慧 , 来看看我们如何去治理 , 通过本节的学习之后我们再来看Flowable, 你会发现它其实并没有想象中那么牛叉, 它只是被其他人过度神化了
方法:
一是从数量上进行治理, 减少发送进水缸里的事件 二是从速度上进行治理, 减缓事件发送进水缸的速度 上游发送的所有事件都放到水缸里了, 所以瞬间水缸就满了, 那我们可以只放我们需要的事件到水缸里呀, 只放一部分数据到水缸里, 这样不就不会溢出来了吗, 因此, 我们把上面的代码修改一下:
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }).subscribeOn(Schedulers.io()) .filter(new Predicate() { @Override public boolean test(Integer integer) throws Exception { return integer % 10 == 0; } }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } });

在这段代码中我们增加了一个filter, 只允许能被10整除的事件通过, 再来看看运行结果:

RxJava使用(三)之Backpressure背压
文章图片
image.png
这种方法归根到底其实就是减少放进水缸的事件的数量, 是以数量取胜, 但是这个方法有个缺点, 就是丢失了大部分的事件.
那么我们换一个角度来思考, 既然上游发送事件的速度太快, 那我们就适当减慢发送事件的速度, 从速度上取胜, 听上去不错, 我们来试试:
Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); Thread.sleep(2000); //每次发送完事件延时2秒 } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "" + integer); } });

这次我们让上游每次发送完事件后都延时了2秒, 来看看运行结果:

RxJava使用(三)之Backpressure背压
文章图片
image.png
可以看到, 我们给上游加上延时了之后, 瞬间一头发情的公牛就变得跟只小绵羊一样, 如此温顺, 如此平静, 如此平稳的内存线, 美妙极了. 而且事件也没有丢失, 上游通过适当的延时, 不但减缓了事件进入水缸的速度, 也可以让下游有充足的时间从水缸里取出事件来处理 , 这样一来, 就不至于导致大量的事件涌进水缸, 也就不会OOM啦.
到目前为止, 我们没有依靠任何其他的工具, 就轻易解决了上下游流速不均衡的问题.
Flowable 基本用法
Flowable upstream = Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter emitter) throws Exception { Log.d(TAG, "emit 1"); emitter.onNext(1); Log.d(TAG, "emit 2"); emitter.onNext(2); Log.d(TAG, "emit 3"); emitter.onNext(3); Log.d(TAG, "emit complete"); emitter.onComplete(); } }, BackpressureStrategy.ERROR); //增加了一个参数Subscriber downstream = new Subscriber() {@Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); s.request(Long.MAX_VALUE); //注意这句代码 }@Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); }@Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); }@Override public void onComplete() { Log.d(TAG, "onComplete"); } }; upstream.subscribe(downstream);

这段代码中,分别创建了一个上游Flowable和下游Subscriber, 上下游工作在同一个线程中, 和之前的Observable的使用方式只有一点点的区别, 先来看看运行结果吧:
D/TAG: onSubscribe D/TAG: emit 1 D/TAG: onNext: 1 D/TAG: emit 2 D/TAG: onNext: 2 D/TAG: emit 3 D/TAG: onNext: 3 D/TAG: emit complete D/TAG: onComplete

结果也和我们预期的是一样的.
我们注意到这次和Observable有些不同. 首先是创建Flowable的时候增加了一个参数, 这个参数是用来选择背压,也就是出现上下游流速不均衡的时候应该怎么处理的办法, 这里我们直接用BackpressureStrategy.ERROR这种方式, 这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException.
另外的一个区别是在下游的onSubscribe方法中传给我们的不再是Disposable了, 而是Subscription, 它俩有什么区别呢, 首先它们都是上下游中间的一个开关, 之前我们说调用Disposable.dispose()方法可以切断水管, 同样的调用Subscription.cancel()也可以切断水管, 不同的地方在于Subscription增加了一个void request(long n)方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:
s.request(Long.MAX_VALUE);

如果取消掉request这句代码, 在上游发送第一个事件之后, 下游就抛出了一个著名的MissingBackpressureException异常, 并且下游没有收到任何其余的事件. 可是这是一个同步的订阅呀, 上下游工作在同一个线程, 上游每发送一个事件应该会等待下游处理完了才会继续发事件啊, 不可能出现上下游流速不均衡的问题呀.
带着这个疑问, 我们再来看看异步的情况:
Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter emitter) throws Exception { Log.d(TAG, "emit 1"); emitter.onNext(1); Log.d(TAG, "emit 2"); emitter.onNext(2); Log.d(TAG, "emit 3"); emitter.onNext(3); Log.d(TAG, "emit complete"); emitter.onComplete(); } }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber() {@Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); mSubscription = s; }@Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); }@Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); }@Override public void onComplete() { Log.d(TAG, "onComplete"); } });

这是因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式来更好的解决上下游流速不均衡的问题, 与我们之前所讲的控制数量和控制速度不太一样, 这种方式用通俗易懂的话来说就好比是叶问打鬼子, 我们把上游看成小日本, 把下游当作叶问, 当调用Subscription.request(1)时, 叶问就说我要打一个! 然后小日本就拿出一个鬼子给叶问, 让他打, 等叶问打死这个鬼子之后, 再次调用request(10), 叶问就又说我要打十个! 然后小日本又派出十个鬼子给叶问, 然后就在边上看热闹, 看叶问能不能打死十个鬼子, 等叶问打死十个鬼子后再继续要鬼子接着打...
所以我们把request当做是一种能力, 当成下游处理事件的能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件, 就不会造成一窝蜂的发出一堆事件来, 从而导致OOM. 这也就完美的解决之前我们所学到的两种方式的缺陷, 过滤事件会导致事件丢失, 减速又可能导致性能损失. 而这种方式既解决了事件丢失的问题, 又解决了速度的问题。
首先第一个同步的代码, 为什么上游发送第一个事件后下游就抛出了MissingBackpressureException异常, 这是因为下游没有调用request, 上游就认为下游没有处理事件的能力, 而这又是一个同步的订阅, 既然下游处理不了, 那上游不可能一直等待吧, 如果是这样, 万一这两根水管工作在主线程里, 界面不就卡死了吗, 因此只能抛个异常来提醒我们. 那如何解决这种情况呢, 很简单啦, 下游直接调用request(Long.MAX_VALUE)就行了, 或者根据上游发送事件的数量来request就行了, 比如这里request(3)就可以了.
然后我们再来看看第二段代码, 为什么上下游没有工作在同一个线程时, 上游却正确的发送了所有的事件呢? 这是因为在Flowable里默认有一个大小为128的水缸, 当上下游工作在不同的线程中时, 上游就会先把事件发送到这个水缸中, 因此, 下游虽然没有调用request, 但是上游在水缸中保存着这些事件, 只有当下游调用request时, 才从水缸里取出事件发给下游.
在上游发送了第129个事件的时候, 就抛出了MissingBackpressureException异常, 提醒我们发洪水啦. 当然了, 这个128也不是我凭空捏造出来的, Flowable的源码中就有这个buffersize的大小定义, 可以自行查看.
注意这里我们是把上游发送的事件全部都存进了水缸里, 下游一个也没有消费, 所以就溢出了, 如果下游去消费了事件, 可能就不会导致水缸溢出来了. 这里我们说的是可能不会, 这也很好理解, 比如刚才这个例子上游发了129个事件, 下游只要快速的消费了一个事件, 就不会溢出了, 但如果下游过了十秒钟再来消费一个, 那肯定早就溢出了.
Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter emitter) throws Exception { for (int i = 0; ; i++) { emitter.onNext(i); } } }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber() {@Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); mSubscription = s; }@Override public void onNext(Integer integer) { Log.d(TAG, "onNext: " + integer); }@Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); }@Override public void onComplete() { Log.d(TAG, "onComplete"); } });

按照我们以前学习Observable一样, 让上游无限循环发送事件, 下游一个也不去处理, 来看看运行结果吧:

RxJava使用(三)之Backpressure背压
文章图片
image.png
同样可以看到, 内存迅速增长, 直到最后抛出OOM. 所以说不要迷恋FLowable, 它只是个传说.
Observable的时候说到的如何解决上游发送事件太快的, 有一招叫从数量上取胜, 同样的FLowable中也有这种方法, 对应的就是BackpressureStrategy.DROP和BackpressureStrategy.LATEST这两种策略.
Drop就是直接把存不下的事件丢弃 Latest就是只保留最新的事件

FLowable的策略我们也讲完了, 有些朋友要问了, 这些FLowable是我自己创建的, 所以我可以选择策略, 那面对有些FLowable并不是我自己创建的, 该怎么办呢? 比如RxJava中的interval操作符, 这个操作符并不是我们自己创建的, 来看下面这个例子吧:
Flowable.interval(1, TimeUnit.MICROSECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); mSubscription = s; s.request(Long.MAX_VALUE); }@Override public void onNext(Long aLong) { Log.d(TAG, "onNext: " + aLong); try { Thread.sleep(1000); //延时1秒 } catch (InterruptedException e) { e.printStackTrace(); } }@Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); }@Override public void onComplete() { Log.d(TAG, "onComplete"); } });

interval操作符发送Long型的事件, 从0开始, 每隔指定的时间就把数字加1并发送出来, 在这个例子里, 我们让它每隔1毫秒就发送一次事件, 在下游延时1秒去接收处理, 不用猜也知道结果是什么:
zlc.season.rxjava2demo D/TAG: onSubscribe zlc.season.rxjava2demo W/TAG: onError: io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:87) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:278) atjava.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:273) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:761)

一运行就抛出了MissingBackpressureException异常, 提醒我们发太多了, 那么怎么办呢, 这个又不是我们自己创建的FLowable啊...
别慌, 虽然不是我们自己创建的, 但是RxJava给我们提供了其他的方法:
onBackpressureBuffer() onBackpressureDrop() onBackpressureLatest()

用法也简单, 拿刚才的例子现学现用:
Flowable.interval(1, TimeUnit.MICROSECONDS) .onBackpressureDrop()//加上背压策略 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber() { @Override public void onSubscribe(Subscription s) { Log.d(TAG, "onSubscribe"); mSubscription = s; s.request(Long.MAX_VALUE); }@Override public void onNext(Long aLong) { Log.d(TAG, "onNext: " + aLong); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }@Override public void onError(Throwable t) { Log.w(TAG, "onError: ", t); }@Override public void onComplete() { Log.d(TAG, "onComplete"); } });

Flowable的响应式拉取

    推荐阅读