Rxjava在GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。</br>
通俗来说,Rxjava是一个采用了观察者模式设计处理异步的框架。链式调用设计让代码优雅易读。
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("a"); } }); observable.subscribe(new Observer<String>() { public void onSubscribe(Disposable d) { } public void onNext(String s) { } public void onError(Throwable e) { } public void onComplete() { } });
这是Rxjava2最简单的用法:
Observable Observable
创建Observable用的是 Observable.create(ObservableOnSubscribe<T> source)
方法。
这个方法的参数是 ObservableOnSubscribe
:
public interface ObservableOnSubscribe<T> { /** * Called for each Observer that subscribes. * e the safe emitter instance, never null * Exception on error */ void subscribe ObservableEmitter<T> e) throws Exception; }
查看源码可以知道,它是个接口,唯一的方法是 subscribe
,参数是 ObservableEmitter<T> e
。 ObservableEmitter
是一个继承了 Emitter
的接口,在 Emitter
里定义了onNext、onError、onComplete等方法。
public interface Emitter<T> { /** * Signal a normal value. * value the value to signal, not null */ void onNext T value); /** * Signal a Throwable exception. * error the Throwable to signal, not null */ void onError Throwable error); /** * Signal a completion. */ void onComplete(); }
ObservableEmitter
对接口 Emitter
进行扩展,增加了setDisposable、setCancellable等方法
基本参数了解了,现在看看create方法里面做了什么,代码如下:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
调用了 RxJavaPlugins
的 onAssembly
方法。又有一个新参数 ObservableCreate<T>(source)
,我们看看它是什么:
final class ObservableCreate<T> extends Observable<T> { public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } }
继承了 Observable
,所以也是个被观察对象,在构造函数中我们看到我们new的 ObservableOnSubscribe
对象,被存在了 ObservableCreate
的 source
里面
那我们继续看看onAssembly方法做什么:
public static <T> Observable<T> onAssembly Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }
一个Hook方法。 onObservableAssembly
是一个静态变量,我们没有设置,默认为空,所以直接返回source对象。也就是说,Observable的create方法其实就是把我们 ObservableOnSubscribe
对象,存储在 ObservableCreate
对象的 source
里面,然后返回 ObservableCreate
对象。
我们知道 ObservableCreate
是继承 Observable
的,所以创建了 ObservableCreate
对象,我们的Observable也就创建完了。
订阅被观察者的操作是 observable.subscribe(new Observer<String>())
。这个操作符其实是个“被动”,就是事件被观察者观察。因为subscribe方法里的参数 Observer
才是观察者。我们也会在 Observer
里的各个会调方法里接收到事件相关的返回值。
我们看看 subscribe
方法的源码:
public final void subscribe(Observer<? super T> observer) { try { subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { RxJavaPlugins.onError(e); } }
看代码我们知道最主要调用的方法是: subscribeActual(observer);
,这个方法是Observable里的抽象方法,此时我们的Observable是一个 ObservableCreate
对象。所以我们去看一下 ObservableCreate
里面是如何重写这个方法的。代码如下:
protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
我们一看到这个方法主要做了三件事:
CreateEmitter
对象 parent
,把 observer
传进去。 parent
传给 observer
的 onSubscribe
方法。 parent
传给 source
的 subscribe
方法。上面我们知道 source
就是我们传进来的 ObservableOnSubscribe
对象, subscribe
也就是我们重写的方法: public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("a"); }
我们可以在这个方法里做被观察的事件,并可以通过 CreateEmitter
回调相应的方法。 CreateEmitter
是实现 ObservableEmitter
接口,我们看看它内部实现, onNext
源码如下:
public void onNext(T t) { observer.onNext(t); }
也就是说,当我们在 ObservableOnSubscribe
的1 subscribe
方法里调用 ObservableEmitter
的 onNext
方法的时候,它里面会调用observer的 onNext
。于是通过这样的传递,我们就能在observer里响应的回调方法里收到事件的相关状态。