经过两晚阅读RxJava源码的时间,终于把RxJava从观察者订阅被观察者、到被观察者发射订阅信息给观察者、到观察者解除订阅被观察者的信息,这一串基本流程通过流程图的形式体现了出来。所读源码版本是RxJava2.2.13,以下是流程图对应的示例代码。
/** * rxjava2 流程示例代码 */ private void rxJava2Flow() { /** * 被观察者的订阅,当观察者订阅了被观察者后,此回调将会被调用,发射器将会发送信号 */ ObservableOnSubscribe<Integer> observableOnSubscribe = new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { System.out.println("已经订阅:subscribe,获取发射器"); emitter.onNext(1); System.out.println("信号发射:"+1); emitter.onNext(2); System.out.println("信号发射:"+2); emitter.onNext(3); System.out.println("信号发射:"+3); emitter.onNext(4); System.out.println("信号发射:"+4); emitter.onComplete(); System.out.println("信号发射:onComplete"); } }; /** * 创建被观察者,并带上被观察者的订阅 */ Observable<Integer> observable = Observable.create(observableOnSubscribe); /** * 订阅解除器 */ final Disposable[] disposable = new Disposable[1]; /** * 创建观察者 */ Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { disposable[0] = d; System.out.println("已经订阅:onSubscribe,获取解除器"); } @Override public void onNext(Integer integer) { System.out.println("信号接收:onNext "+integer); if (integer==2){ disposable[0].dispose(); System.out.println("解除订阅:dispose"); } } @Override public void onError(Throwable e) { System.out.println("信号接收:onError "+e.getMessage()); } @Override public void onComplete() { System.out.println("信号接收:onComplete"); } }; /** * 订阅 */ System.out.println("开始订阅:subscribe"); observable.subscribe(observer); } 复制代码
转载请注明出处。