Android LiveData - 如何限制结果

Android LiveData - how to throttle results

我有一个 liveData 设置,它发出一些整数。但我想限制排放量,使其每 5 秒才发生一次。但在前 5 秒过去后,可以取消限制。

我在这里观察到这样的计数:

count1.observe(this, new Observer() {
      @Override public void onChanged(@Nullable Integer i) {
        //Do something with "integer"
      }
    });

但我希望限制 onChanged() 回调,使其在前 5 秒内最多调用 2 次。前 5 秒过去后,我不再需要节流了。我不确定如何执行此操作,因为我没有看到任何用于实时数据处理流的选项。我尝试了 Transformation,但它只有 map 和 switchMap。如何才能做到这一点 ? enter code here

在您的情况下,您应该使用 rxjava 的间隔运算符。

Interval Operator create an Observable that emits a sequence of integers spaced by a given time interval. It is used when we want to do a task again and again after some interval.

示例:

val disposable =
Observable.interval(0, 2, TimeUnit.SECONDS)
    .flatMap {
        return@flatMap Observable.create<String> { emitter ->
            Log.d("IntervalExample", "Create")
            emitter.onNext("MindOrks")
            emitter.onComplete()
        }
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        Log.d("IntervalExample", it)
    }
compositeDisposable.add(disposable)

此处任务会间隔2秒重复执行

需要注意的一件事:它将永远持续下去。

如何停止?

有两种停止方式。以下是两种停止方法。

  1. 使用 compositeDisposable.dispose()

  2. 使用如下所示的 take(n) 运算符

    Observable.interval(0, 2, TimeUnit.SECONDS).take(5).flatMap {
    return@flatMap Observable.create<String> { emitter ->
        Log.d("IntervalExample", "Create")
        emitter.onNext("MindOrks")
        emitter.onComplete()
        }
    }
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe {
        Log.d("IntervalExample", it)
    }
    

由于我们在take(5)中传递了5作为参数,任务只会执行5次,间隔为2秒。

这是使用 Handler 节流的 MediatorLiveData 实现(在 Kotlin 中,因为 Java 有很多样板文件)。

如果源实时数据更新过于频繁,结果发布将会延迟:

import android.os.Handler
import android.os.Looper
import androidx.lifecycle.LiveData
import androidx.lifecycle.MediatorLiveData

/**
 * LiveData throttling value emissions so they don't happen more often than [delayMs].
 */
class ThrottledLiveData<T>(source: LiveData<T>, delayMs: Long) : MediatorLiveData<T>() {
    val handler = Handler(Looper.getMainLooper())
    var delayMs = delayMs
        private set

    private var isValueDelayed = false
    private var delayedValue: T? = null
    private var delayRunnable: Runnable? = null
        set(value) {
            field?.let { handler.removeCallbacks(it) }
            value?.let { handler.postDelayed(it, delayMs) }
            field = value
        }
    private val objDelayRunnable = Runnable { if (consumeDelayedValue()) startDelay() }

    init {
        addSource(source) { newValue ->
            if (delayRunnable == null) {
                value = newValue
                startDelay()
            } else {
                isValueDelayed = true
                delayedValue = newValue
            }
        }
    }

    /** Start throttling or modify the delay. If [newDelay] is `0` (default) reuse previous delay value. */
    fun startThrottling(newDelay: Long = 0L) {
        require(newDelay >= 0L)
        when {
            newDelay > 0 -> delayMs = newDelay
            delayMs < 0 -> delayMs *= -1
            delayMs > 0 -> return
            else -> throw IllegalArgumentException("newDelay cannot be zero if old delayMs is zero")
        }
    }

    /** Stop throttling, if [immediate] emit any pending value now. */
    fun stopThrottling(immediate: Boolean = false) {
        if (delayMs <= 0) return
        delayMs *= -1
        if (immediate) consumeDelayedValue()
    }

    override fun onInactive() {
        super.onInactive()
        consumeDelayedValue()
    }

    // start counting the delay or clear it if conditions are not met
    private fun startDelay() {
        delayRunnable = if (delayMs > 0 && hasActiveObservers()) objDelayRunnable else null
    }

    private fun consumeDelayedValue(): Boolean {
        delayRunnable = null
        return if (isValueDelayed) {
            value = delayedValue
            delayedValue = null
            isValueDelayed = false
            true
        } else false
    }
}

通过将源实时数据作为第一个参数传递并观察它来使用它:

val throttledCount = ThrottledLiveData(count1, 2500L)  // maximum of one update per 2.5 sec
throttledCount.observe(this, Observer { i: Int ->
    //Do something with "integer"
})

如果你想在 5 秒后停止节流,只需 post 一个延迟的可运行程序将禁用它:

val disableThrottle = Runnable { throttledCount.stopThrottling() }
throttledCount.handler.postDelayed(disableThrottle, 5000L)