Android LiveData - 如何限制结果
Android LiveData - how to throttle results
我有一个 liveData 设置,它发出一些整数。但我想限制排放量,使其每 5 秒才发生一次。但在前 5 秒过去后,可以取消限制。
我在这里观察到这样的计数:
count1.observe(this, new Observer() {
@Override public void onChanged(@Nullable Integer i) {
//Do something with "integer"
}
});
但我希望限制 onChanged() 回调,使其在前 5 秒内最多调用 2 次。前 5 秒过去后,我不再需要节流了。我不确定如何执行此操作,因为我没有看到任何用于实时数据处理流的选项。我尝试了 Transformation,但它只有 map 和 switchMap。如何才能做到这一点 ? enter code here
在您的情况下,您应该使用 rxjava 的间隔运算符。
Interval Operator create an Observable that emits a sequence of integers spaced by a given time interval. It is used when we want to do a task again and again after some interval.
示例:
val disposable =
Observable.interval(0, 2, TimeUnit.SECONDS)
.flatMap {
return@flatMap Observable.create<String> { emitter ->
Log.d("IntervalExample", "Create")
emitter.onNext("MindOrks")
emitter.onComplete()
}
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.d("IntervalExample", it)
}
compositeDisposable.add(disposable)
此处任务会间隔2秒重复执行
需要注意的一件事:它将永远持续下去。
如何停止?
有两种停止方式。以下是两种停止方法。
使用 compositeDisposable.dispose()
使用如下所示的 take(n)
运算符
Observable.interval(0, 2, TimeUnit.SECONDS).take(5).flatMap {
return@flatMap Observable.create<String> { emitter ->
Log.d("IntervalExample", "Create")
emitter.onNext("MindOrks")
emitter.onComplete()
}
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.d("IntervalExample", it)
}
由于我们在take(5)
中传递了5作为参数,任务只会执行5次,间隔为2秒。
这是使用 Handler
节流的 MediatorLiveData
实现(在 Kotlin 中,因为 Java 有很多样板文件)。
如果源实时数据更新过于频繁,结果发布将会延迟:
import android.os.Handler
import android.os.Looper
import androidx.lifecycle.LiveData
import androidx.lifecycle.MediatorLiveData
/**
* LiveData throttling value emissions so they don't happen more often than [delayMs].
*/
class ThrottledLiveData<T>(source: LiveData<T>, delayMs: Long) : MediatorLiveData<T>() {
val handler = Handler(Looper.getMainLooper())
var delayMs = delayMs
private set
private var isValueDelayed = false
private var delayedValue: T? = null
private var delayRunnable: Runnable? = null
set(value) {
field?.let { handler.removeCallbacks(it) }
value?.let { handler.postDelayed(it, delayMs) }
field = value
}
private val objDelayRunnable = Runnable { if (consumeDelayedValue()) startDelay() }
init {
addSource(source) { newValue ->
if (delayRunnable == null) {
value = newValue
startDelay()
} else {
isValueDelayed = true
delayedValue = newValue
}
}
}
/** Start throttling or modify the delay. If [newDelay] is `0` (default) reuse previous delay value. */
fun startThrottling(newDelay: Long = 0L) {
require(newDelay >= 0L)
when {
newDelay > 0 -> delayMs = newDelay
delayMs < 0 -> delayMs *= -1
delayMs > 0 -> return
else -> throw IllegalArgumentException("newDelay cannot be zero if old delayMs is zero")
}
}
/** Stop throttling, if [immediate] emit any pending value now. */
fun stopThrottling(immediate: Boolean = false) {
if (delayMs <= 0) return
delayMs *= -1
if (immediate) consumeDelayedValue()
}
override fun onInactive() {
super.onInactive()
consumeDelayedValue()
}
// start counting the delay or clear it if conditions are not met
private fun startDelay() {
delayRunnable = if (delayMs > 0 && hasActiveObservers()) objDelayRunnable else null
}
private fun consumeDelayedValue(): Boolean {
delayRunnable = null
return if (isValueDelayed) {
value = delayedValue
delayedValue = null
isValueDelayed = false
true
} else false
}
}
通过将源实时数据作为第一个参数传递并观察它来使用它:
val throttledCount = ThrottledLiveData(count1, 2500L) // maximum of one update per 2.5 sec
throttledCount.observe(this, Observer { i: Int ->
//Do something with "integer"
})
如果你想在 5 秒后停止节流,只需 post 一个延迟的可运行程序将禁用它:
val disableThrottle = Runnable { throttledCount.stopThrottling() }
throttledCount.handler.postDelayed(disableThrottle, 5000L)
我有一个 liveData 设置,它发出一些整数。但我想限制排放量,使其每 5 秒才发生一次。但在前 5 秒过去后,可以取消限制。
我在这里观察到这样的计数:
count1.observe(this, new Observer() {
@Override public void onChanged(@Nullable Integer i) {
//Do something with "integer"
}
});
但我希望限制 onChanged() 回调,使其在前 5 秒内最多调用 2 次。前 5 秒过去后,我不再需要节流了。我不确定如何执行此操作,因为我没有看到任何用于实时数据处理流的选项。我尝试了 Transformation,但它只有 map 和 switchMap。如何才能做到这一点 ? enter code here
在您的情况下,您应该使用 rxjava 的间隔运算符。
Interval Operator create an Observable that emits a sequence of integers spaced by a given time interval. It is used when we want to do a task again and again after some interval.
示例:
val disposable =
Observable.interval(0, 2, TimeUnit.SECONDS)
.flatMap {
return@flatMap Observable.create<String> { emitter ->
Log.d("IntervalExample", "Create")
emitter.onNext("MindOrks")
emitter.onComplete()
}
}
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.d("IntervalExample", it)
}
compositeDisposable.add(disposable)
此处任务会间隔2秒重复执行
需要注意的一件事:它将永远持续下去。
如何停止?
有两种停止方式。以下是两种停止方法。
使用 compositeDisposable.dispose()
使用如下所示的
take(n)
运算符Observable.interval(0, 2, TimeUnit.SECONDS).take(5).flatMap { return@flatMap Observable.create<String> { emitter -> Log.d("IntervalExample", "Create") emitter.onNext("MindOrks") emitter.onComplete() } } .observeOn(AndroidSchedulers.mainThread()) .subscribe { Log.d("IntervalExample", it) }
由于我们在take(5)
中传递了5作为参数,任务只会执行5次,间隔为2秒。
这是使用 Handler
节流的 MediatorLiveData
实现(在 Kotlin 中,因为 Java 有很多样板文件)。
如果源实时数据更新过于频繁,结果发布将会延迟:
import android.os.Handler
import android.os.Looper
import androidx.lifecycle.LiveData
import androidx.lifecycle.MediatorLiveData
/**
* LiveData throttling value emissions so they don't happen more often than [delayMs].
*/
class ThrottledLiveData<T>(source: LiveData<T>, delayMs: Long) : MediatorLiveData<T>() {
val handler = Handler(Looper.getMainLooper())
var delayMs = delayMs
private set
private var isValueDelayed = false
private var delayedValue: T? = null
private var delayRunnable: Runnable? = null
set(value) {
field?.let { handler.removeCallbacks(it) }
value?.let { handler.postDelayed(it, delayMs) }
field = value
}
private val objDelayRunnable = Runnable { if (consumeDelayedValue()) startDelay() }
init {
addSource(source) { newValue ->
if (delayRunnable == null) {
value = newValue
startDelay()
} else {
isValueDelayed = true
delayedValue = newValue
}
}
}
/** Start throttling or modify the delay. If [newDelay] is `0` (default) reuse previous delay value. */
fun startThrottling(newDelay: Long = 0L) {
require(newDelay >= 0L)
when {
newDelay > 0 -> delayMs = newDelay
delayMs < 0 -> delayMs *= -1
delayMs > 0 -> return
else -> throw IllegalArgumentException("newDelay cannot be zero if old delayMs is zero")
}
}
/** Stop throttling, if [immediate] emit any pending value now. */
fun stopThrottling(immediate: Boolean = false) {
if (delayMs <= 0) return
delayMs *= -1
if (immediate) consumeDelayedValue()
}
override fun onInactive() {
super.onInactive()
consumeDelayedValue()
}
// start counting the delay or clear it if conditions are not met
private fun startDelay() {
delayRunnable = if (delayMs > 0 && hasActiveObservers()) objDelayRunnable else null
}
private fun consumeDelayedValue(): Boolean {
delayRunnable = null
return if (isValueDelayed) {
value = delayedValue
delayedValue = null
isValueDelayed = false
true
} else false
}
}
通过将源实时数据作为第一个参数传递并观察它来使用它:
val throttledCount = ThrottledLiveData(count1, 2500L) // maximum of one update per 2.5 sec
throttledCount.observe(this, Observer { i: Int ->
//Do something with "integer"
})
如果你想在 5 秒后停止节流,只需 post 一个延迟的可运行程序将禁用它:
val disableThrottle = Runnable { throttledCount.stopThrottling() }
throttledCount.handler.postDelayed(disableThrottle, 5000L)