处理完相应的操作,将结果通知被观察者
Observable observable = Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext("are"); emitter.onNext("you"); emitter.onNext("ok"); emitter.onComplete(); } }); 复制代码
接收被观察者的通知
Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "); // d.dispose(); //取消订阅 } @Override public void onNext(String s) { Log.d(TAG, "onNext: "+s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } }; observable.subscribe(observer); 复制代码
调度程序执行的线程
调度器类型 | 效果 |
---|---|
Schedulers.computation( ) | 用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 |
Schedulers.from(executor) | 使用指定的Executor作为调度器 |
Schedulers.io( ) | 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 |
Schedulers.newThread( ) | 为每个任务创建一个新线程 |
Schedulers.trampoline( ) | 当其它排队的任务完成后,在当前线程排队开始执行 |
AndroidSchedulers.mainThread() | Android的主线程 |
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("are"); emitter.onNext("you"); emitter.onNext("ok"); emitter.onComplete(); } }).observeOn(AndroidSchedulers.mainThread()) .subscribeOn(Schedulers.io()) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe: "); // d.dispose(); //取消订阅 } @Override public void onNext(String s) { Log.d(TAG, "onNext: "+s); } @Override public void onError(Throwable e) { Log.d(TAG, "onError: "); } @Override public void onComplete() { Log.d(TAG, "onComplete: "); } }); 复制代码
创建事件序列的方法 create interval defer emptyNeverError repeat timer from just range 变换操作 mapCast flatMap2contactMap flatMapExample flatMapIterable buffer groupBy scan window 过滤操作/条件操作符 filter element distinct skip take ignoreElements debounce ofType all contains isEmpty defaultIfEmpty amb 组合操作 concat merge startWith zip combineLast reduce collect count 功能操作符/辅助操作 delay doSeries retry subscribeOn observeOn RxKotlin扩展库 rkExExample 额外其他 compose 复制代码