框架的核心思想,就是消息的发布和订阅,使用订阅者模式实现,其原理图大概如下所示,摘自网络。
发布和订阅之间的依赖关系,其原理图大概如下所示,摘自网络。
订阅/发布模式和观察者模式之间有着微弱的区别,个人觉得订阅/发布模式是观察者模式的一种增强版。两者区别如下所示,摘自网络。
为何使用liveData
该liveDataBus优势
为了方便理解,LiveDataBus原理图如下所示
订阅和注册的流程图
订阅注册原理图
为何用LiveDataBus替代EventBus和RxBus
我这里先用最简单的代码实现liveDataBus,然后用一下,看一下会出现什么问题,代码如下所示:
public final class LiveDataBus1 { private final Map<String, MutableLiveData<Object>> bus; private LiveDataBus1() { bus = new HashMap<>(); } private static class SingletonHolder { private static final LiveDataBus1 DATA_BUS = new LiveDataBus1(); } public static LiveDataBus1 get() { return SingletonHolder.DATA_BUS; } public <T> MutableLiveData<T> getChannel(String target, Class<T> type) { if (!bus.containsKey(target)) { bus.put(target, new MutableLiveData<>()); } return (MutableLiveData<T>) bus.get(target); } public MutableLiveData<Object> getChannel(String target) { return getChannel(target, Object.class); } }
那么如何发送消息和接收消息呢,注意两者的key需要保持一致,否则无法接收?具体代码如下所示:
//发送消息 LiveDataBus1.get().getChannel("yc_bus").setValue(text); //接收消息 LiveDataBus1.get().getChannel("yc_bus", String.class) .observe(this, new Observer<String>() { @Override public void onChanged(@Nullable String newText) { // 更新数据 tvText.setText(newText); } });
遇到的问题:
然后看一下LiveData的订阅方法observe源码
// 注释只能在主线程中调用该方法 @MainThread public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) { // 当前绑定的组件(activity or fragment)状态为DESTROYED的时候, 则会忽视当前的订阅请求 if (owner.getLifecycle().getCurrentState() == DESTROYED) { // ignore return; } // 转为带生命周期感知的观察者包装类 LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer); ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper); // 对应观察者只能与一个owner绑定,否则抛出异常 if (existing != null && !existing.isAttachedTo(owner)) { throw new IllegalArgumentException("Cannot add the same observer" + " with different lifecycles"); } if (existing != null) { return; } // lifecycle注册 owner.getLifecycle().addObserver(wrapper); }
紧接着,来看一下LiveData的更新数据方法
@MainThread protected void setValue(T value) { assertMainThread("setValue"); // 这里的 mVersion,它本问题关键,每次更新数据都会自增,默认值是 -1。 mVersion++; mData = value; dispatchingValue(null); }
private void dispatchingValue(@Nullable ObserverWrapper initiator) { // mDispatchingValue的判断主要是为了解决并发调用dispatchingValue的情况 // 当对应数据的观察者在执行的过程中, 如有新的数据变更, 则不会再次通知到观察者。所以观察者内的执行不应进行耗时工作 if (mDispatchingValue) { mDispatchInvalidated = true; return; } mDispatchingValue = true; do { mDispatchInvalidated = false; if (initiator != null) { // 等下重点看这里的代码 considerNotify(initiator); initiator = null; } else { for (Iterator<Map.Entry<Observer<T>, ObserverWrapper>> iterator = mObservers.iteratorWithAdditions(); iterator.hasNext(); ) { // 等下重点看这里的代码 considerNotify(iterator.next().getValue()); if (mDispatchInvalidated) { break; } } } } while (mDispatchInvalidated); mDispatchingValue = false; }
private void considerNotify(ObserverWrapper observer) { if (!observer.mActive) { return; } // 检查最新的状态b4调度。也许它改变了状态,但我们还没有得到事件。 // 我们还是先检查观察者。活动,以保持它作为活动的入口。 // 因此,即使观察者移动到一个活动状态,如果我们没有收到那个事件,我们最好不要通知一个更可预测的通知顺序。 if (!observer.shouldBeActive()) { observer.activeStateChanged(false); return; } if (observer.mLastVersion >= mVersion) { return; } observer.mLastVersion = mVersion; //noinspection unchecked observer.mObserver.onChanged((T) mData); }
为何订阅者会马上收到订阅之前发布的最新消息?
首先看一下postValue源代码,如下所示:
protected void postValue(T value) { boolean postTask; synchronized (mDataLock) { postTask = mPendingData == NOT_SET; mPendingData = value; } if (!postTask) { return; } ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable); } private final Runnable mPostValueRunnable = new Runnable() { @Override public void run() { Object newValue; synchronized (mDataLock) { newValue = mPendingData; mPendingData = NOT_SET; } //noinspection unchecked setValue((T) newValue); } };
能不能从Map容器mObservers中取到LifecycleBoundObserver,然后再更改version呢?答案是肯定的,通过查看SafeIterableMap的源码我们发现有一个protected的get方法。因此,在调用observe的时候,我们可以通过反射拿到LifecycleBoundObserver,再把LifecycleBoundObserver的version设置成和LiveData一致即可。
@Override public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) { super.observe(owner, observer); hook(observer); } private void hook(@NonNull Observer<T> observer) { try { Class<LiveData> classLiveData = LiveData.class; Field fieldObservers = classLiveData.getDeclaredField("mObservers"); fieldObservers.setAccessible(true); Object objectObservers = fieldObservers.get(this); Class<?> classObservers = objectObservers.getClass(); Method methodGet = classObservers.getDeclaredMethod("get", Object.class); methodGet.setAccessible(true); Object objectWrapperEntry = methodGet.invoke(objectObservers, observer); Object objectWrapper = null; if (objectWrapperEntry instanceof Map.Entry) { objectWrapper = ((Map.Entry) objectWrapperEntry).getValue(); } if (objectWrapper != null) { Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass(); Field fieldLastVersion = null; if (classObserverWrapper != null) { fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion"); fieldLastVersion.setAccessible(true); Field fieldVersion = classLiveData.getDeclaredField("mVersion"); fieldVersion.setAccessible(true); Object objectVersion = fieldVersion.get(this); fieldLastVersion.set(objectWrapper, objectVersion); } } } catch (Exception e){ e.printStackTrace(); } }
同时还需要注意,在实现MutableLiveData<T>自定义类BusMutableLiveData中,需要重写这几个方法。代码如下所示:
/** * 在给定的观察者的生命周期内将给定的观察者添加到观察者列表所有者。 * 事件是在主线程上分派的。如果LiveData已经有数据集合,它将被传递给观察者。 * @param owner owner
*/ public void observeSticky(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) { super.observe(owner, observer); } /** * 将给定的观察者添加到观察者列表中。这个调用类似于{@link LiveData#observe(LifecycleOwner, Observer)} * 和一个LifecycleOwner, which总是积极的。这意味着给定的观察者将接收所有事件,并且永远不会 被自动删除。 * 您应该手动调用{@link #removeObserver(Observer)}来停止 观察这LiveData。 * @param observer observer */ public void observeStickyForever(@NonNull Observer<T> observer) { super.observeForever(observer); } ```
首先看看MutableLiveData源代码,如下所示,这里重点展示测试数据案例
public void postValue(T value) { super.postValue(value); }
然后使用for循环,使用postValue发送100条消息事件,代码如下所示:
public void postValueCountTest() { sendCount = 100; receiveCount = 0; ExecutorService threadPool = Executors.newFixedThreadPool(2); for (int i = 0; i < sendCount; i++) { threadPool.execute(new Runnable() { @Override public void run() { LiveDataBus2.get().getChannel(Constant.LIVE_BUS3).postValue("test_1_data"+sendCount); } }); } new Handler().postDelayed(new Runnable() { @Override public void run() { BusLogUtils.d("sendCount: " + sendCount + " | receiveCount: " + receiveCount); Toast.makeText(ThirdActivity4.this, "sendCount: " + sendCount + " | receiveCount: " + receiveCount, Toast.LENGTH_LONG).show(); } }, 1000); } //接收消息 LiveDataBus2.get() .getChannel(Constant.LIVE_BUS3, String.class) .observe(this, new Observer<String>() { @Override public void onChanged(@Nullable String s) { receiveCount++; BusLogUtils.d("接收消息--ThirdActivity4------yc_bus---1-"+s+"----"+receiveCount); } });
然后看一下打印日志,是不是发现了什么问题?发现根本没有100条数据……
2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----1 2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----2 2020-03-03 10:25:51.397 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----3 2020-03-03 10:25:51.403 4745-4745/com.ycbjie.yclivedatabus D/BusLogUtils: 接收消息--ThirdActivity4------yc_bus---1-test_1_data100----4
既然post是在子线程中发送消息事件,那么可不可以使用handler将它放到主线程中处理事件了,是可以的,代码如下所示
/** * 子线程发送事件
*/ @Override public void postValue(T value) { //注意,去掉super方法, //super.postValue(value); mainHandler.post(new PostValueTask(value)); } private BusWeakHandler mainHandler = new BusWeakHandler(Looper.getMainLooper()); private class PostValueTask implements Runnable { private T newValue; public PostValueTask(@NonNull T newValue) { this.newValue = newValue; } @Override public void run() { setValue(newValue); } } ```
可以知道,通过postValue可以在子线程发送消息,那么发送延迟消息也十分简单,代码如下所示:
/** * 子线程发送事件
*/ @Override public void postValue(T value) { //注意,去掉super方法, //super.postValue(value); mainHandler.post(new PostValueTask(value)); } /** * 发送延迟事件 * @param value value * @param delay 延迟时间 */ @Override public void postValueDelay(T value, long delay) { mainHandler.postDelayed(new PostValueTask(value) , delay); //mainHandler.postAtTime(new PostValueTask(value) , delay); } ```
测试用例,延迟5秒钟发送事件,代码如下所示。具体可以看demo钟的案例!
LiveDataBus.get().with(Constant.LIVE_BUS4).postValueDelay("test_4_data",5000);
轮训延迟事件,比如有的页面需要实现,每间隔5秒钟就刷新一次页面数据,常常用于活动页面。在购物商城这类需求很常见
@Override public void postValueInterval(final T value, final long interval) { mainHandler.postDelayed(new Runnable() { @Override public void run() { setValue(value); mainHandler.postDelayed(this,interval); } },interval); }
测试用例,轮训延迟3秒钟发送事件,代码如下所示。具体可以看demo钟的案例!
LiveDataBus.get().with(Constant.LIVE_BUS5).postValueInterval("test_5_data",3000);
这里遇到了一个问题,假如有多个页面有这种轮训发送事件的需求,显然这个是实现不了的。那么可不可以把每个轮训runnable记录一个名称区别开来代码更更改如下
/** * 发送延迟事件,间隔轮训 * @param value value
*/ @Deprecated @Override public void postValueInterval(final T value, final long interval,@NonNull String taskName) { if(taskName.isEmpty()){ return; } IntervalValueTask intervalTask = new IntervalValueTask(value,interval); intervalTasks.put(taskName,intervalTask); mainHandler.postDelayed(intervalTask,interval); } private class IntervalValueTask implements Runnable { private T newValue; private long interval; public IntervalValueTask(T newValue, long interval) { this.newValue = newValue; this.interval = interval; } @Override public void run() { setValue(newValue); mainHandler.postDelayed(this,interval); } } ```
轮训总不可以一直持续下去吧,这个时候可以添加一个手动关闭轮训的方法。代码如下所示:
/**
*/ @Deprecated @Override public void stopPostInterval(@NonNull String taskName) { IntervalValueTask intervalTask = intervalTasks.get(taskName); if(intervalTask!= null){ //移除callback mainHandler.removeCallbacks(intervalTask); intervalTasks.remove(taskName); } } ```
代码如下所示
public class SafeCastObserver<T> implements Observer<T> { @NonNull private final Observer<T> observer; public SafeCastObserver(@NonNull Observer<T> observer) { this.observer = observer; } @Override public void onChanged(@Nullable T t) { //捕获异常,避免出现异常之后,收不到后续的消息事件 try { //注意为了避免转换出现的异常,try-catch捕获 observer.onChanged(t); } catch (ClassCastException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } } }
生命周期感知能力就是当在Android平台的LifecycleOwner(如Activity)中使用的时候,只需要订阅消息,而不需要取消订阅消息。LifecycleOwner的生命周期结束的时候,会自动取消订阅。这带来了两个好处: