转载

使用RxJava实现缓存

在这篇文章中将使用reactiveX创建一个缓存。RxJava / Kotlin没有本机缓存实现。因此,我必须使用RxJava和Kotlin为单个元素创建自己的缓存(可扩展为Observable,Maybe等)。

主要思想是在一定时间内返回相同的元素。在那之后,我们将不得不从头开始执行所有操作。

此缓存有三个组件: SingleTransformer , SingleSource 和 Consumer 。

消费者是一个简单的回调。它会接受我们想要的任何东西,因为它有一个void accept(T t)方法。当Observer执行onSuccess方法时执行此方法。

SingleSource是管道将订阅的源。它有一个void subscribe(SingleObserver <T>observer)方法,其中SingleObserver只发出一个元素。当有人订阅管道时执行此方法。

SingleTransformer对于组合单曲很有用。它有一个SingleSource<Downstream> apply(Single<Upstream> upstream) 方法,其中上游是传入元素,下游是传出元素。创建管道时将执行此方法。所以,它必须只执行一次。否则,将始终创建LastElementSeen,并且缓存将不起作用。

首先,我们可以创建我们的消费者:

<b>class</b> LastElementSeen<T>(<b>private</b> val timeout: Long, <b>private</b> val unit: TimeUnit) : Consumer<T> {
<b>private</b> <b>var</b> lastEmissionTimestamp: Long = 0
    <b>var</b> value: T? = <b>null</b>
override fun accept(latest: T) {
        lastEmissionTimestamp = DateTime.now().millis
        value = latest
    }
<font><i>// I used JodaTime: https://www.joda.org/joda-time/ for this piece of code.</i></font><font>
    fun isValid(): Boolean {
        <b>return</b> value?.let { DateTime.now().minus(lastEmissionTimestamp).isBefore(unit.toMillis(timeout)) } ?: false
    }
}
</font>

该元素具有我们想要的任何对象,并且在接受时具有最后的发射时间戳。我们可以调用LastElementSeen,因为它只代表那个。此外,此对象还有责任是否仍然有效或已过期。

下一步是创建一个自定义SingleSource的实现来决定元素是否有效,我们必须发出相同的元素,否则我们必须再次订阅。

<b>class</b> LastElementSeenSingle<T>(<b>private</b> <b>var</b> upstream: Single<T>, <b>private</b> val lastElementSeen: LastElementSeen<T>) : SingleSource<T> {
override fun subscribe(observer: SingleObserver<in T>) {
        <b>if</b> (lastElementSeen.isValid()) {
            lastElementSeen.value?.let(observer::onSuccess)
        } <b>else</b> {
            upstream.subscribe(observer)
        }
    }
}

最后一步是创建我们的SingleTransformer:

<b>class</b> SingleRxCache<T>(<b>private</b> val timeout: Long, <b>private</b> val unit: TimeUnit) : SingleTransformer<T, T> {
override fun apply(upstream: Single<T>): SingleSource<T> {
        val lastElementSeen = LastElementSeen<T>(timeout, unit)
        <b>return</b> LastElementSeenSingle(upstream.doOnSuccess(lastElementSeen), lastElementSeen)
    }
}

我们必须创建LastElementSeen和LastElementSeenSingle并使用upstream.doOnSuccess(lastElementSeen)方法从流中发出元素。

所以,现在我们有了反应式缓存。我们来试试吧!快速测试:

@Test
    fun reactiveCacheTest() {
  val single = Single.create<Long> { emitter ->
            println(<font>"Creating single"</font><font>)
            emitter.onSuccess(counter++)
        }.map {
            println(</font><font>"Map"</font><font>)
            it
        }.compose(SingleRxCache<Long>(3, TimeUnit.SECONDS))
<b>for</b> (i in 1..5) {
            single.subscribe { value -> println(value) }
            Thread.sleep(1000)
        }
    }
</font>

输出:

Creating single
Map
1
1
1
Creating single
Map
2
2

在 gist中 看到完整的代码。

原文  https://www.jdon.com/51328
正文到此结束
Loading...