安卓中使用rxjava

添加依赖
compile 'io.reactivex:rxjava:2.1.6'
compile 'io.reactivex:rxandroid:2.0.1'

简介: 响应式编程是一种面向数据流和变化传播的编程范式。通过Rx框架我们可以很好地使用该范式。 以下为官网对该框架的解释: ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O. rx框架扩展了观察者模式,添加相应操作符(函数),以支持对事件序列进行处理,使你可以无需关心异步编程,线程安全,非阻塞io等技术细节。


使用 以下为其中涉及到的基本概念及其使用 Observable RxJava基于观察者模式,对其进行拓展。 Subscribers(observer,观察者)订阅事件源(Observable,被观察者)。Observables发出一系列事件,Subscribers处理这些事件。这里的事件可以是任何你感兴趣的东西(比如数据库查询,网络操作,按键事件等) 一个Observable可以发出零个或者多个事件,直到结束或者出错。每发出一个事件,就会调用它的Subscriber的onNext方法,最后调用Subscriber.onNext()或者Subscriber.onError()结束。 另外,在没有任何的的Subscriber,那么这个Observable是不会发出任何事件的。 通过观察者模式,我们可以在被观察者中执行网络操作等耗时的异步操作,等有结果返回时才会调用观察者的onnext方法.


基础例子: 创建observer,重写oncall方法


Observable observable= Observable.create(new Observable.OnSubscribe() {


@Override
public void call(Subscriber subscriber) {
subscriber.onNext("Hello RxAndroid !!");
subscriber.onCompleted();
}
})
创建subscriber,重写onNext,onCompleted,onError方法


Subscriber subscriber=new Subscriber() {


@Override
public void onCompleted() {
Log.v(TAG, "testFirst:onCompleted");
}




@Override
public void onError(Throwable e) {


}


@Override
public void onNext(String s) {
Log.v(TAG, "onNext:" + s);
}
}); 订阅 observable.subscribe(subscriber);
小结: rxjava基于观察者模式,可以实现对任何异步操作的回调处理。可以将observable理解成一个callable对象,在observer subscribe的时候,执行了observable的call方法,在call方法中可以执行各种异步操作,执行完以后将结果回调给subscribe的onNext方法。


创建observable Create 见上面的例子 From 可以将一些集合等数据结构转换为事件流,在订阅时将各个item依次发送给订阅者


List test = new ArrayList();


test.add(s);


test.add("testMap");


Observable.from(list)


Just


将一系列的对象转换为事件流,在订阅时将各个item依次发送给订阅者 Observable.just(1,2,3,4)


observable转换 map


map操作符提供了对事件流的转换,以下例子为将String前面加上hello,你也可以将事件转换为任意类型。


Observable.just("coderrobin").map(new Func1() {


@Override
public String call(String s) {
return "hello"+test;
}
}) .subscribe(new Action1() {


@Override
public void call(String text) {
Log.v(TAG, text);
}
});
flatmap flatmap提供了对observable的转换


Observable.just("coderrobin").map(new Func1() {


@Override
public List call(String s) {
List test = new ArrayList();
test.add(s);
test.add("testMap");
return test;
}
}).flatMap(new Func1() {


@Override
public Observable call(List list) {
return Observable.from(list);
}
}) .subscribe(new Action1() {


@Override
public void call(String text) {
Log.v(TAG, text);
}
});
flatmap的强大之处在于在转换observable时我们可以添加包括异步操作在内的各种业务逻辑。以下载一个app为例子,我们在第一个observable中通过网络获取某个app的下载地址,获取到以后传递给第二个observable,在第二个observable中进行下载操作,下载过程中通过onnext通知observer下载进度,通过onComplete通知下载完成。如果我们要自己实现上述逻辑,则需要3级回调,堪称回调地狱,而在rxjava中通过操作符的链式调用即可实现。 Buffer 聚合,将分散的事件集中为list发送给订阅者 Observable.just(1,2,3,4).buffer(2).subscribe(new Subscriber>() {


@Override
public void onCompleted() {


}


@Override
public void onError(Throwable e) {


}


@Override
public void onNext(List integers) {


}
}); GroupBy,按特定规则进行分组 Observable.just(1,2,3,4,"1","2","3",new Repo("1","1","1")).groupBy(


new Func1() {


@Override
public String call(Object o) {
if (o instanceof Number) {
return "num";
} else if (o instanceof String) {
return "string";
}
return "other";
}
}).subscribe(new Subscriber() {


@Override
public void onCompleted() {


}


@Override
public void onError(Throwable e) {


}


@Override
public void onNext(GroupedObservable stringObjectGroupedObservable) {
Log.v(TAG,stringObjectGroupedObservable.getKey());
}
}); 过滤事件 filter filter可以对事件流进行过滤操作,对于符合条件的事件才继续往下传递


Observable.just(1,2,3,4,5).filter(new Func1() {


@Override
public Boolean call(Integer integer) {
return integer>3;
}
}) .subscribe(new Action1() {


@Override
public void call(Integer num) {
Log.v(TAG, num+"");
}
});
distinct 过滤重复的事件,直接调用distinct方法即可 first 只获取第一个事件 合并事件流 merge


Observable odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);


Observable evens = Observable.just(2, 4, 6);


Observable.merge(odds, evens) .subscribe(new Subscriber() {


@Override public void onNext(Integer item) {


System.out.println(“Next: “ + item); }


@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}




@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
错误处理 Retry 可以设置错误时重试策略 if a source Observable sends an onError notification, resubscribe to it in the hopes that it will complete without error Schedulers 在RxAndroid中,可以通过observeOn和subscribeOn方法指定observable和observer分别工作的线程。Rx框架自动帮我们实现线程相关逻辑,无需开发者去考虑,这极大的方便了开发。 observeOn(Schedulers.newThread()) subscribeOn(AndroidSchedulers.mainThread()) 如上述代码则指令被观察者在新线程中运行,观察者在主线程中运行, demo 以下为一个为使用rxjava+Retrofit访问github api的简单例子: Repo


public class Repo {


public String name;


public String description;


public String url;


public Repo(String name, String description, String url) {


this.name = name;
this.description = description;
this.url = url;
}


public String toString() {


return name + ": " + url;
} } GithubResults public class GithubResults {


ArrayList items; Integer total_count;


} GithubService


public interface GithubService {


@GET("/search/repositories")


Observable getTopNewAndroidRepos(@QueryMap Map queryMap);


} Activity public void testNetwork(){


Map options = new HashMap<>();
options.put("q","coderrobin");


RestAdapter restAdapter = new RestAdapter.Builder()
.setEndpoint("https://api.github.com")
.build();
GithubService apiService = restAdapter.create(GithubService.class);
apiService.getTopNewAndroidRepos(options)
.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())
.flatMap(new Func1>() {
@Override
public Observable call(GithubResults results) {
Log.v(TAG,results.total_count+"");


return Observable.from(results.items);
}
}).map(new Func1() {

@Override
public String call(Repo repo) {
return repo.url;
}
})
.subscribe(new Action1() {

@Override
public void call(String url) {
Log.v(TAG, "url:" + url);
}
});
}

【安卓中使用rxjava】



    推荐阅读