转载

RxJava2和Retrofit2配合使用详解

常见的 button 点击事件为例,button 是 被观察者 ,listener 是 观察者 ,setOnClickListener 过程是 订阅 ,有了订阅关系后在 button 被点击的时候,监听者 listener 就可以响应事件。

这里的 button.setOnClickListener(listener) 看上去意思是被观察者订阅了观察者(杂志订阅了读者),逻辑上不符合日常生活习惯。其实这是设计模式的习惯,不必纠结,习惯了这种模式就利于理解观察者模式了。

2. RxJava 中的观察者模式

Observable
Observer
subscribe

首先创建 Observable 和 Observer,然后 observable.subscribe(observer) ,这样 Observable 发出的事件就会被 Observer 响应。一般我们不手动创建 Observable,而是由 Retrofit 返回给我们,我们拿到 Observable 之后只需关心如何操作 Observer 中的数据即可。

不过为了由浅入深的演示,还是手动创建 Observable 来讲解。

2.1 创建 Observable

常见的几种方式,不常用的不写了,因为我觉得这个模块不是重点。

var observable=Observable.create(ObservableOnSubscribe<String> {...})
var observable=Observable.just(...)
var observable = Observable.fromIterable(mutableListOf(...))

2.1.1 create()

var observable2=Observable.create(object :ObservableOnSubscribe<String>{
            override fun subscribe(emitter: ObservableEmitter<String>) {
                emitter.onNext("Hello ")
                emitter.onNext("RxJava ")
                emitter.onNext("GoodBye ")
                emitter.onComplete()            }

        })
复制代码

ObservableOnSubscribeObservableEmitter 都是陌生人,这个要是详细讲涉及到源码分析,东西可就多了(主要是我不熟悉),所以可以理解成 ObservableOnSubscribe 是用来帮助创建 Observable 的,ObservableEmitter 是用来发出事件的(这些事件在观察者 Observer 中可以响应处理)。

emitter 一次发射了三个事件,然后调用了 onComplete() 这些在下面讲观察者 Observer 时还会提到,一并讲解。

2.1.2 just

var observable=Observable.just("Hello","RxJava","GoodBye")
复制代码

这句的效果等同于上面用 create 创建 observable,即 调用 3 次 onNext 后再调 onComplete。

2.1.3 fromIterable

var observable = Observable.fromIterable(mutableListOf("Hello","RxJava","GoodBye"))
复制代码

这句的效果等同于上面用 create 创建 observable,即 调用 3 次 onNext 后再调 onComplete。

2.2 创建 Observer

val observer = object : Observer<String> {
            override fun onComplete() {
                Log.e("abc", "-----onComplete-----")
            }

            override fun onSubscribe(d: Disposable) {
                Log.e("abc", "-----onSubscribe-----")
            }

            override fun onNext(t: String) {
                Log.e("abc", "-----onNext-----$t")
            }

            override fun onError(e: Throwable) {
                Log.e("abc", "-----onError-----$e")
            }
        }
//订阅
observable.subscribe(observer)
复制代码

log 打印情况:

-----onSubscribe-----
-----onNext-----Hello
-----onNext-----RxJava
-----onNext-----GoodBye
-----onComplete-----
复制代码

可以看到,先是建立订阅关系,然后根据前面 observable 的发射顺序来打印 onNext,参数通过 onNext(t: String) 传进来,最后调用 onComplete,多说一句,在 just 和 fromIterable 的情况下,没有手动调用 Emitter,但是仍会先调用 onNext,最后调用 onComplete

2.3 Consumer 和 Action

这两个词意思分别是消费者(可以理解为消费被观察者发射出来的事件)和行为(可以理解为响应被观察者的行为)。对于 Observer 中的 4 个回调方法,我们未必都能用得到,如果只需要用到其中的一部分,就需要 Consumer 和 Action 上场了。

有参数的 onSubscribeonNextonError 我们用 Consumer 来代替,无参的 onComplete 用 Action 代替:

2.3.1 subscribe(Consumer<? super T> onNext)

observable.subscribe(object :Consumer<String>{
            override fun accept(t: String?) {
                Log.e("abc", "-----onNext-----$t")
            }
        })
//打印
-----onNext-----Hello
-----onNext-----RxJava
-----onNext-----GoodBye
复制代码

说明一下,如果 subscribe 中我们只传一个对象参数,那只能是 subscribe(Consumer<? super T> onNext) (onNext 方法),不能是 Action 或 Consumer<? super Throwable> onError、Consumer<? super Disposable> onSubscribe

==注意==:Consumer 中的回调方法名称是 accept,区别于前面的 onNext

2.3.2 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)

带有两个 Consumer 参数,分别负责 onNext 和 onError 的回调。

observable.subscribe(object : Consumer<String> {
            override fun accept(t: String?) {
                Log.e("abc", "-----onNext-----$t")
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable?) {
                Log.e("abc", "-----onError-----$e")
            }
        })
复制代码

如果想要一个带有两个 Consumer 但是不是这种搭配(比如 subscribe(Consumer<? super T> onNext, Consumer<? super Disposable> onSubscribe) ),可以吗?答案是: 不行

2.3.3 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete)

带有三个参数,分别负责onNext、onError和onComplete的回调。

observable.subscribe(object : Consumer<String> {
            override fun accept(t: String?) {
                Log.e("abc", "-----onNext-----$t")
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable?) {
                Log.e("abc", "-----onError-----$e")
            }
        }, object : Action {
            override fun run() {
                Log.e("abc", "-----onComplete-----")
            }
        })
复制代码

同样,三个参数只能有这一种搭配

==注意==:Action 中的回调方法名称是 run,区别于前面的 onComplete

2.3.4 subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,Action onComplete, Consumer<? super Disposable> onSubscribe)

这种情况和直接 new 出来的 Observer 效果一样。

observable2.subscribe(object : Consumer<String> {
            override fun accept(t: String?) {
                Log.e("abc", "-----onNext-----$t")
            }
        }, object : Consumer<Throwable> {
            override fun accept(t: Throwable?) {
                Log.e("abc", "-----onError-----$e")
            }
        }, object : Action {
            override fun run() {
                Log.e("abc", "-----onComplete-----")
            }
        },object : Consumer<Disposable>{
            override fun accept(t: Disposable?) {
                Log.e("abc", "-----onSubscribe-----")            }
        })
复制代码
原文  https://juejin.im/post/5d904bfa6fb9a04e3e7232d9
正文到此结束
Loading...