Using RxJava with Retrofit

Retrofit + RxJava + OkHttp + MVP: making network requests simple
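Network requests
Android has only a handful of well-known networking libraries. A major reason Retrofit stands out among them is that its calls can be exposed as RxJava types. Its basic usage is outlined below.
1. To use Retrofit, first add the Gradle dependencies: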

    // RxJava 2 and RxAndroid
    implementation 'io.reactivex.rxjava2:rxjava:2.2.1'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
    // Retrofit
    implementation 'com.squareup.retrofit2:retrofit:2.3.0'
    // Gson converter
    implementation 'com.squareup.retrofit2:converter-gson:2.3.0'
    // RxJava 2 call adapter
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'
    // OkHttp and its logging interceptor
    implementation 'com.squareup.okhttp3:okhttp:3.4.1'
    implementation 'com.squareup.okhttp3:logging-interceptor:3.9.1'

2. Next, define the API interface. This article uses an open weather API.

    public interface ApiStores {
        // Base URL
        String API_SERVER_URL = "http://www.weather.com.cn/";

        // Load the weather for the given city
        @GET("adat/sk/{cityId}.html")
        Observable<MainModel> loadDataByRetrofitRxJava(@Path("cityId") String cityId);
    }
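To make the `{cityId}` placeholder concrete, here is a minimal plain-Java sketch of the URL that results for the city ID used later in this article. `PathTemplate` and `resolve` are hypothetical names introduced only for illustration; Retrofit performs this substitution internally and, unlike this sketch, also URL-encodes the value by default.

```java
// Plain-Java illustration only; this is not how Retrofit is implemented.
final class PathTemplate {
    private PathTemplate() {}

    /** Replaces the {name} placeholder in a relative path and appends it to the base URL. */
    static String resolve(String baseUrl, String relativePath, String name, String value) {
        String placeholder = "{" + name + "}";
        if (!relativePath.contains(placeholder)) {
            throw new IllegalArgumentException(
                    "No placeholder " + placeholder + " in " + relativePath);
        }
        return baseUrl + relativePath.replace(placeholder, value);
    }

    public static void main(String[] args) {
        // Same base URL, path template, and city ID as in this article.
        System.out.println(resolve(
                "http://www.weather.com.cn/", "adat/sk/{cityId}.html", "cityId", "101220602"));
    }
}
```

The base URL ends with a slash and the relative path does not start with one, so simple concatenation yields a well-formed URL here.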

3. Then create a Retrofit client:

    import java.util.concurrent.TimeUnit;

    import okhttp3.OkHttpClient;
    import okhttp3.logging.HttpLoggingInterceptor;
    import retrofit2.Retrofit;
    import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
    import retrofit2.converter.gson.GsonConverterFactory;

    /**
     * Creates a Retrofit client.
     */
    public class ApiClient {
        public static Retrofit mRetrofit;

        public static Retrofit retrofit() {
            if (mRetrofit == null) {
                OkHttpClient.Builder builder = new OkHttpClient.Builder();
                builder.readTimeout(5, TimeUnit.SECONDS);
                builder.connectTimeout(5, TimeUnit.SECONDS);

                if (BuildConfig.DEBUG) {
                    // Logging interceptor, enabled only in debug builds
                    HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor();
                    // Log full request and response bodies
                    loggingInterceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
                    builder.addInterceptor(loggingInterceptor);
                }

                OkHttpClient okHttpClient = builder.build();
                mRetrofit = new Retrofit.Builder()
                        .baseUrl(ApiStores.API_SERVER_URL)
                        .addConverterFactory(GsonConverterFactory.create())
                        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                        .client(okHttpClient)
                        .build();
            }
            return mRetrofit;
        }
    }

MainModel
    /** MainModel */
    public class MainModel {
        private WeatherinfoBean weatherinfo;

        public WeatherinfoBean getWeatherinfo() { return weatherinfo; }
        public void setWeatherinfo(WeatherinfoBean weatherinfo) { this.weatherinfo = weatherinfo; }

        public static class WeatherinfoBean {
            private String city;
            private String cityid;
            private String temp;
            private String WD;
            private String WS;
            private String SD;
            private String WSE;
            private String time;
            private String isRadar;
            private String Radar;
            private String njd;
            private String qy;

            public String getCity() { return city; }
            public void setCity(String city) { this.city = city; }

            public String getCityid() { return cityid; }
            public void setCityid(String cityid) { this.cityid = cityid; }

            public String getTemp() { return temp; }
            public void setTemp(String temp) { this.temp = temp; }

            public String getWD() { return WD; }
            public void setWD(String WD) { this.WD = WD; }

            public String getWS() { return WS; }
            public void setWS(String WS) { this.WS = WS; }

            public String getSD() { return SD; }
            public void setSD(String SD) { this.SD = SD; }

            public String getWSE() { return WSE; }
            public void setWSE(String WSE) { this.WSE = WSE; }

            public String getTime() { return time; }
            public void setTime(String time) { this.time = time; }

            public String getIsRadar() { return isRadar; }
            public void setIsRadar(String isRadar) { this.isRadar = isRadar; }

            public String getRadar() { return Radar; }
            public void setRadar(String Radar) { this.Radar = Radar; }

            public String getNjd() { return njd; }
            public void setNjd(String njd) { this.njd = njd; }

            public String getQy() { return qy; }
            public void setQy(String qy) { this.qy = qy; }
        }
    }

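4. Making the request is then straightforward:
Create the upstream Observable (the source being observed), then subscribe the downstream Observer to it.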
    ApiClient.retrofit()
            .create(ApiStores.class)
            .loadDataByRetrofitRxJava("101220602")
            .subscribeOn(Schedulers.io())               // run the request on an I/O thread
            .observeOn(AndroidSchedulers.mainThread())  // deliver callbacks on the main thread
            .subscribe(new Observer<MainModel>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(MainModel mainModel) {
                    Log.w(TAG, "" + mainModel.getWeatherinfo().getCity());
                    Log.d(TAG, "observer thread is : " + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {
                    Toast.makeText(MainActivity.this, "Request failed", Toast.LENGTH_SHORT).show();
                }

                @Override
                public void onComplete() {
                    Toast.makeText(MainActivity.this, "Request succeeded", Toast.LENGTH_SHORT).show();
                }
            });
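The thread hand-off that `subscribeOn` and `observeOn` provide can be illustrated without RxJava. The sketch below is only an analogy built on the JDK's `CompletableFuture`: the "request" runs on one executor and the result is consumed on another. `ThreadHopDemo`, the thread names, and the placeholder result string are all assumptions made for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only analogy for "work on an I/O thread, callback on the main thread".
final class ThreadHopDemo {
    private ThreadHopDemo() {}

    /** Runs the work on ioPool and the callback on mainPool; returns both thread names. */
    static String[] run() {
        ExecutorService ioPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService mainPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        String[] names = new String[2];
        try {
            CompletableFuture
                    .supplyAsync(() -> {
                        // Plays the role of the network call scheduled by subscribeOn.
                        names[0] = Thread.currentThread().getName();
                        return "weather data";
                    }, ioPool)
                    .thenAcceptAsync(result -> {
                        // Plays the role of onNext after observeOn.
                        names[1] = Thread.currentThread().getName();
                    }, mainPool)
                    .join(); // wait so the names are visible to the caller
        } finally {
            ioPool.shutdown();
            mainPool.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        String[] names = run();
        System.out.println("work on " + names[0] + ", callback on " + names[1]);
    }
}
```

On Android the second executor would be backed by the main looper, which is what `AndroidSchedulers.mainThread()` supplies in the RxJava version.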

Handling network requests with RxJava + Retrofit
When handling network requests with RxJava + Retrofit, the observer is usually wrapped in a base class so that common logic, such as error handling, can be reused and extended.
Base observer class (LogUtils, NetworkUtils, and APIException are project helper classes not shown here):

    public abstract class BaseObserver<T> implements Observer<T> {
        protected String errMsg = "";
        protected Disposable disposable;

        @Override
        public void onSubscribe(Disposable d) {
            disposable = d;
        }

        @Override
        public void onNext(T t) {
        }

        @Override
        public void onError(Throwable e) {
            LogUtils.d("Subscriber onError", e.getMessage());
            // Map the failure to a user-facing message prefix
            if (!NetworkUtils.isConnected()) {
                errMsg = "Network connection error, ";
            } else if (e instanceof APIException) {
                APIException exception = (APIException) e;
                errMsg = exception.getMessage() + ", ";
            } else if (e instanceof HttpException) {
                errMsg = "Network request error, ";
            } else if (e instanceof IOException) {
                errMsg = "Network error, ";
            }
            if (disposable != null && !disposable.isDisposed()) {
                disposable.dispose();
            }
        }

        @Override
        public void onComplete() {
            if (disposable != null && !disposable.isDisposed()) {
                disposable.dispose();
            }
        }
    }
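The order of the checks in `onError` matters, and it is easier to verify when the mapping is isolated as a pure function. The following is a minimal sketch under stated assumptions: `DemoApiException` and `DemoHttpException` are hypothetical stand-ins for the project's APIException and Retrofit's HttpException, the connectivity check is passed in as a boolean, and the messages are English placeholders.

```java
import java.io.IOException;

// Hypothetical stand-in for the project's APIException.
class DemoApiException extends RuntimeException {
    DemoApiException(String message) { super(message); }
}

// Hypothetical stand-in for Retrofit's HttpException.
class DemoHttpException extends RuntimeException {
    DemoHttpException(String message) { super(message); }
}

final class ErrorMessages {
    private ErrorMessages() {}

    /** Applies the same precedence as the base observer: connectivity, API, HTTP, then I/O. */
    static String describe(Throwable e, boolean connected) {
        if (!connected) {
            return "Network connection error";
        }
        if (e instanceof DemoApiException) {
            return e.getMessage();
        }
        if (e instanceof DemoHttpException) {
            return "Network request error";
        }
        if (e instanceof IOException) {
            return "Network error";
        }
        return ""; // unmatched errors leave the message empty, as in the base class
    }
}
```

Because the connectivity check comes first, an API or HTTP error raised while the device is offline is reported as a connection problem.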

Handling memory leaks
Basic code:
Create the observer with an Observer or a Subscriber.

    // Field of the Activity/Fragment
    private final CompositeDisposable disposables = new CompositeDisposable();

    // The following statements run inside a method of the Activity/Fragment.

    // 1. Create an ordinary observable
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            if (e.isDisposed()) return;
            SystemClock.sleep(2000);
            e.onNext("next");
            e.onComplete();
        }
    });

    // 2. Create an observer that a CompositeDisposable can manage
    DisposableObserver<String> observer = new DisposableObserver<String>() {
        @Override
        public void onNext(String value) {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    };

    // 3. Subscribe
    observable.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);

    // 4. Add the observer to the CompositeDisposable so it is managed with the others
    disposables.add(observer);

Cancel the asynchronous work when the Activity/Fragment is destroyed to prevent memory leaks:

    @Override
    protected void onDestroy() {
        super.onDestroy();
        // Dispose every managed observer
        disposables.clear();
    }
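The idea behind CompositeDisposable is to collect cancellation handles and cancel them together at teardown. The plain-Java sketch below models only that idea; `Handle` and `HandleBag` are hypothetical, non-thread-safe stand-ins, not RxJava classes. It also mirrors the difference between RxJava's `clear()`, which leaves the container reusable, and `dispose()`, after which newly added items are disposed immediately (modelled here as `close()`).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal handle: something that can be cancelled. */
final class Handle {
    private boolean cancelled;

    void cancel() { cancelled = true; }

    boolean isCancelled() { return cancelled; }
}

/** Hypothetical, non-thread-safe model of a container of cancellable handles. */
final class HandleBag {
    private final List<Handle> handles = new ArrayList<>();
    private boolean closed;

    /** Tracks the handle; if the bag is already closed, cancels it at once and returns false. */
    boolean add(Handle h) {
        if (closed) {
            h.cancel();
            return false;
        }
        handles.add(h);
        return true;
    }

    /** Cancels everything tracked so far; the bag remains usable afterwards. */
    void clear() {
        for (Handle h : handles) {
            h.cancel();
        }
        handles.clear();
    }

    /** Cancels everything and rejects later additions. */
    void close() {
        closed = true;
        clear();
    }

    int size() { return handles.size(); }
}
```

Calling `clear()` in `onDestroy()` is sufficient when nothing is subscribed after that point; the closing variant additionally guards against subscriptions added late.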
