This article summarizes the RxJava2 operators we use most often in our projects.
Single: most of our streams emit exactly one value, so Single covers the majority of use cases.
Single.fromCallable(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        Logger.d(TAG, "test: fromCallable() invoked on %s", Thread.currentThread().getName());
        return generateRandom();
    }
}).subscribeOn(Schedulers.io()).subscribe();
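As the name Callable suggests, this is a callback: it is not invoked until the Single is subscribed, and because of subscribeOn(Schedulers.io()) it runs on an io thread.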
Single.defer(new Callable<SingleSource<Integer>>() {
    @Override
    public SingleSource<Integer> call() throws Exception {
        Logger.d(TAG, "test: defer() invoked on %s", Thread.currentThread().getName());
        return Single.just(generateRandom());
    }
}).subscribeOn(Schedulers.io()).subscribe();
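Note that you cannot simply write Single.just(generateRandom()).subscribeOn(Schedulers.io()).subscribe(). Written that way, generateRandom() is called immediately on the current thread, before the Single is even created, so the work never moves to the io thread.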
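The difference between the two forms can be checked with a minimal sketch like the one below. The class name JustVsFromCallable and the helper currentThreadName() are hypothetical; the helper stands in for generateRandom() and simply reports which thread ran it. blockingGet() is used only to keep the demo synchronous and would not normally appear in Android code.

```java
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.Callable;

public class JustVsFromCallable {

    // Hypothetical stand-in for generateRandom(): reports the thread it ran on.
    static String currentThreadName() {
        return Thread.currentThread().getName();
    }

    // The argument of just() is an ordinary Java expression, so it is
    // evaluated on the caller's thread before the Single exists.
    static String viaJust() {
        return Single.just(currentThreadName())
                .subscribeOn(Schedulers.io())
                .blockingGet();
    }

    // The Callable is invoked only on subscription, on the io scheduler.
    static String viaFromCallable() {
        return Single.fromCallable(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return currentThreadName();
            }
        }).subscribeOn(Schedulers.io()).blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("just:         " + viaJust());
        System.out.println("fromCallable: " + viaFromCallable());
    }
}
```

Run from main, the first line should show the caller's thread and the second an io scheduler thread.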
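Whenever you care about the result, also subscribe a Consumer<Throwable>. RxJava catches exceptions thrown inside the stream and delivers them through onError, so the calling code never sees them directly. If no error consumer is subscribed, the error is instead wrapped in an OnErrorNotImplementedException and handed to RxJavaPlugins.onError().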
// Subscribe with both a success consumer and an error consumer.
Single.fromCallable(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        return generateRandom();
    }
}).subscribeOn(Schedulers.io()).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Logger.d(TAG, "test: accept(Integer integer) invoked on %s", Thread.currentThread().getName());
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Logger.d(TAG, "test: accept(Throwable throwable) invoked on %s", Thread.currentThread().getName());
    }
});
// observeOn() moves both consumers to the Android main thread,
// while the Callable still runs on the io thread.
Single.fromCallable(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        return generateRandom();
    }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        Logger.d(TAG, "test: accept(Integer integer) invoked on %s", Thread.currentThread().getName());
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Logger.d(TAG, "test: accept(Throwable throwable) invoked on %s", Thread.currentThread().getName());
    }
});
// map() transforms the emitted value synchronously, here from Integer to String.
Single.fromCallable(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        return generateRandom();
    }
}).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer integer) throws Exception {
        return integer + "_mapped";
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        Logger.d(TAG, "test: " + s);
    }
});
// flatMap() chains a second asynchronous call that depends on the first result:
// fetch the user id, then fetch the user info for that id.
Single.defer(new Callable<SingleSource<String>>() {
    @Override
    public SingleSource<String> call() throws Exception {
        return getUserId();
    }
}).flatMap(new Function<String, SingleSource<UserInfo>>() {
    @Override
    public SingleSource<UserInfo> apply(String userId) throws Exception {
        return getUserInfo(userId);
    }
}).subscribeOn(Schedulers.io()).subscribe(new Consumer<UserInfo>() {
    @Override
    public void accept(UserInfo userInfo) throws Exception {
        Logger.d(TAG, "test: get userInfo success: " + userInfo);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Logger.e(TAG, "test: get userInfo error.", throwable);
    }
});
// Each source is subscribed on its own io thread, so the two requests run in parallel.
Single<IBook> novel = Single.fromCallable(new Callable<Novel>() {
    @Override
    public Novel call() throws Exception {
        return getNovel();
    }
}).map(new Function<Novel, IBook>() {
    @Override
    public IBook apply(Novel novel) throws Exception {
        return new NovelAdapter(novel);
    }
}).subscribeOn(Schedulers.io());

Single<IBook> rxJava2Tutorial = Single.fromCallable(new Callable<RxJava2Tutorial>() {
    @Override
    public RxJava2Tutorial call() throws Exception {
        return getRxJava2Tutorial();
    }
}).map(new Function<RxJava2Tutorial, IBook>() {
    @Override
    public IBook apply(RxJava2Tutorial rxJava2Tutorial) throws Exception {
        return new RxJava2TutorialAdapter(rxJava2Tutorial);
    }
}).subscribeOn(Schedulers.io());

// zip() waits for both results and combines them into one list.
Single.zip(novel, rxJava2Tutorial, new BiFunction<IBook, IBook, List<IBook>>() {
    @Override
    public List<IBook> apply(IBook iBook, IBook iBook2) throws Exception {
        List<IBook> books = new ArrayList<>(2);
        books.add(iBook);
        books.add(iBook2);
        return books;
    }
}).subscribe(new Consumer<List<IBook>>() {
    @Override
    public void accept(List<IBook> iBooks) throws Exception {
        Logger.d(TAG, "test: books are " + iBooks);
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(Throwable throwable) throws Exception {
        Logger.e(TAG, "test: get books error.", throwable);
    }
});
Observable: used less often. A typical case is a search box, which has to keep emitting search keywords as the user types.
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
        mEditText.addTextChangedListener(new TextWatcher() {
            @Override
            public void beforeTextChanged(CharSequence s, int start, int count, int after) {
            }

            @Override
            public void onTextChanged(CharSequence s, int start, int before, int count) {
            }

            @Override
            public void afterTextChanged(Editable s) {
                // Emit the trimmed keyword unless the subscriber has already disposed.
                if (!emitter.isDisposed()) {
                    emitter.onNext(s.toString().trim());
                }
            }
        });
    }
})
        // Only pass a keyword on once the input has been quiet for 200 ms.
        .debounce(200, TimeUnit.MILLISECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String keyword) throws Exception {
                mTextView.setText(search(keyword));
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Logger.e(TAG, "test: emitter keyword error.", throwable);
            }
        });
Flowable: used least often. So far we have only needed it for emitting download progress.
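As a rough sketch of the download-progress case, the snippet below emits progress percentages through a Flowable. The class name DownloadProgress, the method progress() and the simulated chunk loop are all hypothetical; a real download would call onNext() from its read loop. BackpressureStrategy.LATEST is an assumption on our part, chosen because a slow consumer such as a progress bar usually only needs the most recent value.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;

public class DownloadProgress {

    // Simulates a download split into totalChunks pieces and emits the
    // percentage completed after each piece (hypothetical helper).
    static Flowable<Integer> progress(final int totalChunks) {
        return Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int chunk = 1; chunk <= totalChunks && !emitter.isCancelled(); chunk++) {
                    emitter.onNext(chunk * 100 / totalChunks);
                }
                emitter.onComplete();
            }
        }, BackpressureStrategy.LATEST); // keep only the newest value if the consumer lags
    }

    public static void main(String[] args) {
        // Collect all emissions synchronously, for demonstration only.
        System.out.println(progress(4).toList().blockingGet());
    }
}
```

In an app, the Flowable would typically be combined with subscribeOn(Schedulers.io()) and observeOn(AndroidSchedulers.mainThread()) so that the download runs off the main thread and the progress bar is updated on it.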