这段时间学习 RxJava 2.x 的笔记
// 1. 创建上游(被观察者) Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter)throws Exception { emitter.onNext(1); emitter.onError(); emitter.onComplete(); } }); // 2. 创建下游(观察者) Observer observer = new Observer<Integer>() { Disposable disposable = null; @Override public void onSubscribe(Disposable d){ Log.d(TAG, "onSubscribe: "); disposable = d; } @Override public void onNext(Integer o){ log.d(TAG, "onNext: " + o) } @Override public void onError(Throwable e){ Log.d(TAG, "onError: "); } @Override public void onComplete(){ Log.d(TAG, "onComplete: "); } }; // 3. 建立连接(订阅) observable.subscribe(observer); // 4. 打印结果: onSubscribe: onNext: 1 onComplete:
public final Disposable subscribe(){} public final Disposable subscribe(Consumer<?super T> onNext){} public final Disposable subscribe(Consumer<?super T> onNext, Consumer<? super Throwable> onError){} public final Disposable subscribe(Consumer<?super T> onNext, Consumer<? super Throwable> onError, Action onComplete){} public final Disposable subscribe(Consumer<?super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe){} public final void subscribe(Observer<?super T> observer){}
// 1. 创建上游 Flowable flowable = Flowable.create(new FlowableOnSubscribe() { @Override public void subscribe(FlowableEmitter emitter)throws Exception { emitter.onNext(1); emitter.onComplete(); // emitter.onError(); } }, BackpressureStrategy.ERROR); // 2. 创建下游 Subscriber subscriber = new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s){ Log.d(TAG, "onSubscribe: "); s.request(1); } @Override public void onNext(Integer integer){ Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable t){ Log.d(TAG, "onError: "); } @Override public void onComplete(){ Log.d(TAG, "onComplete: "); } }; // 3. 建立连接(订阅) flowable.subscribe(subscriber);
MissingBackpressureException
// 1. 创建上游 Single<Integer> single = Single.create(new SingleOnSubscribe<Integer>() { @Override public void subscribe(SingleEmitter<Integer> e)throws Exception { for (int i = 0; i < 3; i++) { e.onSuccess(i); Log.d(TAG, "subscribe: " + i); } } }); // 2. 创建下游 SingleObserver<Integer> observer = new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d){ Log.d(TAG, "onSubscribe: "); } @Override public void onSuccess(Integer integer){ // 相当于 onNext() 和 onComplete() Log.d(TAG, "onSuccess: " + integer); } @Override public void onError(Throwable e){ Log.d(TAG, "onError: "); } }; // 3. 建立连接(订阅) single.subscribe(observer); // 4. 打印结果: onSubscribe: onSuccess: 0 subscribe: 0 subscribe: 1 subscribe: 2
// 1. 创建上游 Completable completable = Completable.create(new CompletableOnSubscribe() { @Override public void subscribe(CompletableEmitter e)throws Exception { for (int i = 0; i < 3; i++) { e.onComplete(); Log.d(TAG, "subscribe: " + i); } } }); // 2. 创建下游 CompletableObserver observer = new CompletableObserver() { @Override public void onSubscribe(Disposable d){ Log.d(TAG, "onSubscribe: "); } @Override public void onComplete(){ Log.d(TAG, "onComplete: "); } @Override public void onError(Throwable e){ Log.d(TAG, "onError: " + e.getMessage()); } }; // 3. 建立连接(订阅) completable.subscribe(observer); // 4. 打印结果: onSubscribe: onComplete: subscribe: 0 subscribe: 1 subscribe: 2
// 1. 创建上游 Maybe<Integer> maybe = Maybe.create(new MaybeOnSubscribe<Integer>() { @Override public void subscribe(MaybeEmitter<Integer> e)throws Exception { for (int i = 0; i < 3; i++) { e.onSuccess(i); // 不会起作用 e.onComplete(); } } }); // 2. 创建下游 MaybeObserver<Integer> observer = new MaybeObserver<Integer>() { @Override public void onSubscribe(Disposable d){ Log.d(TAG, "onSubscribe: "); } @Override public void onSuccess(Integer integer){ Log.d(TAG, "onSuccess: " + integer); } @Override public void onError(Throwable e){ Log.d(TAG, "onError: " + e.getMessage()); } @Override public void onComplete(){ Log.d(TAG, "onComplete: "); } }; // 3. 建立连接(订阅) maybe.subscribe(observer); // 4. 打印结果: onSubscribe: onSuccess: 0
Schedulers.immediate()
: 默认的 Scheduler
,直接在当前线程运行。 Schedulers.newThread()
: 总是启用新线程,并在新线程执行操作。 Schedulers.io()
: I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler
。行为模式和 newThread()
差不多,区别在于 io()
的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io()
比 newThread()
更有效率。不要把计算工作放在 io()
中,可以避免创建不必要的线程。 Schedulers.computation()
: 计算所使用的 Scheduler
。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler
使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation()
中,否则 I/O 操作的等待时间会浪费 CPU。 AndroidSchedulers.mainThread()
: Android 专用的,它指定的操作将在 Android 主线程运行。 onNext()
方法中完成的:如果 AtomicLong 不等于 0 就发送一个数据并且 AtomicLong 减 1,前 128 个数据都会到达下游的缓存队列中进行缓存;当 AtomicLong 为 0 时不再发送数据到下游的缓存队列,而是调用 ErrorAsyncEmitter 的实现方法 onOverflow()
, onOverflow()
方法里面调用 onError()
方法抛出 MissingBackpressureException 异常 runAsync()
方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就 e++
,每当取到第 96 个数据的时候先 r = requested -= 96; e = 0L;
,然后再请求上游发送 96 个数据到下游的缓存队列(此时如果上游继续发送数据(e.onNext(i)),由于 AtomicLong 大于 0 会继续发送数据到下游缓存队列,否则就不发送数据到下游),与此同时下游继续从缓存队列取数据发送出去,发送一个数据就 e++
,直到 while(e != r)
不成立导致不再发送给外界。此时如果外界主动调用 s.request(n)
请求数据将继续发送数据给外界 >= 96
时抛出 MissingBackpressureException 异常。因为每次下游都是请求 96 个数据,96 保存在上游的 AtomicLong 中,发送一个数据就减 1,当 AtomicLong 为 0 时就抛出 MissingBackpressureException 异常 onNext()
方法中先用 queue.offer(t)
保存发送过的所有数据,然后再调用 drain()
方法完成数据发送:每发送一个数据就减 1,前 128 个数据都会到达下游的缓存队列中进行缓存,当 AtomicLong 为 0 时不再发送数据到下游的缓存队列 runAsync()
方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就 e++
,每当取到第 96 个数据的时候先 r = requested -= 96; e = 0L;
,然后再请求上游发送 96 个数据到下游的缓存队列,与此同时继续取 4 个数据发送出去,发送一个数据就 e++
,这时发送了 4 个数据后 while(e != r)
不成立导致不再发送给外界 request()
方法设置 AtomicLong 的值为 96,再去调用 BufferAsyncEmitter 实现的 onRequested()
方法, onRequested()
中再调用 drain()
方法完成数据的发送。 drain()
方法会从 queue 中取出未发送过的数据发送给下游的缓存队列,然后下游再从缓存队列中取出数据发送给外界 request()
方法中去调用空方法 onRequested()
,而 onNext()
方法不再被调用导致不再发送数据到下游缓存队列,仅仅是从下游缓存队列中取出数据发送给外界 onNext()
方法中完成的:如果 AtomicLong 不等于 0 就发送一个数据并且 AtomicLong 减 1,前 128 个数据都会到达下游的缓存队列中进行缓存;当 AtomicLong 为 0 时不再发送数据到下游的缓存队列,而是调用 onOverflow()
, onOverflow()
是个空方法,也就是丢弃数据 runAsync()
方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就 e++
,每当取到第 96 个数据的时候先 r = requested -= 96; e = 0L;
,然后再请求上游发送 96 个数据到下游的缓存队列,与此同时继续取 4 个数据发送出去,发送一个数据就 e++
,这时发送了 4 个数据后 while(e != r)
不成立导致不再发送给外界 request()
方法中去调用空方法 onRequested()
,而 onNext()
方法不再被调用导致不再发送数据到下游缓存队列,仅仅是从下游缓存队列中取出数据发送给外界 onNext()
方法中先用 queue(AtomicReference对象) 保存当前发送的数据, 所以发送完所有数据后 queue 保存的是最后一个数据
,然后再调用 drain()
方法完成数据发送:每发送一个数据就减 1,前 128 个数据都会到达下游的缓存队列中进行缓存,当 AtomicLong 为 0 时不再发送数据到下游的缓存队列 runAsync()
方法中发送给外界:首先从下游缓存队列中取 100 个数据发送出去(即 onNext() 被调用),发送一个数据就 e++
,每当取到第 96 个数据的时候先 r = requested -= 96; e = 0L;
,然后再请求上游发送 96 个数据到下游的缓存队列,与此同时继续取 4 个数据发送出去,发送一个数据就 e++
,这时发送了 4 个数据后 while(e != r)
不成立导致不再发送给外界 request()
方法中去调用 LatestAsyncEmitter 实现的 onRequested()
方法, onRequested()
中再调用 drain()
方法完成数据的发送。 drain()
方法会把 queue 中保存的最后一个数据发送给下游的缓存队列,然后下游再从缓存队列中取出数据发送给外界,所以 LATEST 策略总是可以请求到上游的最后一个数据 onBackpressureBuffer()
、 onBackpressureDrop()
、 onBackpressureLatest()
)来指定背压策略 Observable.just(1, 2).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d){ Log.d(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer){ Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable e){ Log.d(TAG, "onError: " + e.getMessage()); } @Override public void onComplete(){ Log.d(TAG, "onComplete: "); } }); // 打印结果: onSubscribe: onNext: 1 onNext: 2 onComplete:
fromArray(T... items)
public static <T> Observable<T>fromArray(T... items){ ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return empty(); } else if (items.length == 1) { return just(items[0]); } return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items)); }
int[] ints = new int[]{1, 2}; Observable.fromArray(1, 2).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d){ Log.d(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer){ // 如果传入的是数组,此处的参数也是数组 Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable e){ Log.d(TAG, "onError: " + e.getMessage()); } @Override public void onComplete(){ Log.d(TAG, "onComplete: "); } }); // 打印结果: onSubscribe: onNext: 1 onNext: 2 onComplete:
Observable.empty().subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d){ Log.d(TAG, "onSubscribe: "); } @Override public void onNext(Object o){ Log.d(TAG, "onNext: " + o); } @Override public void onError(Throwable e){ Log.d(TAG, "onError: "); } @Override public void onComplete(){ Log.d(TAG, "onComplete: "); } }); // 打印结果: onSubscribe: onComplete:
List<Integer> list = new ArrayList<>(); list.add(1); list.add(2); Observable.fromIterable(list).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d){ Log.d(TAG, "onSubscribe: "); } @Override public void onNext(Integer integer){ Log.d(TAG, "onNext: " + integer); } @Override public void onError(Throwable e){ Log.d(TAG, "onError: " + e.getMessage()); } @Override public void onComplete(){ Log.d(TAG, "onComplete: "); } }); // 输出结果: onSubscribe: onNext: 1, Thread[main,5,main] onNext: 2, Thread[main,5,main] onComplete:
timer(long delay, TimeUnit unit) timer(long delay, TimeUnit unit, Scheduler scheduler)
Observable.timer(1, TimeUnit.SECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d){ Log.d(TAG, "onSubscribe: "); } @Override public void onNext(Long aLong){ Log.d(TAG, "onNext: " + aLong); } @Override public void onError(Throwable e){ Log.d(TAG, "onError: " + e.getMessage()); } @Override public void onComplete(){ Log.d(TAG, "onComplete: "); } }); // 输入结果: onSubscribe: Thread[main,5,main] onNext: 0, Thread[RxComputationThreadPool-1,5,main] onComplete: Thread[RxComputationThreadPool-1,5,main]
interval(long period, TimeUnit unit) // 每隔 period 发送一次 onNext() 事件 interval(long period, TimeUnit unit, Scheduler scheduler) // 每隔 period 发送一次 onNext() 事件,可指定发送事件所在的线程 interval(long initialDelay, long period, TimeUnit unit) // 一开始延迟 initialDelay,后面每隔 period 发送一次 onNext() 事件 interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) // 一开始延迟 initialDelay,后面每隔 period 发送一次 onNext() 事件,可指定发送事件所在的线程
Observable.interval(1, TimeUnit.SECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d){ Log.d(TAG, "onSubscribe: " + Thread.currentThread()); } @Override public void onNext(Long aLong){ Log.d(TAG, "onNext: " + aLong + ", " + Thread.currentThread()); } @Override public void onError(Throwable e){ Log.d(TAG, "onError: " + e.getMessage()); } @Override public void onComplete(){ Log.d(TAG, "onComplete: " + Thread.currentThread()); } });
intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
Observable.intervalRange(3, 3, 0, 1, TimeUnit.SECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d){ Log.d(TAG, "onSubscribe: " + Thread.currentThread()); } @Override public void onNext(Long aLong){ Log.d(TAG, "onNext: " + aLong + ", " + Thread.currentThread()); } @Override public void onError(Throwable e){ Log.d(TAG, "onError: " + e.getMessage()); } @Override public void onComplete(){ Log.d(TAG, "onComplete: " + Thread.currentThread()); } }); // 输出结果: onSubscribe: Thread[main,5,main] onNext: 3, Thread[RxComputationThreadPool-1,5,main] onNext: 4, Thread[RxComputationThreadPool-1,5,main] onNext: 5, Thread[RxComputationThreadPool-1,5,main] onComplete: Thread[RxComputationThreadPool-1,5,main]
range(final int start, final int count) rangeLong(long start, long count)
Observable.range(1, 3).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d){ Log.d(TAG, "onSubscribe: " + Thread.currentThread()); } @Override public void onNext(Integer integer){ Log.d(TAG, "onNext: " + integer + ", " + Thread.currentThread()); } @Override public void onError(Throwable e){ Log.d(TAG, "onError: " + e.getMessage()); } @Override public void onComplete(){ Log.d(TAG, "onComplete: " + Thread.currentThread()); } }); // 输出结果: onSubscribe: Thread[main,5,main] onNext: 1, Thread[main,5,main] onNext: 2, Thread[main,5,main] onNext: 3, Thread[main,5,main] onComplete: Thread[main,5,main]
// observable1 Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter)throws Exception { emitter.onNext(i); } }).subscribeOn(Schedulers.io()); // observable2 Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter)throws Exception { emitter.onNext("A"); } }).subscribeOn(Schedulers.io()); // zip Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() { @Override public String apply(Integer integer, String s)throws Exception { return integer + s; } }).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s)throws Exception { Log.d(TAG, s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable)throws Exception { Log.w(TAG, throwable); } });
// 每隔 2 秒从上游取出一个事件发送给下游 Observable.create(...).sample(2, TimeUnit.SECONDS)
Observable.create(...).filter(new Predicate<Object>() { @Override public boolean test(Object o)throws Exception { // 返回 true 才继续往下走 return ...; } })
take(long count) // 发送 count 个事件给下游 take(long time, TimeUnit unit) // 发送多久 take(long time, TimeUnit unit, Scheduler scheduler) // 指定线程发送多久
// 发送 3 个事件 Flowable.interval(1, TimeUnit.SECONDS) .take(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer)throws Exception { Log.d(TAG, "accept: " + integer); } }); // 发送 300 毫秒 Flowable.interval(1, TimeUnit.SECONDS) .take(3000, TimeUnit.MILLISECONDS) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer)throws Exception { Log.d(TAG, "accept: " + integer); } });