有可能很多人会问, LiveData
和 Rxjava
的区别是什么? 为何 Google 要在 Rxjava
很成熟的时候开发 LiveData
? 我想, LiveData
可以作为更好的 rxlifecycle
来使用。在使用 Rxjava
做数据流管理时,一个比较头疼的问题是,当数据回来时, Activty/Fragment
可能已经处于 onStop
的状态了,这个时候是不适合刷新 UI 的,很有可能触发 crash。 因此有人开发了 rxlifecycle
来解决这些问题,但是使用者必须继承于 RxActivity/RxFragment
或者实现接口 LifecycleProvider
。而 LiveData
则是官方给出的 lifecycle 友好型数据管理者。 它可以在 Rxjava 数据流 和 UI 刷新之间建立完美的沟通桥梁。当然 LiveData
还可以和 Room
配合使用,这个之后再说。
LiveData
本质也是一个观察者模式的应用,通过 setValue/postValue
来驱动 Observer
做出改变。 而 LiveData
又作为 lifecycle
的观察者,根据 lifecycle
的改变而表现出不同的行为。其核心方法就是 observe
了。
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) { if (owner.getLifecycle().getCurrentState() == DESTROYED) { // ignore return; } // LifecycleBoundObserver 作为 lifecycle 的观察者。 LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer); // 加入 map 中 ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper); if (existing != null && !existing.isAttachedTo(owner)) { throw new IllegalArgumentException("Cannot add the same observer" + " with different lifecycles"); } if (existing != null) { return; } owner.getLifecycle().addObserver(wrapper); }
这里重点关注 LifecycleBoundObserver
, 它实现了 GenericLifecycleObserver
接口,因此在 lifecycle 改变时,会执行到 onStateChanged
方法
class LifecycleBoundObserver extends ObserverWrapper implements GenericLifecycleObserver { @NonNull final LifecycleOwner mOwner; LifecycleBoundObserver(@NonNull LifecycleOwner owner, Observer<T> observer) { super(observer); mOwner = owner; } @Override boolean shouldBeActive() { return mOwner.getLifecycle().getCurrentState().isAtLeast(STARTED); } @Override public void onStateChanged(LifecycleOwner source, Lifecycle.Event event) { // 如果 Activity/Fragment/... 已经处于 DESTROYED 状态,则移除 observer。这里的移除是从 lifecycle 里移除 LifecycleBoundObserver,以及从 LiveData 里移除 mObserver if (mOwner.getLifecycle().getCurrentState() == DESTROYED) { removeObserver(mObserver); return; } // 通知 activeStateChanged activeStateChanged(shouldBeActive()); } @Override boolean isAttachedTo(LifecycleOwner owner) { return mOwner == owner; } @Override void detachObserver() { mOwner.getLifecycle().removeObserver(this); } }
activeStateChanged
实现为:
void activeStateChanged(boolean newActive) { if (newActive == mActive) { return; } // immediately set active state, so we'd never dispatch anything to inactive // owner mActive = newActive; boolean wasInactive = LiveData.this.mActiveCount == 0; LiveData.this.mActiveCount += mActive ? 1 : -1; // 如果 active 的 Observer 数量从 0 变为 1,则执行钩子函数 onActive if (wasInactive && mActive) { onActive(); } // 如果 active 的 Observer 数量从 1 变为 0,则执行钩子函数 onInactive if (LiveData.this.mActiveCount == 0 && !mActive) { onInactive(); } if (mActive) { // 处于 active 状态, 则触发一次数据更新, 传参表示只处理这个 Observer 的派发 dispatchingValue(this); } }
我们可以通过 setValue
与 postValue
来更新数据, setValue
在主线程调用, postValue
在子线程调用,最终都会通过 dispatchingValue
来通知 Observer 更新数据。
private void dispatchingValue(@Nullable ObserverWrapper initiator) { if (mDispatchingValue) { // 重入问题,设置标志位,给上层处理 mDispatchInvalidated = true; return; } mDispatchingValue = true; do { mDispatchInvalidated = false; if (initiator != null) { // 如果 initiator 不为 null, 则标示只更新特定的 observer. considerNotify(initiator); initiator = null; } else { for (Iterator<Map.Entry<Observer<T>, ObserverWrapper>> iterator = mObservers.iteratorWithAdditions(); iterator.hasNext(); ) { considerNotify(iterator.next().getValue()); if (mDispatchInvalidated) { break; } } } } while (mDispatchInvalidated); mDispatchingValue = false; }
经过上一篇文章,见到 mDispatchingValue
和 mDispatchInvalidated
我们就应该想到这是解决重入问题的方式。 这个方法会为每一个 observer 调用 considerNotify
方法:
private void considerNotify(ObserverWrapper observer) { // 如果 Activity/Fragment 已经不是可见状态时,就直接返回,不更新 UI。这样很多因为更新 UI 时机不对的问题就不会发生了。 if (!observer.mActive) { return; } if (!observer.shouldBeActive()) { observer.activeStateChanged(false); return; } // 每次 setValue 和 postValue 都会更新版本号,每个 observer 也记录上次更新的版本号,这样通过对比,可以避免布标要的更新 if (observer.mLastVersion >= mVersion) { return; } observer.mLastVersion = mVersion; //通知 observer 更新数据或 UI observer.mObserver.onChanged((T) mData); }
LiveData
的主要逻辑就是这些, 它也是学习运用 lifecycle
的一个很好的例子。除此之外, LiveData
提供了一些辅助类,帮助我们更好的开发。
MediatorLiveData
的作用和 Rxjava
中 merge
操作符有点像。 都是合并几个流到一个流中。使用方式如下:
val liveData1: LiveData<Integer> = ...; val liveData2: LiveData<Integer> = ...; val liveDataMerger = new MediatorLiveData<>(); liveDataMerger.addSource(liveData1){value -> liveDataMerger.setValue(value)}; liveDataMerger.addSource(liveData2){value -> liveDataMerger.setValue(value)}; liveDataMerger.observe(lifecycleOwner){ // onChange }
简单看下它的源码:
public <S> void addSource(@NonNull LiveData<S> source, @NonNull Observer<S> onChanged) { // Source 是将 onChanged 包裹了一下,决定何时开始 observe 与何时取消 observe。 // 因为 source 处于数据上游,在没有下游的情况下,是没必要触发 onChanged 的 Source<S> e = new Source<>(source, onChanged); Source<?> existing = mSources.putIfAbsent(source, e); if (existing != null && existing.mObserver != onChanged) { throw new IllegalArgumentException( "This source was already added with the different observer"); } if (existing != null) { return; } // 在有 active 的 Observer之后,才开始监听 source 的 changed。 // 有了下游,上游的数据才有往下传递的必要。 if (hasActiveObservers()) { e.plug(); } } // removeSource 时,取消上游的订阅 public <S> void removeSource(@NonNull LiveData<S> toRemote) { Source<?> source = mSources.remove(toRemote); if (source != null) { source.unplug(); } } @CallSuper @Override protected void onActive() { // onActive 就代表有下游产生,这个时候才连接上游 source for (Map.Entry<LiveData<?>, Source<?>> source : mSources) { source.getValue().plug(); } } @CallSuper @Override protected void onInactive() { // onInactive 就代表没有下游了,这个时候就不需要接收上游的数据改动了。 for (Map.Entry<LiveData<?>, Source<?>> source : mSources) { source.getValue().unplug(); } }
ComputableLiveData
是一个还没有对外开放的类,不过了解它还是有必要的,因为在 Room
里就会用到这个类。
ComputableLiveData
是个抽象类,需要子类实现 compute
方法。 而 compute
方法会在什么时机被执行呢?
ComputableLiveData
的 onActive
被调用时,即有观察者出现时。 invalidate()
方法时。
invalidate()
会触发 mInvalidationRunnable
在主线程里执行:
final Runnable mInvalidationRunnable = new Runnable() { @MainThread @Override public void run() { boolean isActive = mLiveData.hasActiveObservers(); // mInvalid 会被多线程访问与赋值,因此做了原子性访问控制 // 只有从 false 变为 true, 才需要触发 refresh // 如果原本就是 true,代表原本处于 mInvalid 状态,要么是已经在 compute, 要么就是还没有 observer. if (mInvalid.compareAndSet(false, true)) { if (isActive) { mExecutor.execute(mRefreshRunnable); } } } };
具体触发 compute 和 数据更新的操作是在 mRefreshRunnable
中实现的。 代码虽然不多,但其实挺不好读的,因为涉及到了多线程访问, 多次 invalidate 会触发多次 refresh, 如果处理不当,可能会出现数据乱序更新,这里强烈推荐 Advanced RxJava
的前两篇文章,对于并发下的思维模式有很大帮助。
final Runnable mRefreshRunnable = new Runnable() { @WorkerThread @Override public void run() { boolean computed; do { computed = false; if (mComputing.compareAndSet(false, true)) { // (1) try { T value = null; while (mInvalid.compareAndSet(true, false)) { // (2) computed = true; // (3) value = compute(); } if (computed) { // (4) mLiveData.postValue(value); } } finally { mComputing.set(false); // (5) } } } while (computed && mInvalid.get()); // (6) } };
compute()
LiveData
还默认提供了两个工具方法: map
与 switchMap
。 比较简单,但是很有用。可以算作是 MediatorLiveData 的应用吧
// 数据转换, 从 X 转换为 Y public static <X, Y> LiveData<Y> map(@NonNull LiveData<X> source, @NonNull final Function<X, Y> func) { final MediatorLiveData<Y> result = new MediatorLiveData<>(); result.addSource(source, new Observer<X>() { @Override public void onChanged(@Nullable X x) { result.setValue(func.apply(x)); } }); return result; } // 根据 trigger 和 func 生成新的 LiveData<Y>, 下游将监听 LiveData<Y>。类似于 rxjava 的 flatMap。 public static <X, Y> LiveData<Y> switchMap(@NonNull LiveData<X> trigger, @NonNull final Function<X, LiveData<Y>> func) { final MediatorLiveData<Y> result = new MediatorLiveData<>(); result.addSource(trigger, new Observer<X>() { LiveData<Y> mSource; @Override public void onChanged(@Nullable X x) { LiveData<Y> newLiveData = func.apply(x); if (mSource == newLiveData) { return; } if (mSource != null) { result.removeSource(mSource); } mSource = newLiveData; if (mSource != null) { result.addSource(mSource, new Observer<Y>() { @Override public void onChanged(@Nullable Y y) { result.setValue(y); } }); } } }); return result; }
LiveData
源码总体比较简单,使用也比较容易,但功能确实实用。