转载

RxJava2订阅、发射、解除流程图

经过两晚阅读RxJava源码的时间,终于把RxJava从观察者订阅被观察者、到被观察者发射订阅信息给观察者、到观察者解除订阅被观察者的信息,这一串基本流程通过流程图的形式体现了出来。所读源码版本是RxJava2.2.13,以下是流程图对应的示例代码。

/**
     * rxjava2 流程示例代码
     */
    private void rxJava2Flow() {
        /**
         * 被观察者的订阅,当观察者订阅了被观察者后,此回调将会被调用,发射器将会发送信号
         */
        ObservableOnSubscribe<Integer> observableOnSubscribe = new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                System.out.println("已经订阅:subscribe,获取发射器");
                emitter.onNext(1);
                System.out.println("信号发射:"+1);
                emitter.onNext(2);
                System.out.println("信号发射:"+2);
                emitter.onNext(3);
                System.out.println("信号发射:"+3);
                emitter.onNext(4);
                System.out.println("信号发射:"+4);
                emitter.onComplete();
                System.out.println("信号发射:onComplete");
            }
        };
        /**
         * 创建被观察者,并带上被观察者的订阅
         */
        Observable<Integer> observable = Observable.create(observableOnSubscribe);


        /**
         * 订阅解除器
         */
        final Disposable[] disposable = new Disposable[1];
        /**
         * 创建观察者
         */
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                disposable[0] = d;
                System.out.println("已经订阅:onSubscribe,获取解除器");
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("信号接收:onNext "+integer);
                if (integer==2){
                    disposable[0].dispose();
                    System.out.println("解除订阅:dispose");
                }

            }

            @Override
            public void onError(Throwable e) {
                System.out.println("信号接收:onError "+e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("信号接收:onComplete");
            }
        };

        /**
         * 订阅
         */
        System.out.println("开始订阅:subscribe");
        observable.subscribe(observer);
    }
复制代码

1、订阅

RxJava2订阅、发射、解除流程图

2、发射

RxJava2订阅、发射、解除流程图

3、解除

RxJava2订阅、发射、解除流程图

∞、备注

转载请注明出处。

原文  https://juejin.im/post/5da0474f5188253f5447c321
正文到此结束
Loading...