RxJava是ReactiveX在JVM上的一个实现,使用可观察序列来编写异步和基于事件的程序的库。它扩展了观察者模式以支持数据/事件序列,并添加了允许您以声明方式组合序列的运算符,同时抽象出对低级线程,同步,线程安全和并发数据结构等问题的关注。
观察者模式也被称为发布-订阅(Publish/Subscribe)模式,它属于行为型模式的一种。观察者模式定义了一种一对多的依赖关系,一个主题对象可被多个观察者对象同时监听。当这个主题对象状态变化时,会通知所有观察者对象并作出相应处理逻辑。
1.创建抽象被观察者(Subject):
public interface Star { /** * 添加粉丝 */ void addFan(Fan fan); /** * 取消粉丝 */ void removeFan(Fan fan); /** * 分享动态 */ void notifyFan(String message); } 复制代码
2.创建抽象观察者(Observer)
public interface Fan { /** * 更新动态 */ void update(String message); } 复制代码
3.创建具体被观察者(Concrete Subject 具体明星)
public class AStar implements Star{ private List<Fan> fanList = null; public AStar(){ fanList = new ArrayList<Fan>(); } @Override public void addFan(Fan fan){ fanList.add(fan); } @Override public void removeFan(Fan fan){ fanList.remove(fan); } @Override public void notifyFan(String message){ for(Fan fan : fanList){ fan.update("AStar 发布了 ** 信息"); } } } 复制代码
4.创建具体观察者(Concrere Observer 具体粉丝)
public class AFan implements Fan{ private String fanName; public AFan(String fanName){ this.fanName = fanName; } @Override public void update(String message){ Log.d("AFan 收到了 AStar 发布的消息"); } } 复制代码
Observable(被观察者),Observer(观察者),subscribe(订阅)。
Observable 是一个抽象类,实现了ObservableSource抽象接口。
public abstract class Observable<T> implements ObservableSource<T> { ...... } 复制代码
ObservableSource中subscribe()用来订阅观察者,所以ObservableSource相当于抽象被观察者。
public interface ObservableSource<T> { /** * Subscribes the given Observer to this ObservableSource instance. * @param observer the Observer, not null * @throws NullPointerException if {@code observer} is null */ void subscribe(@NonNull Observer<? super T> observer); } 复制代码
通过ObservableSource的subscribe()方法可知抽象观察者为里面的参数对象Observer。
public interface Observer<T> { /** * Provides the Observer with the means of cancelling (disposing) the * connection (channel) with the Observable in both * synchronous (from within {@link #onNext(Object)}) and asynchronous manner. * @param d the Disposable instance whose {@link Disposable#dispose()} can * be called anytime to cancel the connection * @since 2.0 */ void onSubscribe(@NonNull Disposable d); /** * Provides the Observer with a new item to observe. * <p> * The {@link Observable} may call this method 0 or more times. * <p> * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or * {@link #onError}. * * @param t * the item emitted by the Observable */ void onNext(@NonNull T t); /** * Notifies the Observer that the {@link Observable} has experienced an error condition. * <p> * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or * {@link #onComplete}. * * @param e * the exception encountered by the Observable */ void onError(@NonNull Throwable e); /** * Notifies the Observer that the {@link Observable} has finished sending push-based notifications. * <p> * The {@link Observable} will not call this method if it calls {@link #onError}. */ void onComplete(); } 复制代码
/** * 创建Observable */ Observable observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onNext("Hello"); } }); /** * Provides an API (via a cold Observable) that bridges the reactive world with the callback-style world. * @param <T> the element type * @param source the emitter that is called when an Observer subscribes to the returned {@code Observable} * @return the new Observable instance * @see ObservableOnSubscribe * @see ObservableEmitter * @see Cancellable */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } 复制代码
通过create方法源码可知ObservableCreate为具体被观察者。
Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }; 复制代码
上述实现observer接口的observer为具体观察者。
Rxjava订阅实现
observable.subscribe(observer); 复制代码
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } } 复制代码