GitHub关于RxJava的介绍:
a library for composing asynchronous and event-based programs by using observable sequences
他的意思就是 一个通过可观测的序列来组成异步和基于事件的库。
RxJava的出现消除同步问题、线程安全等问题
总的来说就是方便我们异步编程。
异步
链式调用结构
使用复杂的异步调用方式的时候依旧可以保持简洁
学习成本比较高,入门的门槛比较高
难以理解的API,需要查看源码才能理解API的具体效果
首先明白他的基础使用步骤:
正常创建被观察者:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("ONE"); emitter.onNext("TWO"); emitter.onNext("THREE"); emitter.onComplete(); } }); 复制代码
在这里面一共产生了四个事件:One、Two、Three、结束。
PS:
非正常创建第一弹:
Observable observable = Observable.just("ONE","TWO","THREE");
非正常创建第二弹:
String[] values = {"ONE", "TWO", "THREE"}; Observable observable = Observable.fromArray(values); 复制代码
其实这样的非正常创建是内部将这些信息包装成onNext()这样的事件发送给观察者。
正常创建:
Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i("z", "onSubscribe: "); } @Override public void onNext(String s) { Log.i("z", "onNext: s = " + s); } @Override public void onError(Throwable e) { Log.i("z", "onError: "); } @Override public void onComplete() { Log.i("z", "onComplete: "); } }; 复制代码
非正常创建:
Consumer<String> observer = new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.i("z", "accept: s = " + s); } }; 复制代码
observable.subscribe(observer);
你已经注意到不一样的地方,为什么被观察者订阅了观察者?
之所以会这样,是因为RxJava为了保持链式调用的流畅性。
RxJava既然是异步库,当然对于异步的处理会更好
在我们看RxJava的异步调用之前,我们先来学习下其中比较重要的两个点
这个表示Observable在一个指定的环境下创建,只能使用一次,多次创建的话会以第一次为准。
表示 事件传递和 最终处理发生在哪个环境下,可以多次调用,每次指定之后,下一步就生效。
比如:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("ONE"); emitter.onNext("TWO"); emitter.onNext("THREE"); emitter.onComplete(); } }) // 被观察者在一个新的线程中创建 .subscribeOn(Schedulers.newThread()) // 下面这个操作是在io线程中 .observeOn(Schedulers.io()) .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { return s.toLowerCase(); } }) // 切换,观察者是在主线程中 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { } }); 复制代码
先看一下基础的调用方式:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { Log.i(TAG, "subscribe: "); emitter.onNext("ONE"); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i(TAG, "onSubscribe: "); } @Override public void onNext(String s) { Log.i(TAG, "onNext: s = " + s); } @Override public void onError(Throwable e) { Log.i(TAG, "onError: e = " + e.getMessage()); } @Override public void onComplete() { Log.i(TAG, "onComplete: "); } }); } }); 复制代码
结果:
onSubscribe: subscribe: onNext: s = ONE 复制代码
我们先从订阅开始看,也就是 subscribe
方法
public final void subscribe(Observer<? super T> observer) { ... // 忽略部分源码 subscribeActual(observer); ... // 忽略部分源码 } 复制代码
直接找到主要的方法 subscribeActual(observer)
,这个是抽象的方法,会被实现在子类中。
所以我们接着看看 Observable
的子类实现:
我们进入到 create
方法中:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 复制代码
其实 返回的就是 ObservableCreate
的对象,
需要注意的是: ObservableCreate
是 Observable
的一个子类 ObservableCreate
被创建都会传入一个 source
的字段,这个 source
就是 ObservableOnSubscribe
。
在 ObservableCreate
具体实现了 subscribeActual
方法
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); // 在这里触发 observer#onSubscribe() observer.onSubscribe(parent); try { // 在这里回调 source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } 复制代码
在这里方法可以看到 观察者 observer
的 onSubscribe
会先于回调发生。
然后调用 ObservableOnSubscribe
的方法 subscribe
具体的事件后由开发者去做,
可以看到在案例中调用了 CreateEmitter
,可以进入到 CreateEmitter
看看 onNext()
的实现
@Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } 复制代码
可以看到 在 CreateEmitter
的 onNext()
中调用了 观察者 observer
的 onNext()
方法.
然后可以看到案例中的调用:
@Override public void onNext(String s) { Log.i(TAG, "onNext: s = " + s); } 复制代码