RxJava
已经成为了 Android
开发必备的技能,鉴于大部分敏捷开发团队,掌握 RxJava
和 Retroft
快速开发网络框架,能大幅度减少网络框架重构时间。
有大量文章教程在写 RxJava
如何使用,如扔物线的RxJava详解,还有南尘的RxJava2.x 教程,那我为什么写这多余的文章呢?
本文不是一个指导如何使用 RxJava
特性和基本的探索,而是从更高的角度上挖掘它,去了解代码库是怎么做的,内部工作原理又是怎样的,相比其他网络请求框架有什么优势?
RxJava
在 GitHub 主页上的简介是
"Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM."
翻译过来就是:活性扩展JVM库编写异步和基于事件的程序使用Java VM可观察序列。
很费解的一句话,你可以这么理解,压缩版能优简化代码的异步请求库。
RxJava
, Retrofit
, Rxandroid
相关依赖 implementation 'com.squareup.retrofit2:retrofit:2.5.0' implementation 'com.squareup.retrofit2:converter-gson:2.5.0' implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0' implementation 'io.reactivex.rxjava2:rxandroid:2.1.1' implementation 'io.reactivex.rxjava2:rxjava:2.2.8' 复制代码
<uses-permission android:name="android.permission.INTERNET"/> 复制代码
访问 小木箱 github 仓库,通过get请求得到了以下报文:
然后,通过 Gsonformat 得到相关实体类对象:
class MicroKibacoRepo { private int id; private String node_id; private String name; private String full_name; // ---为了避免浪费篇幅,省略无用代码--- } 复制代码
Single interface
作为 Web Service
的请求集合,在⾥⾯⽤注解( Annotation
)写⼊需要配置的请求方法 public interface Api { @GET("users/{username}/repos") Single<List<MicroKibacoRepo>> listRepos(@Path("username") String user); } 复制代码
Retrofit
创建出 interface
的实例 Retrofit retrofit = new Retrofit.Builder() .baseUrl("https://api.github.com/") .addConverterFactory(GsonConverterFactory.create(gson)) .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .build(); Api api = retrofit.create(Api.class); 复制代码
observeOn
, observeOn
和 subscribe
等订阅事件切换线程达到网络请求效果。 api.listRepos("MicroKibaco") .subscribeOn(Schedulers.io()) // 切换网络请求 .observeOn(AndroidSchedulers.mainThread()) // 切换到UI线程 .subscribe(new SingleObserver<List<MicroKibacoRepo>>() { @Override public void onSubscribe(Disposable d) { // 订阅者模式,我向你订阅事件 mDisposable = d; } @Override public void onSuccess(List<MicroKibacoRepo> microKibacoRepos) { text_view.setText(microKibacoRepos.get(0).getName()); } @Override public void onError(Throwable e) { String msg = e.getMessage(); if (msg == null) { msg = e.getClass().getName(); } text_view.setText(msg); } }); 复制代码
一个简单的 RxJava
网络处理库就做好了,下面我们来分析一下,具体 Api
的底层实现: RxJava
的整体结构是一条链,其中:
Observable
。 Observer
。 Observable
,又是上游的 Observer
。 我们访问 Single
的 just
方法发现: 里面有一个关键的钩子方法: onAssembly
。
public static <T> Single<T> just(final T item) { ObjectHelper.requireNonNull(item, "item is null"); // 钩子方法 return RxJavaPlugins.onAssembly(new SingleJust<T>(item)); } public static <T> Single<T> onAssembly(@NonNull Single<T> source) { // 查询静态对象,是否存在 Function<? super Single, ? extends Single> f = onSingleAssembly; // 如果存在,额外处理 if (f != null) { return apply(f, source); } // 如果不存在,直接返回 return source; } 复制代码
整个 Single.just
内部其实创建了一个SingleJust, SingleJust
里面有一个关键方法: subscribeActual
而 subscribeActual
有一个比较重要的抽象方法 onSubscribe
和 onSuccess
。
public final class SingleJust<T> extends Single<T> { final T value; public SingleJust(T value) { this.value = value; } @Override protected void subscribeActual(SingleObserver<? super T> observer) { observer.onSubscribe(Disposables.disposed()); observer.onSuccess(value); } } 复制代码
整个方法做的事情就是优先执行 onSubscribe
,后执行 onSuccess
。
// just 方法创建了一个上层被观察的对象 Single<String> single = Single.just("1"); single = single.subscribeOn(Schedulers.io()); single = single.subscribeOn(AndroidSchedulers.mainThread()); // subscribe 把观察者传进来 single.subscribe(new SingleObserver<String>() { // subscribe 内部会调用 subscribeActual @Override public void onSubscribe(Disposable d) { } @Override public void onSuccess(String s) { text_view.setText(s); } @Override public void onError(Throwable e) { String msg = e.getMessage(); if (msg == null) { msg = e.getClass().getName(); } text_view.setText(msg); } }); 复制代码
而 Disposable
的 disposed
方法其实是:在我不需要订阅关系的时候,我们切断这种关系。
public final class SingleMap<T, R> extends Single<R> { final SingleSource<? extends T> source; final Function<? super T, ? extends R> mapper; public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) { this.source = source; this.mapper = mapper; } @Override protected void subscribeActual(final SingleObserver<? super R> t) { source.subscribe(new MapSingleObserver<T, R>(t, mapper)); } static final class MapSingleObserver<T, R> implements SingleObserver<T> { final SingleObserver<? super R> t; final Function<? super T, ? extends R> mapper; MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) { this.t = t; this.mapper = mapper; } @Override public void onSubscribe(Disposable d) { t.onSubscribe(d); } @Override public void onSuccess(T value) { R v; try { v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value."); } catch (Throwable e) { Exceptions.throwIfFatal(e); onError(e); return; } t.onSuccess(v); } @Override public void onError(Throwable e) { t.onError(e); } } } 复制代码
可以通过 dispose() 方法来让上游停止工作,达到 『丢弃』 的效果
原理: 在 Scheduler 指定的线程里启动 subscribe()
原理: 在内部创建的 Observer 的 onNext() onError() onSuccess() 等回调方法里面,通过 Scheduler 指定的线程 来调用下级 Observer 的 对应回调方法
Schedulers.newThread() 和 Schedulers.io():
AndroidSchedulers.mainThread():
通过内部的 Handle 把任务放到主线程去做