首先创建一个 observer
和 observable
new Observer<Object>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Object o) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; 复制代码
Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> emitter) throws Exception { } }); 复制代码
先看 Observable
的创建过程, create()
只是把传进去 ObservableOnSubscribe
对象包装了一层返回,这里可以忽略
然后是 ObservableOnSubscribe
类的内部
final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { // 1 this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { // 2 CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { // 3 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 复制代码
2. 这里new了一个Emitter,也就是我们用来onNext,onError的类,然后把Emitter传给observer中,由于CreateEmitter实现了Disposable接口,所以可以看到Observer里有一个 onSubscribe(Disposable d)
,我们可以用来控制流的结束等操作,实际上这个 Disposable
就是源Obserable创建的,
3. 这里把 Emitter
传给source,也就是传给我们new的 ObservableOnSubscribe
的 subscribe()
ObservableMap
,也是一个 Observable
Observable
都会有一个 subscribeActual()
方法,这个方法在 Observable
调用 subscrible()
之后会被调用,也就是说一个 Observable
在调用 subscrible()
之后,实际上的操作逻辑都是在 subscribeActual()
里面 ObservableMap
的 subscribeActual()
里,调用了 source.subscribe(new MapObserver<T, U>(t, function));
这句代码, source
是上层的observable,参数为包装过后的 observer
即 MapObserver
,由图三可以看出, MapObserver
内部的 onNext
是往下游传递一个经过 apply()
变化过后的数据,也就达到了我们用 map
变化数据的功能了。