在很久之前就一直想整理一下rxjava,但是一直没有时间,最近是因为离职了,总算有时间整理一下了。因为打算每篇博客都记录一个框架。所以为了描述清楚,本篇博客可能略长(包含rxjava的简介,使用,背压,原理等),希望你们能认真的读完,收获肯定还是有的,也会采用大量的图来介绍,这样可以加深理解。也可以当一个工具博客,需要的使用的话随时查阅。
后续还会继续出背压和原理篇,敬请期待
什么是rxjava? 是一种事件驱动的基于异步数据流的编程模式,整个数据流就像一条河流,它可以被观测(监听),过滤,操控或者与其他数据流合并为一条新的数据流。
三要素
好了,因为秉持着要有图的思想,在介绍rxjava各个操作符的时候,会采用大量的图示来表示,图示来源于官方,这里先给大家介绍一下怎么看。 ok,进入到撸码环节
1.首先要在 build.gradle 文件中添加依赖
implementation 'io.reactivex.rxjava2:rxjava:2.1.4' implementation 'io.reactivex.rxjava2:rxandroid:2.0.2' 复制代码
2.依赖搭建完毕了,我们先写个最简单的案例,一共3步走
// 创建被观察者 Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("你好呀"); emitter.onNext("我爱中国"); emitter.onNext("祝愿祖国繁荣富强"); emitter.onComplete(); } }); 复制代码
// 创建观察者 Observer observer = new Observer<String>(){ @Override public void onSubscribe(Disposable d) { Log.i("lybj", "准备监听"); } @Override public void onNext(String s) { Log.i("lybj", s); } @Override public void onError(Throwable e) { Log.i("lybj", "error"); } @Override public void onComplete() { Log.i("lybj", "监听完毕"); } }; 复制代码
// 订阅 observable.subscribe(observer); 复制代码
这就完事了,看下结果
是不是很简单,几个概念再介绍一下
其实rxjava,打个比方,就类似花洒的头,数据流就类似水流,它的被观察者(observable)的各种操作符就是花洒的那个头,可以有各种模式,比如中间喷水的,周围喷水的,喷水雾的等等。根据操作符的不同,可以改变数据的各种样式,根据花洒头的不同,可以把水流改成各种样式。 接下来,就来学习下observable的丰富的操作符。
创建被观察者对象
// 创建被观察者 Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("你好呀"); emitter.onNext("我爱中国"); emitter.onNext("祝愿祖国繁荣富强"); emitter.onComplete(); } }).subscribe(new Observer<String>(){ // 关联观察者 @Override public void onSubscribe(Disposable d) { Log.i("lybj", "准备监听"); } @Override public void onNext(String s) { Log.i("lybj", s); } @Override public void onError(Throwable e) { Log.i("lybj", "error"); } @Override public void onComplete() { Log.i("lybj", "监听完毕"); } }); 复制代码
可以直接链式调用关联观察者
通过上面的图,应该很形象的说明了,主要作用就是创建一个被观察者,并发送事件,但是发送的事件不可以超过10个以上。
Observable.just("小明", "小红", "小兰").subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i("lybj", "准备监听"); } @Override public void onNext(String s) { Log.i("lybj", s+"来了"); } @Override public void onError(Throwable e) { Log.i("lybj", "Error"); } @Override public void onComplete() { Log.i("lybj", "完毕"); } }); 复制代码
当到指定时间后就会发送一个 0 的值给观察者。 在项目中,可以做一些延时的处理,类似于Handler中的延时
Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.i("lybj", aLong+""); } }); 复制代码
延迟2秒后,将结果发送给观察者,Consumer和Observer是创建观察者的两种写法,相当于观察者中的onNext方法。
每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。 类似于项目中的timer,做计时器
Observable.interval(3,TimeUnit.SECONDS).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.i("lybj", aLong+""); } }); 复制代码
可以指定发送事件的开始值和数量,其他与 interval() 的功能一样。
Observable.intervalRange(100, 4, 0, 10, TimeUnit.SECONDS).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.i("lybj", aLong+""); } }); 复制代码
参数依次是:开始值,循环执行的次数,初始延迟时间,执行间隔时间,时间单位
同时发送一定范围的事件序列。
Observable.range(0,10).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
作用与 range() 一样,只是数据类型为 Long
Observable.rangeLong(0,10).subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.i("lybj", aLong+""); } }); 复制代码
private void empty_never_error(){ Observable.empty().subscribe(new Observer(){ @Override public void onSubscribe(Disposable d) { Log.i("lybj", "准备监听"); } @Override public void onNext(Object o) { Log.i("lybj", o+""); } @Override public void onError(Throwable e) { Log.i("lybj", "onError"); } @Override public void onComplete() { Log.i("lybj", "onComplete"); } }); 复制代码
如果是empty() 则:
如果是error() 则:
如果是never()则:
map 可以将被观察者发送的数据类型转变成其他的类型
Observable.just("中国", "祖国", "中国军人") .map(new Function<String, String>() { @Override public String apply(String s) throws Exception { return "我爱" + s; } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.i("lybj", s); } }); 复制代码
简单来讲,就是可以对发射过来的数据进行再加工,再传给观察者
这个方法可以将事件序列中的元素进行整合加工,返回一个新的被观察者。 flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable,map()只是返回数据,如果在元素再加工的时候,想再使用上面的创建操作符的话,建议使用flatMap(),而非map()。
Observable.just("中国", "祖国", "中国军人", "贪官") .flatMap(new Function<String, ObservableSource<String>>() { @Override public ObservableSource<String> apply(String s) throws Exception { if(s.equals("贪官")){ return Observable.error(new Exception("贪官不能被喜欢")); } return Observable.just("我爱"+s); } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.i("lybj", s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.i("lybj", throwable.getMessage()); } }); 复制代码
new Consumer方法监听的是Observable.error()
concatMap() 和 flatMap() 基本上是一样的,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。
Observable.just("中国", "祖国", "中国军人", "贪官") .concatMap(new Function<String, ObservableSource<String>>() { @Override public ObservableSource<String> apply(String s) throws Exception { if(s.equals("贪官")){ return Observable.error(new Exception("贪官不能被喜欢")); } return Observable.just("我爱"+s); } }) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.i("lybj", s); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.i("lybj", throwable.getMessage()); } }); 复制代码
从需要发送的事件当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出。
buffer 有两个参数,一个是 count,另一个 skip。count 缓冲区元素的数量,skip 就代表缓冲区满了之后,发送下一次事件序列的时候要跳过多少元素。
Observable.just("1", "2", "3", "4", "5") .buffer(2,1) .subscribe(new Consumer<List<String>>() { @Override public void accept(List<String> strings) throws Exception { Log.d("lybj", "缓冲区大小: " + strings.size()); for (String s : strings){ Log.d("lybj", s); } } }); 复制代码
将发射的数据通过一个函数进行变换,然后将变换后的结果作为参数跟下一个发射的数据一起继续通过那个函数变换,这样依次连续发射得到最终结果。
Observable.just(1, 2, 3, 4, 5) .scan(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { Log.i("lybj", "integer01: " + integer + " integer02: "+ integer2); return integer + integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", "accept: " + integer); } }); 复制代码
简单来说,先将第一个元素返回给观察者,然后将1,2的和返给观察者,然后将上一次计算的和,当第一个元素,也就是3,第2个元素,是一直按顺序取值,取第3个元素也就是3,那么将,3+3 =6,返回给观察者,以此类推,将6作为第一个元素,第二个元素取值4,将6+4=10返回给观察者。
发送事件时,将这些事件分为按数量重新分组。window 中的 count 的参数就是代表指定的数量,例如将 count 指定为2,那么每发2个数据就会将这2个数据分成一组。
window与buffer区别:window是把数据分割成了Observable,buffer是把数据分割成List
Observable.just("鲁班", "孙尚香", "亚索","火女","盖伦") .window(2) .subscribe(new Consumer<Observable<String>>() { @Override public void accept(Observable<String> stringObservable) throws Exception { Log.i("lybj", "分组开始"); stringObservable.subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.i("lybj", s); } }); } }); 复制代码
可以将多个观察者组合在一起,然后按照之前发送顺序发送事件。需要注意的是,concat() 最多只可以发送4个事件。
private void concat(){ Observable.concat( Observable.just(1, 2, 3), Observable.just(4, 5), Observable.just(6, 7), Observable.just(8, 9)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); } 复制代码
与 concat() 作用一样,不过 concatArray() 可以发送多于 4 个被观察者。
Observable.concatArray(Observable.just(1, 2, 3, 4), Observable.just(5, 6), Observable.just(7, 8, 9, 10), Observable.just(11, 12, 13), Observable.just(14, 15), Observable.just(16)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
这个方法与 concat() 作用基本一样,但是 concat() 是串行发送事件,而 merge() 并行发送事件,也是只能发送4个。
Observable.merge(Observable.just(1, 2, 3, 4), Observable.just(5, 6), Observable.just(7, 8, 9, 10), Observable.just(11, 12, 13)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
zip操作符用于将多个数据源合并,并生成一个新的数据源。新生成的数据源严格按照合并前的数据源的数据发射顺序,并且新数据源的数据个数等于合并前发射数据个数最少的那个数据源的数据个数。
Observable.zip(Observable.just(1, 2, 3), Observable.just("A", "B", "C", "D", "E"), new BiFunction<Integer, String, String>(){ @Override public String apply(Integer o1, String o2) throws Exception { return o1 +"_"+ o2; } }) .subscribe(new Consumer<String>() { @Override public void accept(String o) throws Exception { Log.i("lybj", o); } }); 复制代码
在发送事件之前追加事件,startWith() 追加一个事件,startWithArray() 可以追加多个事件。追加的事件会先发出。
Observable.just(1, 2, 3) .startWithArray(4, 5) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
返回被观察者发送事件的数量。
Observable.just(2, 3, 4, 5, 6) .count() .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.i("lybj", "事件数量:" + aLong); } }); 复制代码
延迟一段事件发送事件。
Observable.just(1,2,3,4) .delay(3, TimeUnit.SECONDS) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).doOnEach(new Consumer<Notification<Integer>>() { @Override public void accept(Notification<Integer> integerNotification) throws Exception { Log.i("lybj", "doOnEach 方法执行了, 结果:"+ integerNotification.getValue()); } }).doOnNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", "doOnNext 方法执行了, 结果:"+ integer); } }).doAfterNext(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", "doAfterNext 方法执行了, 结果:"+ integer); } }).doOnComplete(new Action() { @Override public void run() throws Exception { Log.i("lybj", "doOnComplete 方法执行了"); } }).doOnError(new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.i("lybj", "doOnError 方法执行了"); } }).doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable disposable) throws Exception { Log.i("lybj", "doOnSubscribe 方法执行了"); } }).doOnDispose(new Action() { @Override public void run() throws Exception { Log.i("lybj", "doOnDispose 方法执行了"); } }).doOnTerminate(new Action() { @Override public void run() throws Exception { Log.i("lybj", "doOnTerminate 方法执行了"); } }).doAfterTerminate(new Action() { @Override public void run() throws Exception { Log.i("lybj", "doAfterTerminate 方法执行了"); } }).doFinally(new Action() { @Override public void run() throws Exception { Log.i("lybj", "doFinally 方法执行了"); } }).subscribe(new Observer<Integer>() { private Disposable disposable; @Override public void onSubscribe(Disposable d) { disposable = d; Log.i("lybj", "------观察者onSubscribe()执行"); } @Override public void onNext(Integer integer) { Log.i("lybj", "------观察者onNext()执行:"+integer); if(integer == 2){ // disposable.dispose(); // 取消订阅 } } @Override public void onError(Throwable e) { Log.i("lybj", "------观察者onError()执行"); } @Override public void onComplete() { Log.i("lybj", "------观察者onComplete()执行"); } }); 复制代码
当接受到一个 onError() 事件之后回调,返回的值会回调 onNext() 方法,并正常结束该事件序列。
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("小明:到"); emitter.onError(new IllegalStateException("error")); emitter.onNext("小方:到"); } }).onErrorReturn(new Function<Throwable, String>() { @Override public String apply(Throwable throwable) throws Exception { Log.i("lybj", "小红请假了"); return "小李:到"; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { Log.i("lybj", s); } @Override public void onError(Throwable e) { Log.i("lybj", e.getMessage()); } @Override public void onComplete() { } }); 复制代码
当接收到 onError() 事件时,返回一个新的 Observable,并正常结束事件序列。
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("小明"); emitter.onNext("小方"); emitter.onNext("小红"); emitter.onError(new NullPointerException("error")); } }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends String>>() { @Override public ObservableSource<? extends String> apply(Throwable throwable) throws Exception { return Observable.just("1", "2", "3"); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i("lybj", "准备监听"); } @Override public void onNext(String s) { Log.i("lybj", s); } @Override public void onError(Throwable e) { Log.i("lybj", e.getMessage()); } @Override public void onComplete() { Log.i("lybj", "onComplete"); } }); 复制代码
与 onErrorResumeNext() 作用基本一致,但是这个方法只能捕捉 Exception。
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("小明"); emitter.onNext("小方"); emitter.onNext("小红"); emitter.onError(new Error("error")); } }).onExceptionResumeNext(new Observable<String>() { @Override protected void subscribeActual(Observer observer) { observer.onNext("小张"); } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i("lybj", "准备监听"); } @Override public void onNext(String s) { Log.i("lybj", s); } @Override public void onError(Throwable e) { Log.i("lybj", e.getMessage()); } @Override public void onComplete() { Log.i("lybj", "onComplete"); } }); 复制代码
如果出现错误事件,则会重新发送所有事件序列。times 是代表重新发的次数。
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("1"); emitter.onNext("2"); emitter.onError(new IllegalStateException()); } }).retry(2) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.i("lybj", "准备监听"); } @Override public void onNext(String s) { Log.i("lybj", s); } @Override public void onError(Throwable e) { Log.i("lybj", e.getMessage()); } @Override public void onComplete() { Log.i("lybj", "onComplete"); } }); 复制代码
出现错误事件之后,可以通过此方法判断是否继续发送事件。
Observable.create(new ObservableOnSubscribe<String>() { public void subscribe(@NonNull ObservableEmitter<String> emitter){ emitter.onNext("1"); emitter.onNext("2"); emitter.onNext("3"); emitter.onError(new NullPointerException("error")); emitter.onNext("4"); emitter.onNext("5"); } }).retryUntil(new BooleanSupplier() { @Override public boolean getAsBoolean() throws Exception { Log.i("lybj", "getAsBoolean"); return true; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { Log.i("lybj", s); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); 复制代码
重复发送被观察者的事件,times 为发送次数。
Observable.just(1,2,3) .repeat(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
subscribeOn(): 指定被观察者的线程,如果多次调用此方法,只有第一次有效。 observeOn(): 指定观察者的线程
Observable.create(new ObservableOnSubscribe<String>() { public void subscribe(@NonNull ObservableEmitter<String> emitter){ emitter.onNext("1"); Log.i("lybj", Thread.currentThread().getName()); } }).subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.i("lybj", s); Log.i("lybj", Thread.currentThread().getName()); } }); 复制代码
如果返回 true 则会发送事件,否则不会发送
Observable.just(1,2,3,4,5) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { if(integer > 4){ return true; } return false; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
可以过滤不符合该类型事件
Observable.just(1, 2, 3, "小明", "小方") .ofType(String.class) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.i("lybj", s+""); } }); 复制代码
跳过正序某些事件,count 代表跳过事件的数量
Observable.just(1,2,3,4,5,6,7) .skip(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
过滤事件序列中的重复事件。
Observable.just(1,2,3,1,4,1,2) .distinct() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
过滤掉连续重复的事件
Observable.just(1,2,3,3,1,5,6) .distinctUntilChanged() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
控制观察者接收的事件的数量。
Observable.just(1,2,3,4,5,6) .take(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
如果两件事件发送的时间间隔小于设定的时间间隔则前一件事件就不会发送给观察者。 简单来说就是防抖动,比如按钮控制快速点击等。
Observable.just(1,2,3,4,5) .map(new Function<Integer, Integer>() { @Override public Integer apply(Integer integer) throws Exception { Thread.sleep(900); return integer; } }) .debounce(1,TimeUnit.SECONDS) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
firstElement(): 取事件序列的第一个元素。
lastElement(): 取事件序列的最后一个元素。
elementAt(): 以指定取出事件序列中事件,但是输入的 index 超出事件序列的总数的话就不会出现任何结果。
Observable.just(1,2,3,4) .firstElement() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
判断事件序列是否全部满足某个事件,如果都满足则返回 true,反之则返回 false。
Observable.just(1, 2, 3, 4, 5) .all(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer <= 4; } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.i("lybj", aBoolean+""); } }); 复制代码
takeWhile(): 从左边开始,将满足条件的元素取出来,直到遇到第一个不满足条件的元素,则终止 takeUntil(): 从左边开始,将满足条件的元素取出来,直到遇到第一个满足条件的元素,则终止 filter(): 是将所有满足条件的数据都取出。
Observable.just(1, 2, 3, 4, 5) .takeWhile(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer < 3; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
从左边开始,根据条件跳过元素
Observable.just(1,2,3,4,5,3,2,1,7) .skipWhile(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer < 3; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.i("lybj", integer+""); } }); 复制代码
isEmpty(): 判断事件序列是否为空。
defaultIfEmpty(): 如果观察者只发送一个 onComplete() 事件,则可以利用这个方法发送一个值。
Observable.create(new ObservableOnSubscribe<String>() { public void subscribe(@NonNull ObservableEmitter<String> emitter){ emitter.onComplete(); } }).isEmpty() .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.i("lybj", aBoolean+""); } }); 复制代码
判断事件序列中是否含有某个元素,如果有则返回 true,如果没有则返回 false。
在Observable.just(1,2,3,4,5,6) .contains(2) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.i("lybj", aBoolean+""); } }); 复制代码
判断两个 Observable 发送的事件是否相同。
Observable.sequenceEqual(Observable.just("小明", "小方", "小李"), Observable.just("小明", "小方", "小李", "小张")) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.i("lybj", aBoolean+""); } }); 复制代码