转载

Android高手进阶(RxJava2)

RxJava 已经成为了 Android 开发必备的技能,鉴于大部分敏捷开发团队,掌握 RxJavaRetroft 快速开发网络框架,能大幅度减少网络框架重构时间。

有大量文章教程在写 RxJava 如何使用,如扔物线的RxJava详解,还有南尘的RxJava2.x 教程,那我为什么写这多余的文章呢?

本文不是一个指导如何使用 RxJava 特性和基本的探索,而是从更高的角度上挖掘它,去了解代码库是怎么做的,内部工作原理又是怎样的,相比其他网络请求框架有什么优势?

概述

Android高手进阶(RxJava2)
RxJava 在 GitHub 主页上的简介是 "Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM."

翻译过来就是:活性扩展JVM库编写异步和基于事件的程序使用Java VM可观察序列。

很费解的一句话,你可以这么理解,压缩版能优简化代码的异步请求库。

使用:

1. 添加 RxJava , Retrofit , Rxandroid 相关依赖

implementation 'com.squareup.retrofit2:retrofit:2.5.0'
    implementation 'com.squareup.retrofit2:converter-gson:2.5.0'
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
    implementation 'io.reactivex.rxjava2:rxjava:2.2.8'
复制代码

2. 添加网络权限

<uses-permission android:name="android.permission.INTERNET"/>
复制代码

3. 创建实体类

访问 小木箱 github 仓库,通过get请求得到了以下报文:

Android高手进阶(RxJava2)

然后,通过 Gsonformat 得到相关实体类对象:

class MicroKibacoRepo {


   private int id;
   private String node_id;
   private String name;
   private String full_name;
  // ---为了避免浪费篇幅,省略无用代码--- 
}
复制代码

4. 创建⼀个 Single interface 作为 Web Service 的请求集合,在⾥⾯⽤注解( Annotation )写⼊需要配置的请求方法

public interface Api {
    @GET("users/{username}/repos")
    Single<List<MicroKibacoRepo>> listRepos(@Path("username") String user);
}
复制代码

5. 在正式代码里⽤ Retrofit 创建出 interface 的实例

Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://api.github.com/")
                .addConverterFactory(GsonConverterFactory.create(gson))
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build();

Api api = retrofit.create(Api.class);
复制代码

6. 通过 observeOn , observeOnsubscribe 等订阅事件切换线程达到网络请求效果。

api.listRepos("MicroKibaco")
                .subscribeOn(Schedulers.io()) // 切换网络请求
                .observeOn(AndroidSchedulers.mainThread()) // 切换到UI线程
                .subscribe(new SingleObserver<List<MicroKibacoRepo>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
// 订阅者模式,我向你订阅事件
                        mDisposable = d;
                    }

                    @Override
                    public void onSuccess(List<MicroKibacoRepo> microKibacoRepos) {
                        text_view.setText(microKibacoRepos.get(0).getName());
                    }

                    @Override
                    public void onError(Throwable e) {
                        String msg = e.getMessage();

                        if (msg == null) {

                            msg = e.getClass().getName();

                        }

                        text_view.setText(msg);

                    }
                });
复制代码

框架架构

一个简单的 RxJava 网络处理库就做好了,下面我们来分析一下,具体 Api 的底层实现: RxJava 的整体结构是一条链,其中:

  1. 链的最上游: 生产者 Observable
  2. 链的最下游: 观察者 Observer
  3. 链的中间: 各个中介点,既是下游 的 Observable ,又是上游的 Observer

Single

我们访问 Singlejust 方法发现: 里面有一个关键的钩子方法: onAssembly

public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "item is null");
        // 钩子方法
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
    }
    
        public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
    
         // 查询静态对象,是否存在
         Function<? super Single, ? extends Single> f = onSingleAssembly;
            // 如果存在,额外处理
            if (f != null) {
                return apply(f, source);
            }
            
             // 如果不存在,直接返回
            return source;
        }
复制代码

整个 Single.just 内部其实创建了一个SingleJust, SingleJust 里面有一个关键方法: subscribeActualsubscribeActual 有一个比较重要的抽象方法 onSubscribeonSuccess

public final class SingleJust<T> extends Single<T> {

    final T value;

    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
    }

}
复制代码

整个方法做的事情就是优先执行 onSubscribe ,后执行 onSuccess

// just 方法创建了一个上层被观察的对象
 Single<String> single = Single.just("1");
        single = single.subscribeOn(Schedulers.io());
        single = single.subscribeOn(AndroidSchedulers.mainThread());
        
 // subscribe 把观察者传进来
        single.subscribe(new SingleObserver<String>() {
        
 // subscribe 内部会调用 subscribeActual       
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(String s) {
                text_view.setText(s);
            }

            @Override
            public void onError(Throwable e) {
                String msg = e.getMessage();

                if (msg == null) {

                    msg = e.getClass().getName();

                }

                text_view.setText(msg);
            }
        });
复制代码

Disposabledisposed 方法其实是:在我不需要订阅关系的时候,我们切断这种关系。

操作符 Operator( map() 等等)

  1. 基于原 Observable 创建一个新的 Observable
  2. Observable 内部创建一个 Observer
  3. 通过定制 Observable 的 subscribeActual() 方法和 Observer 的 onXxx()方法,来实现自己的中介角色(例如:数据切换,线程切换)
public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;

    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

    static final class MapSingleObserver<T, R> implements SingleObserver<T> {

        final SingleObserver<? super R> t;

        final Function<? super T, ? extends R> mapper;

        MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
            this.t = t;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Disposable d) {
            t.onSubscribe(d);
        }

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }

            t.onSuccess(v);
        }

        @Override
        public void onError(Throwable e) {
            t.onError(e);
        }
    }
}

复制代码

Disposable

可以通过 dispose() 方法来让上游停止工作,达到 『丢弃』 的效果

subscribeOn()

原理: 在 Scheduler 指定的线程里启动 subscribe()

效果

  • 切换起源 Observable 的线程
  • 当多次调用 subscribeOn() 的时候,只有最上面的会对起源 Observable 起作用
Android高手进阶(RxJava2)

observeOn()

原理: 在内部创建的 Observer 的 onNext() onError() onSuccess() 等回调方法里面,通过 Scheduler 指定的线程 来调用下级 Observer 的 对应回调方法

效果

  • 切换 observeOn() 下面的 Observer 的回调所在的线程
  • 当多次调用 observeOn()的时候 ,每个都会进行一次线程切换,影响范围是它下面的每个 Observer (除非又遇到新的 observeOn())

Scheduler 的原理

  1. Schedulers.newThread() 和 Schedulers.io():

    • 当 scheduleDirect() 被调用的时候,会创建一个 Worker,这个 Worker 的内部会有一个 Executor,由 一个 Executor 来完成实际线程切换
    • scheduleDirect() 还会创建出一个 Disposable 对象,交给外侧 Observer,让它 能执行 dispose() 操作,取消订阅链
    • newThread() 和 io() 的区别在于,io() 可能会对 Executor 进行重写
  2. AndroidSchedulers.mainThread():

    通过内部的 Handle 把任务放到主线程去做

原文  https://juejin.im/post/5d63e5a16fb9a06b155dcb9d
正文到此结束
Loading...