最近在工作中,频繁的使用了Rxjava来解决一些问题,在使用过程中也给予了自己一些思考,如何使用好RxJava,在什么样的场景中才能发挥它更好的作用,如何脱离代码来理解RxJava的工作机制,下面是自己一些浅显的思考。
太多示例喜欢链式的把RxJava的流程表述起来,这个地方我把观察者和订阅者拆开来看。
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("123"); } }); Observer observer = new Observer<String>() { ... @Override public void onNext(String s) { Log.i("TAG", "onNext: " + s); } }; observable.subscribe(observer); 复制代码
这个简单的例子大家应该都知道,只要subcribe产生了订阅,onNext方法将会收到 emitter.onNext("123");
发射出去的数据。
这个地方让我产生思考主要是有一次去吃自助餐,大家在打酸奶的时候,都会拿着一个杯子对准出口,然后按住开关,酸奶就会自动流到杯子中。在这个过程中,我们不妨把酸奶机看做 Observable
,酸奶机里面的酸奶是许许多多的 emitter.onNext("123")
,按住开关的那一刻产生了 subscribe
订阅,然后我们是用杯子 Observer
去接牛奶的,当然,我们还有橙子机、酸梅汤机等,则机子内盛的饮料类型就是 Observable<String>
。我们知道,酸奶机有很多个开关入口,这时候,又来一个人,拿着杯子 Observer
来打牛奶,那么,我和他是一块共享这酸奶机里面的酸奶,我们俩都能接收到酸奶,等我们不需要接酸奶了,我们就dispose关闭开关。
eg:
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception { //模拟耗时任务 for (int i = 0; i < 5; i++) { observableEmitter.onNext(""+i); } } }).subscribeOn(Schedulers.io()); //杯子1 observable.subscribe(observer1); //杯子2 observable.subscribe(observer2); 复制代码
result:
observer1 onNext: 0 observer2 onNext: 0 observer1 onNext: 1 observer2 onNext: 1 observer1 onNext: 2 observer2 onNext: 2 observer1 onNext: 3 observer2 onNext: 3 observer1 onNext: 4 observer2 onNext: 4 复制代码
之前在思考事件驱动这一块,如何更好的通知其他业务组件,业界比较有名的当属EventBus,但EventBus用起来很杂乱无章,当项目规模大起来,业务复杂起来时,都不敢修改这个post,虽然解耦了,但事件变得更乱了,所以,自己重新思考了事件驱动这一块。
鉴于EventBus提供的的思路,我打算用RxJava的方式来实现。以酸奶机为例,当前页面我想订阅一个事件,等待被触发,我完全可以先准备一个杯子(Observer),然后将他们存到一个集合里面,待酸奶机(Observable)里面有酸奶了(observableEmitter.onNext),然后订阅(subcribe)这个杯子的集合,将酸奶倒到杯子里,鉴于此思路,用代码大致的实现下。
List<Observer> list = new ArrayList<>(); //注册事件 public void registerObserver(Observer observer) { list.add(observer); } //驱动事件 public void postEvent() { Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("123"); } }); for (int i = 0; i < list.size(); i++) { observable.subscribeOn(Schedulers.io()).subscribe(list.get(i)); } } @Test public void Test() { Observer observer = new Observer<String>() { ... @Override public void onNext(String s) { System.out.println("onNext: " + s); } }; //注册事件 registerObserver(observer); //发送事件 postEvent(); } 复制代码
这里只给了大致的思路,用了一个平时都可见的例子来实现了事件驱动。
最近有一个业务场景,需要监听RTK当前状态的变化,业务场景是:
有一个前提,RTK必须先设置账户,才能使用后续的服务。如果用户按照应用准则走,进入主页后,先去设置页面设置RTK,然后回到主页,进入任务执行页,这时候监听RTK state是可用的;但如果用户忘了设置的步骤,或是有强迫症的用户,我就不往你提示的方式走,我就要先进任务执行员页,这时候RTK State监听是不可用的,我们会引导用户进入设置页面设置RTK,这个地方又要分情况,如果用户设置好RTK账户,然后返回了任务页,那么任务页的RTK state监听到了用户设置了账户就会返回可用,那么这次任务是可使用的;但如果用户设置好了账户,想去诊断RTK当前连接的状态的话,RTK state监听事件就会被诊断RTK页面给设置,也就意味着,任务页的RTK state监听就会失效,那么返回任务页的话,任务页是不会有任何反应的。但这一块也是有解决办法的,就是任务页的RTK state监听放在 onResume
方法里面,即设置页面返回任务页后,触发任务页的 onResume
方法,重新夺回RTK state的监听。 办法都是有的,但依赖生命周期去做到这点,感觉并不是特别的可靠,我们可以参考上面事件驱动的例子,将RTK state看成是酸奶机,然后哪个页面(杯子)需要RTK state信息的话,就可以订阅(subcribe)酸奶机,如果想要牛奶的话,就post一个信息出去,告诉酸奶机我要酸奶,下面,我给出一份示例:
Set<Observer> set = new HashSet<>(); //模拟一个RTK state 单例 public Observable getObservableInstance() { return Observable.create(new ObservableOnSubscribe<RTKState>() { @Override public void subscribe(ObservableEmitter<RTKState> emitter) throws Exception { RTK rtk = DjiSettingUtils.getRTK(); rtk.setStateCallback(new RTKState.Callback() { @Override public void onUpdate(@NonNull RTKState rtkState) { emitter.onNext(rtkState); } }); } }); } //驱动事件 public void postEvent(Observer observer) { if (!set.contains(observer)) { getObservableInstance().subscribeOn(Schedulers.io()).subscribe(observer); } } @Test public void onCreate() { Observer observer = new Observer<RTKState>() { ... @Override public void onNext(RTKState s) { System.out.println("onNext: " + s.isRTKBeingUsed()); } }; //发送事件 postEvent(observer); } 复制代码
之后,我们只需要关注 onCreate
方法,在任务页我们发起一个订阅事件,接收RTK state信息,在诊断页面也发起一个订阅,接收RTK信息,这样就不会像上面那样,抢断监听事件的问题。
业务场景中有需要从无人机中读取缩略图,并将缩略图上传至服务器,图片上传我们使用的是七牛云,因为一次任务产生的缩略图非常多,基本上都在百张左右,我们不可能为了在上传过程中,因为某些原因导致了断开了,让用户重新上传所有的缩略图,所以,我们打算让百张缩略图采用顺序上传,当哪个节点发生错误的时候,记住index,等用户点击重新上传时,我们再从index的位置继续上传,如果按照传统方式来做的话,第一张上传成功后,如何通知第二张上传呢,我这里给个大致的代码:
List<File> list=new ArrayList<>(); int index=0; public void uploadPic(){ uploadManager.put(list.get(index), key, token, new UpCompletionHandler() { @Override public void complete(String key, ResponseInfo info, JSONObject res) { if (info.isOK()) { index++; uploadPic() } else { //弹框提示用户,当前index上传失败 } } }, null); } @Test public void test(){ uploadPic() } 复制代码
每次上传成功后都调用自身的方法,如果上传失败了,则记住index的位置,提示用户,用户点击重试上传,那么就继续调用 uploadPic
方法,上传的拿到的文件还是从index位置开始拿,所以,也是没有任何问题的。 但是,总觉得这么设计不那么优雅,比如我想知道上传进度的话,那也就意味着我需要在index++方法下面加一个设置进度条的功能,那如果业务需要再加一个上传完成的操作的话,那是不是又要在index++下面多加一个 index==list.size()
的判断呢,其实,这样设计下去的话,整个上传功能就变得特别的松散,移植性也不强,所以,是时候发挥RxJava的 Observer 了。
鉴于异步回调的思考,我打算把上传任务封装成一个 ObservableOnSubcribe
,每次执行任务成功后,就将事件流onNext交给下游,告诉他我完成了一次上传,如果上传失败了,则发射onError异常。
public class QiNiuBitmapOnSubscribe implements ObservableOnSubscribe<QiniuParam> { ... @Override public void subscribe(final ObservableEmitter<QiniuParam> emitter) throws Exception { //上传操作 uploadManager.put(file, key, token, new UpCompletionHandler() { @Override public void complete(String key, ResponseInfo info, JSONObject res) { if (info.isOK()) { emitter.onNext(new QiniuParam(key, info, res)); emitter.onComplete(); } else { emitter.onError(new ServerException(-1, res.toString())); } } }, null); } } 复制代码
由于图片是存储在一个集合中,那么就肯定要用到RxJava的 fromIterable
来遍历集合,由于需要保证图片是有序上传,就需要用到 concatMap
操作符 , 所以,大致代码如下
Observable.fromIterable(fileList) .concatMap(new Function<QiNiuFile, ObservableSource<QiniuParam>>() { @Override public ObservableSource<QiniuParam> apply(QiNiuFile qiniuFile) throws Exception { //返回七牛云上传 return Observable.create(new QiNiuFileOnSubscribe(uploadManager, qiniuFile.getFile(), qiniuFile.getKey(), qiniuFile.getUploadToken())); } }).subscribe(new Observer<QiniuParam>() { ... @Override public void onNext(QiniuParam qiniuParam) { index++; //通知上传进度 uploadCallBack.onUploadProcess(index); } @Override public void onError(Throwable e) { //通知断传的位置 uploadCallBack.onUploadQiNiuError(index); } @Override public void onComplete() { //上传成功 uploadCallBack.onUploadQiNiuComplete(); } }); 复制代码
对于 Observer
来说,他是一个干净的接收流,他不关心上游发生的事情,只专注结果的处理。
以上思考有的地方可能不是特别的完善,还需要多思考,RxJava用的人确实很多,但要想玩的溜的话,确实任重而道远。