MVP+RxJava: Encapsulating a ProgressDialog

Version 1: wrap a DialogFragment inside a Subscriber

import io.reactivex.FlowableSubscriber;
import io.reactivex.functions.Consumer;
import org.reactivestreams.Subscription;
// Android imports (Log, Bundle, View, DialogFragment, FragmentManager, ...) omitted.

public class ProgressDialogSubscriber<T> implements FlowableSubscriber<T> {

    private static final String TAG = "ProgressSubscriber";

    private MyDialogFragment myDialogFragment;
    private FragmentManager fragmentManager;
    private Consumer<? super T> onNext;
    private Consumer<? super Throwable> onError;

    public ProgressDialogSubscriber(FragmentManager fragmentManager,
            Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        myDialogFragment = new MyDialogFragment();
        this.fragmentManager = fragmentManager;
        this.onNext = onNext;
        this.onError = onError;
    }

    @Override
    public void onSubscribe(Subscription s) {
        Log.i(TAG, "onSubscribe: ");
        // Show the dialog as soon as the subscription starts.
        myDialogFragment.show(fragmentManager, "dialog");
        s.request(1);
    }

    @Override
    public void onError(Throwable t) {
        Log.i(TAG, "onError: ");
        myDialogFragment.dismiss();
        try {
            onError.accept(t);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete: ");
        myDialogFragment.dismiss();
    }

    @Override
    public void onNext(T o) {
        Log.i(TAG, "onNext: " + o);
        // Dismiss before handing the result to the caller.
        myDialogFragment.dismiss();
        try {
            onNext.accept(o);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static class MyDialogFragment extends DialogFragment {

        @Nullable
        @Override
        public View onCreateView(LayoutInflater inflater, @Nullable ViewGroup container,
                @Nullable Bundle savedInstanceState) {
            return inflater.inflate(R.layout.layout_dialog_fragment, container, false);
        }
    }
}

Calling it from the P layer (presenter):
loginBiz.login(userName, pw)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new ProgressDialogSubscriber<UserEntity>(
                loginView.getSupportFragmentManager(),
                new Consumer<UserEntity>() {
                    @Override
                    public void accept(UserEntity userEntity) throws Exception {
                        Log.i(TAG, "accept: userEntity=" + userEntity.getUserName());
                    }
                },
                new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        if (throwable instanceof LoginBiz.LoginInfoException) {
                            Log.i(TAG, "accept: invalid login info " + throwable.getMessage());
                        } else {
                            Log.i(TAG, "accept: " + throwable.getMessage());
                        }
                    }
                }));

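Calling it from the P layer requires the View layer to expose a getSupportFragmentManager method (some implementations pass in a Context instead). Worse, both the Subscriber and the P layer now depend on Android-specific code, which pollutes what should be pure business logic. That is not a good design.
Version 2: isolate the Android-specific Context and getSupportFragmentManager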
ILoadingView, the dialog control interface:
public interface ILoadingView {

    /** Show the dialog. */
    void show();

    /** Dismiss the dialog. */
    void dismiss();
}

LoadingSubscriber, a Subscriber that depends only on ILoadingView:
import io.reactivex.FlowableSubscriber;
import io.reactivex.functions.Consumer;
import org.reactivestreams.Subscription;

public class LoadingSubscriber<T> implements FlowableSubscriber<T> {

    private ILoadingView iLoadingView;
    private Consumer<? super T> onNext;
    private Consumer<? super Throwable> onError;

    public LoadingSubscriber(ILoadingView iLoadingView,
            Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        if (iLoadingView == null) {
            throw new NullPointerException("ILoadingView must not be null");
        }
        this.iLoadingView = iLoadingView;
        this.onNext = onNext;
        this.onError = onError;
    }

    @Override
    public void onSubscribe(Subscription s) {
        iLoadingView.show();
        s.request(1);
    }

    @Override
    public void onNext(T t) {
        iLoadingView.dismiss();
        if (onNext != null) {
            try {
                onNext.accept(t);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void onError(Throwable t) {
        iLoadingView.dismiss();
        if (onError != null) {
            try {
                onError.accept(t);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void onComplete() {
        // Nothing to do: the dialog was already dismissed in onNext.
    }
}

The View layer implements ILoadingView, and the P layer calls it like this:
loginBiz.login(userName, pw)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new LoadingSubscriber<UserEntity>(
                loginView,
                new Consumer<UserEntity>() {
                    @Override
                    public void accept(UserEntity userEntity) throws Exception {
                        Log.i(TAG, "accept: userEntity=" + userEntity.getUserName());
                    }
                },
                new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        if (throwable instanceof LoginBiz.LoginInfoException) {
                            Log.i(TAG, "accept: invalid login info " + throwable.getMessage());
                        } else {
                            Log.i(TAG, "accept: " + throwable.getMessage());
                        }
                    }
                }));
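One practical payoff of depending only on ILoadingView is that the dialog logic can be exercised without a device. Below is a minimal sketch in plain Java, with no RxJava or Android involved. RecordingLoadingView, LoadingViewDemo and simulateSuccess are hypothetical names invented for this illustration, and simulateSuccess merely imitates the call order LoadingSubscriber follows on a successful result.

```java
import java.util.ArrayList;
import java.util.List;

// Same two-method contract as the ILoadingView interface above.
interface ILoadingView {
    void show();
    void dismiss();
}

// Hypothetical test double: records calls instead of touching any Android UI.
final class RecordingLoadingView implements ILoadingView {
    final List<String> calls = new ArrayList<>();

    @Override
    public void show() { calls.add("show"); }

    @Override
    public void dismiss() { calls.add("dismiss"); }
}

final class LoadingViewDemo {
    // Hypothetical stand-in for the order in which LoadingSubscriber drives the view:
    // show() in onSubscribe, then dismiss() before the result callback in onNext.
    static List<String> simulateSuccess(RecordingLoadingView view) {
        view.show();
        view.dismiss();
        view.calls.add("onNext");
        return view.calls;
    }

    public static void main(String[] args) {
        System.out.println(simulateSuccess(new RecordingLoadingView()));
    }
}
```

In a real unit test, the same kind of fake could be passed to the presenter in place of the Activity.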

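This still feels slightly off, though: subscribing requires my own special Subscriber, which is not as clean as it could be. Can the dialog control logic be added in a chained style instead?
Version 3: compose + doOnXXX
This still uses the ILoadingView interface above.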
/**
 * This must be placed right before subscribe, immediately adjacent to it;
 * otherwise doOnComplete may not be called.
 *
 * @param iLoadingView the view that shows and dismisses the dialog
 * @param <T> the element type of the stream
 * @return a transformer that shows the dialog on subscribe and dismisses it on error or completion
 */
public <T> FlowableTransformer<T, T> waitLoadingTransformer(final ILoadingView iLoadingView) {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(Flowable<T> upstream) {
            return upstream.doOnSubscribe(new Consumer<Subscription>() {
                @Override
                public void accept(Subscription subscription) throws Exception {
                    Log.i(TAG, "doOnSubscribe accept: ");
                    iLoadingView.show();
                }
            }).doOnNext(new Consumer<T>() {
                @Override
                public void accept(T t) throws Exception {
                    Log.i(TAG, "doOnNext accept: ");
                }
            }).doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.i(TAG, "doOnError accept: ");
                    iLoadingView.dismiss();
                }
            }).doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    Log.i(TAG, "doOnComplete run: ");
                    iLoadingView.dismiss();
                }
            });
        }
    };
}

Flowable.create(new FlowableOnSubscribe<Long>() {
    @Override
    public void subscribe(FlowableEmitter<Long> emitter) throws Exception {
        for (int i = 1; i > 0; i++) {
            Log.i(TAG, "subscribe: i=" + i + " Thread=" + Thread.currentThread());
            emitter.onNext((long) i);
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                return;
            }
            if (i == 10) {
                emitter.onError(new Throwable("TTT"));
                return;
            }
        }
    }
}, BackpressureStrategy.BUFFER)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .compose(waitLoadingTransformer(loginView))
        .subscribe(new FlowableSubscriber<Long>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.i(TAG, "onSubscribe: ");
                s.request(Integer.MAX_VALUE);
            }

            @Override
            public void onNext(Long aLong) {
                Log.i(TAG, "onNext: " + aLong);
            }

            @Override
            public void onError(Throwable t) {
                Log.i(TAG, "onError: " + t);
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete: ");
            }
        });

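Isn't this a clean, very Rx way to control the dialog?
I had long felt that a dialog could be controlled this way, but most of what I found on Baidu passed in a Context or something similar, which did not feel clean. So I turned to Google, found the RxLoading library, and borrowed from its implementation.
The version above is built on compose. AutoDispose's chaining approach, ObservableConverter plus the as operator, looks like it could control the dialog too. However, I have not found an introduction to using ObservableConverter, and after studying it for a while I still did not quite understand how it works. It is an experimental feature added in 2.1.7. I will set it aside for now and study AutoDispose and ObservableConverter properly later.
Version 4: while searching on Google I found another approach, the using operator
See the introduction to the using operator.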
Flowable.using(new Callable<ILoadingView>() {
    @Override
    public ILoadingView call() throws Exception {
        Log.i(TAG, "using call: 11");
        // Initialize the resource: show the dialog.
        loginView.show();
        return loginView;
    }
}, new Function<ILoadingView, Publisher<Long>>() {
    @Override
    public Publisher<Long> apply(ILoadingView iLoadingView) throws Exception {
        Log.i(TAG, "using apply: 22");
        // Build the source that emits the data.
        return Flowable.create(new FlowableOnSubscribe<Long>() {
            @Override
            public void subscribe(FlowableEmitter<Long> emitter) throws Exception {
                for (int i = 1; i > 0; i++) {
                    Log.i(TAG, "subscribe: i=" + i + " Thread=" + Thread.currentThread());
                    emitter.onNext((long) i);
                    try {
                        Thread.sleep(2 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        return;
                    }
                    if (i == 10) {
                        emitter.onError(new Throwable("TTT"));
                        return;
                    }
                }
            }
        }, BackpressureStrategy.BUFFER);
    }
}, new Consumer<ILoadingView>() {
    @Override
    public void accept(ILoadingView iLoadingView) throws Exception {
        Log.i(TAG, "using accept:333 ");
        // Release the resource: dismiss the dialog.
        iLoadingView.dismiss();
    }
}).compose(rxLifeCycleHelper.io_main())
        .compose(rxLifeCycleHelper.bindTolifecycle(RxLifeCycleHelper.ActivityEvent.DESTROY))
        .subscribe(new FlowableSubscriber<Long>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.i(TAG, "onSubscribe: ");
                s.request(Integer.MAX_VALUE);
            }

            @Override
            public void onNext(Long aLong) {
                Log.i(TAG, "onNext: " + aLong);
            }

            @Override
            public void onError(Throwable t) {
                Log.i(TAG, "onError: " + t);
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete: ");
            }
        });

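The first argument of using initializes the Dialog, the second builds the request that emits the data, and the third releases the dialog.
This style suits initializing and showing the Dialog directly in an Activity, but it is less friendly in MVP: every call site is fairly verbose and hard to wrap into something reusable. So I turned it into a compose transformer once more.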
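As an aside, the three roles described above map closely onto Java's try-with-resources, which may make the operator easier to picture. The sketch below is a plain-Java analogy, not RxJava code: LoadingScope and UsingAnalogy are hypothetical names invented here, and the analogy assumes the default eager behavior of the three-argument using, where the resource is released before the terminal event reaches the subscriber.

```java
import java.util.ArrayList;
import java.util.List;

// Same two-method contract as the article's ILoadingView.
interface ILoadingView {
    void show();
    void dismiss();
}

// Hypothetical adapter: acquiring the scope shows the dialog, closing it dismisses it.
final class LoadingScope implements AutoCloseable {
    private final ILoadingView view;

    LoadingScope(ILoadingView view) {
        this.view = view;
        view.show();              // role of using's first argument
    }

    @Override
    public void close() {
        view.dismiss();           // role of using's third argument
    }
}

final class UsingAnalogy {
    // Runs a fake request and returns the order in which things happened.
    static List<String> run(boolean fail) {
        final List<String> log = new ArrayList<>();
        ILoadingView view = new ILoadingView() {
            @Override public void show() { log.add("show"); }
            @Override public void dismiss() { log.add("dismiss"); }
        };
        try (LoadingScope scope = new LoadingScope(view)) {
            log.add("work");      // role of using's second argument
            if (fail) {
                throw new IllegalStateException("TTT");
            }
        } catch (IllegalStateException e) {
            // close() has already run by the time the catch block executes.
            log.add("error");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

In both the success and the failure case the dismiss call is guaranteed, which is the property that makes using attractive for dialog control.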
Turning using into a compose transformer
public <T> FlowableTransformer<T, T> waitLoadingTransformerUsing(final ILoadingView iLoadingView) {
    return new FlowableTransformer<T, T>() {
        @Override
        public Publisher<T> apply(final Flowable<T> upstream) {
            return Flowable.using(new Callable<ILoadingView>() {
                @Override
                public ILoadingView call() throws Exception {
                    Log.i(TAG, "using call: 11");
                    // Initialize the resource: show the dialog.
                    iLoadingView.show();
                    return iLoadingView;
                }
            }, new Function<ILoadingView, Publisher<T>>() {
                @Override
                public Publisher<T> apply(ILoadingView iLoadingView) throws Exception {
                    Log.i(TAG, "using apply: 22");
                    // The upstream being composed is the data source.
                    return upstream;
                }
            }, new Consumer<ILoadingView>() {
                @Override
                public void accept(ILoadingView iLoadingView) throws Exception {
                    Log.i(TAG, "using accept:333 ");
                    // Release the resource: dismiss the dialog.
                    iLoadingView.dismiss();
                }
            });
        }
    };
}

Usage: chained once more, very Rx.
Flowable.create(new FlowableOnSubscribe<Long>() {
    @Override
    public void subscribe(FlowableEmitter<Long> emitter) throws Exception {
        for (int i = 1; i > 0; i++) {
            Log.i(TAG, "subscribe: i=" + i + " Thread=" + Thread.currentThread());
            emitter.onNext((long) i);
            try {
                Thread.sleep(2 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                return;
            }
            if (i == 10) {
                emitter.onError(new Throwable("TTT"));
                return;
            }
        }
    }
}, BackpressureStrategy.BUFFER)
        .compose(waitLoadingTransformerUsing(loginView))
        .compose(rxLifeCycleHelper.io_main())
        .compose(rxLifeCycleHelper.bindTolifecycle(RxLifeCycleHelper.ActivityEvent.DESTROY))
        .subscribe(new FlowableSubscriber<Long>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.i(TAG, "onSubscribe: ");
                s.request(Integer.MAX_VALUE);
            }

            @Override
            public void onNext(Long aLong) {
                Log.i(TAG, "onNext: " + aLong);
            }

            @Override
            public void onError(Throwable t) {
                Log.i(TAG, "onError: " + t);
            }

            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete: ");
            }
        });

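Compared with approach 3, approach 4 avoids writing a string of doOn methods. The ordering differs, though: in approach 4 the first argument of using runs only after onSubscribe, whereas in approach 3 doOnSubscribe runs before onSubscribe.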
