Android异步框架RxJava 1.x系列 - 观察者模式及实现

笛里谁知壮士心,沙头空照征人骨。这篇文章主要讲述Android异步框架RxJava 1.x系列 - 观察者模式及实现相关的知识,希望能为你提供帮助。
android异步框架Rxjava 1.x系列(一) - 观察者模式及实现前言RxJava  是一款基于  Java VM  实现的响应式编程扩展库 - 基于观察者模式的异步和事件处理框架。RxJava  官方目前同时维护了两个版本,分别是  1.x  和  2.x,区别是它们使用不同的  group id  和  namespaces
 

Android异步框架RxJava 1.x系列 - 观察者模式及实现

文章图片
 
版本group idnamespaces
v1.x io.reactivex io.reactivex
v2.x io.reactivex.rxjava2 rx
本系列的文章将针对  RxJava 1.x  进行介绍,先给出  Github  的地址:
  • RxJava:github.com/ReactiveX/R…
  • RxAndroid:github.com/ReactiveX/R…
通过 Gradle 引入相关依赖:
compile ‘io.reactivex:rxjava:1.0.14‘ compile ‘io.reactivex:rxandroid:1.0.1‘ 复制代码

正文 1. RxJava的定义一个精准的解释如下:RxJava  是一个运行于  Java VM  ,由可观测序列组成的,异步、基于事件的函数库。
2. RxJava的优点换句话说,『同样是做异步,为什么人们用它,而不用现成的  AsyncTask  /  Handler  /  XXX  / ... ?』
一个词:简洁。
异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。  Android  创造的  AsyncTask 和Handler,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。
 
Android异步框架RxJava 1.x系列 - 观察者模式及实现

文章图片
 
在  Android  开发中,假设有这样一个需求:界面上有一个自定义的视图  imageCollectorView,它的作用是显示多张图片,并能使用 addImage(Bitmap) 方法来任意增加显示的图片。现在需要程序将一个给出的目录数组  File[] folders  中每个目录下的  png  图片都加载出来并显示在  imageCollectorView  中。
注意: 由于读取图片的过程较为耗时,需要放在后台执行,而图片的显示则必须在 UI 线程执行。
常用的实现方式有多种,这里给出其中一种:
new Thread() { @Override public void run() { super.run(); for (File folder : folders) { File[] files = folder.listFiles(); for (File file : files) { if (file.getName().endsWith(".png")) { final Bitmap bitmap = getBitmapFromFile(file); getActivity().runOnUiThread(new Runnable() { @Override public void run() { imageCollectorView.addImage(bitmap); } }); } } } } }.start(); 复制代码

而如果使用  RxJava,实现方式是这样的:
Observable.from(folders) .flatMap(new Func1< File, Observable< File> > () { @Override public Observable< File> call(File file) { return Observable.from(file.listFiles()); } }) .filter(new Func1< File, Boolean> () { @Override public Boolean call(File file) { return file.getName().endsWith(".png"); } }) .map(new Func1< File, Bitmap> () { @Override public Bitmap call(File file) { return getBitmapFromFile(file); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1< Bitmap> () { @Override public void call(Bitmap bitmap) { imageCollectorView.addImage(bitmap); } }); 复制代码

可以发现,使用 RxJava 方式代码量明显大大增加,所谓简洁从何而来?
这里说的简洁是指的逻辑上的。观察一下你会发现,RxJava  的这个实现,是一条从上到下的链式调用,没有任何嵌套,这在逻辑的简洁性上是具有优势的。当需求变得复杂时,这种优势将更加明显(试想如果还要求只选取前 10 张图片,常规方式要怎么办?如果有更多这样那样的要求呢?再试想,在这一大堆需求实现完两个月之后需要改功能,当你翻回这里看到自己当初写下的那一片迷之缩进,你能保证自己将迅速看懂,而不是对着代码重新捋一遍思路?)。
另外,如果你的  IDE  是  Android Studio,其实每次打开某个  Java  文件的时候,你会看到被自动  Lambda  化的预览,这将让你更加清晰地看到程序逻辑:
Observable.from(folders) .flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) }) .filter((Func1) (file) -> { file.getName().endsWith(".png") }) .map((Func1) (file) -> { getBitmapFromFile(file) }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) }); 复制代码

所以,RxJava  有啥优点?就好在简洁,优点就是把复杂逻辑,通过函数式编程模型穿成一条线。
3. 观察者模式的扩展RxJava  的异步实现,是通过一种扩展的观察者模式来实现的。
3.1. 通用的观察者模式
观察者模式面向的需求是:A  对象(观察者)对  B  对象(被观察者)的某种变化高度敏感,需要在  B  变化的一瞬间做出反应。
举个例子,新闻里喜闻乐见的警察抓小偷,警察需要在小偷伸手作案的时候实施抓捕。在这个例子里,警察是观察者,小偷是被观察者,警察需要时刻盯着小偷的一举一动,才能保证不会漏过任何瞬间。
程序的观察者模式略有不同,观察者不需要时刻盯着被观察者(例如  A  不需要每过  2ms  就检查一次  B  的状态),而是采用注册(  Register  )或者称为订阅(Subscribe)的方式,告诉被观察者:我需要你的某种状态,你要在它变化的时候通知我。
采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。
Android  开发中一个典型的例子是点击监听器  OnClickListener  。对设置  OnClickListener来说,View  是被观察者,OnClickListener  是观察者,二者通过  setOnClickListener()  方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework  就会将点击事件发送给已注册的  OnClickListener  。
OnClickListener  的观察者模式大致如下图:
 
Android异步框架RxJava 1.x系列 - 观察者模式及实现

文章图片
 
如图所示,通过  setOnClickListener()  方法,Button  持有  OnClickListener  的引用(这一过程没有在图上画出)。当用户点击时,Button  自动调用  OnClickListener  的  onClick()  方法。
按照观察者模式抽象出来的各个概念:
  • Button: 被观察者
  • OnClickListener: 观察者
  • setOnClickListener(): 订阅
  • onClick(): 事件处理
就由专用的观察者模式转变成了通用的观察者模式,如下图:
 
Android异步框架RxJava 1.x系列 - 观察者模式及实现

文章图片
 
3.2. RxJava的观察者模式
RxJava  有四个基本概念:
  • Observable: 可观察者,即被观察者
  • Observer: 观察者
  • Subscribe: 订阅
  • Event: 事件处理
Observable  和  Observer  通过  subscribe()  方法实现订阅关系,使得Observable  可以在需要的时候发出事件来通知  Observer
与传统观察者模式不同,RxJava  的事件回调方法除了普通事件  onNext()  (相当于  onClick()) 之外,还定义了两个特殊的事件:onCompleted()  和  onError()
  • onCompleted(): 事件队列完结
RxJava  不仅把每个事件单独处理,还会把它们看做一个队列。RxJava规定,当不会再有新的  onNext()  发出时,需要触发  onCompleted()  方法作为事件完成标志。
  • onError(): 事件队列异常
在事件处理过程中出异常时,onError()  会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个被调用,并且是事件序列中的最后一个执行。
RxJava  的观察者模式大致如下图:
 
Android异步框架RxJava 1.x系列 - 观察者模式及实现

文章图片
 
4. RxJava的基本使用基于以上的概念,RxJava  的基本使用有 3 个步骤:
4.1. 创建Obsever
Observer  即观察者,它决定事件触发的时候将有怎样的行为。  RxJava  中的  Observer  接口的声明方式:
Observer< String> observer = new Observer< String> () { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); }@Override public void onCompleted() { Log.d(tag, "Completed!"); }@Override public void onError(Throwable e) { Log.d(tag, "Error: " + e.getMessage()); } }; 复制代码

除了  Observer  接口之外,RxJava  还内置了一个实现了  Observer  的抽象类:Subscriber。  Subscriber  对  Observer  接口进行了一些扩展,但他们的基本使用方式是完全一样的:
Subscriber< String> subscriber = new Subscriber< String> () { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); }@Override public void onCompleted() { Log.d(tag, "Completed!"); }@Override public void onError(Throwable e) { Log.d(tag, "Error: " + e.getMessage()); } }; 复制代码

实质上,在  RxJava  的  subscribe  过程中,Observer  也总是会先被转换成一个  Subscriber再使用。所以如果你只想使用基本功能,选择  Observer  和  Subscriber  是完全一样的。它们的区别对于使用者来说主要有两点:
  • onStart()
这是  Subscriber  增加的方法。它会在  subscribe  刚开始,而事件还未发送之前被调用。可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。
需要注意的是,如果对准备工作的线程有要求(例如: 弹出一个显示进度的对话框,这必须在主线程执行),onStart()  就不适用了。因为它总是在  subscribe  所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用  doOnSubscribe()  方法,具体可以在后面的章节中看到。
  • unsubscribe()
这是  Subscriber  所实现的另一个接口  Subscription  的方法,用于取消订阅。在这个方法被调用后,Subscriber  将不再接收事件。一般在这个方法调用前,可以使用  isUnsubscribed()  先判断一下状态。
unsubscribe()  这个方法很重要,因为在  subscribe()  之后,  Observable  会持有 Subscriber 的引用。这个引用如果不能及时被释放,将有内存泄露的风险。
注意:在不再使用的时候尽快在合适的地方(例如:  onPause()  和  onStop()  等方法中)调用  unsubscribe()  来解除引用关系,以避免内存泄露的发生。
4.2. 创建Obsevable
4.2.1. Obsevable.create()Observable  即被观察者,它决定什么时候触发事件以及触发怎样的事件。  RxJava  使用  create()  方法来创建一个  Observable  ,并为它定义事件触发规则。示例如下:
Observable observable = Observable.create(new Observable.OnSubscribe< String> () { @Override public void call(Subscriber< ? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onNext("Aloha"); subscriber.onCompleted(); } }); 复制代码

可以看到,这里传入了一个  OnSubscribe  对象作为参数。OnSubscribe  会被存储在返回的  Observable  对象中。
它的作用相当于一个计划表,当  Observable  被订阅的时候,OnSubscribe  的  call()  方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber  将会被调用三次  onNext()  和一次  onCompleted())。
【Android异步框架RxJava 1.x系列 - 观察者模式及实现】这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。
4.2.2. Obsevable.just(T...)create()  方法是  RxJava  最基本的创建事件序列的方法。基于这个方法,RxJava  还提供了一些方法用于快捷创建事件队列,例如  just()  方法:
Observable observable = Observable.just("Hello", "Hi", "Aloha"); // 将会依次调用方法序列:onNext("Hello") -> onNext("Hi") -> onCompleted() 复制代码

4.2.3. Obsevable.from(T[])和from(Iterable< ? extends T> )将传入的数组或  Iterable  拆分成具体对象后,依次发送给观察者,示例如下:
String[] words = {"Hello", "Hi", "Aloha"}; Observable observable = Observable.from(words); // 将会依次调用方法序列:onNext("Hello") -> onNext("Hi") -> onCompleted() 复制代码

4.3. Subscribe关联
创建了  Observable  和  Observer  之后,再用  subscribe()  方法将它们关联起来,整条链子就可以工作了。代码很简单:
observable.subscribe(observer); // 或者 observable.subscribe(subscriber); 复制代码

可能会注意到,subscribe() 这个方法有点怪:它看起来是『observable 订阅了 observer / subscriber』,而不是『observer / subscriber 订阅了 observable』。这看起来就像『杂志订阅了读者』一样颠倒了对象关系。
这让人读起来有点别扭,不过如果把 API 设计成 『observer.subscribe(observable) / subscriber.subscribe(observable)』,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,比较起来明显是得不偿失的。
Observable.subscribe(Subscriber)  的内部实现是这样的(核心代码):
public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); return subscriber; } 复制代码

可以看到subscriber()  做了3件事:
(a). 调用Subscriber.onStart()
这个方法在前面已经介绍过,是一个可选的准备方法。
(b). 调用Observable中的OnSubscribe.call(Subscriber)
事件发送的逻辑开始运行。从这也可以看出,在RxJava中,Observable并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当subscribe()方法执行的时候。
(c). 返回Subscription
将传入的Subscriber作为Subscription返回。这是为了方便后面的unsubscribe()。
整个过程中对象间的关系如下图:
 
Android异步框架RxJava 1.x系列 - 观察者模式及实现

文章图片
 
或者可以看动图:
 
Android异步框架RxJava 1.x系列 - 观察者模式及实现

文章图片
 
除了  subscribe(Observer)  和  subscribe(Subscriber)  ,subscribe()  还支持不完整定义的回调,RxJava  会自动根据定义创建出  Subscriber。形式如下:
Action1< String> onNextAction = new Action1< String> () { // onNext() @Override public void call(String s) { Log.d(tag, s); } }; Action1< Throwable> onErrorAction = new Action1< Throwable> () { // onError() @Override public void call(Throwable throwable) { // Error handling } }; Action0 onCompletedAction = new Action0() { // onCompleted() @Override public void call() { Log.d(tag, "completed"); } }; // 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext() observable.subscribe(onNextAction); // 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError() observable.subscribe(onNextAction, onErrorAction); // 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted() observable.subscribe(onNextAction, onErrorAction, onCompletedAction); 复制代码

简单解释一下这段代码中出现的  Action1  和  Action0
  • Action0
Action0  是  RxJava  的一个接口,它只有一个方法  call(),这个方法是无参无返回值的。由于  onCompleted()  方法也是无参无返回值的,因此  Action0  可以被当成一个包装对象,将  onCompleted()  的内容打包起来将自己作为一个参数传入  subscribe()  以实现不完整定义的回调。
  • Action1
Action1  也是一个接口,它同样只有一个方法  call(T param),这个方法也无返回值,但有一个参数。与  Action0  同理,由于  onNext(T obj)  和  onError(Throwable error)  也是单参数无返回值的,因此  Action1  可以将  onNext(obj)  和  onError(error)  打包起来传入  subscribe()  以实现不完整定义的回调。
事实上,虽然  Action0  和  Action1  在  API  中使用最广泛,但  RxJava  提供了多个  ActionX  形式的接口 (例如:  Action2Action3),它们可以被用以包装不同的无返回值的方法。
4.4. 场景示例
4.4.1. 打印字符串数组将字符串数组 names 中的所有字符串依次打印出来:
String[] names = ...; Observable.from(names) .subscribe(new Action1< String> () { @Override public void call(String name) { Log.d(tag, name); } }); 复制代码

4.4.2. 由ID取得图片显示
int drawableRes = ...; ImageView imageView = ...; Observable.create(new OnSubscribe< Drawable> () { @Override public void call(Subscriber< ? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }).subscribe(new Observer< Drawable> () { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); }@Override public void onCompleted() { }@Override public void onError(Throwable e) { Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show(); } }); 复制代码

正如上面两个例子这样,创建出  Observable  和  Subscriber,再用  subscribe()  将它们串起来,一次  RxJava  的基本使用就完成了,非常简单!
然而。
 
Android异步框架RxJava 1.x系列 - 观察者模式及实现

文章图片
 
小结在  RxJava  的默认规则中,事件的发出和消费都是在同一个线程的。也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式。观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于  RxJava  是至关重要的。而要实现异步,则需要用到  RxJava  的另一个核心的概念  Scheduler,后续将给出详细介绍。
欢迎关注技术公众号: 零壹技术栈
 
Android异步框架RxJava 1.x系列 - 观察者模式及实现

文章图片
 
本帐号将持续分享后端技术干货,包括虚拟机基础,多线程编程,高性能框架,异步、缓存和消息中间件,分布式和微服务,架构学习和进阶等学习资料和文章。
 
 

    推荐阅读