Rxjava单元素缓冲区

Rxjava single element buffer

我有一个案例,有一个对象在随机时间发布对象,我想每秒将其收集到缓冲区中并通过一些策略(例如最大分数)进行过滤以确保只有一个对象每秒在缓冲区中。

subject
    .buffer(1L, TimeUnit.SECONDS)
    .filter {
        isNotEmpty
    }
    .doOnNext {
        // I get all object in the one second
        // That waste too much memory, the non-max object shouldn't be put into the buffer
        _.asScala.max(byScore)
    }
    .ignoreElements
    .subscribeOn(Schedulers.io)
    .subscribe

此代码将在一秒钟内保存所有对象,return 我。

这不是我想要的。

有什么解决办法吗?

您可以使用以下版本的 buffer 运算符:

.buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count,
       Callable<U> bufferSupplier,
       boolean restartTimerOnMaxSize)

它允许您定义自定义 bufferSupplier - 用于存储缓冲值的集合。然后你可以创建你的自定义版本的集合,你最多存储一个项目,在我们的例子中,如果有新的,更大的,替换现有的值:

class SingleItemMaxCollection : ArrayList<Long>() {

    override fun add(element: Long): Boolean {
        return when {
            size == 1 && get(0) < element -> { super.set(0, element); true }
            size == 0 -> { super.add(element); true }
            else -> false
        }
    }
}

演示,如何在一些模拟数据(每 400 毫秒发射一次的项目)上使用它:

class SO65020891 {

    private fun dataProvider() = Observable.just(1L, 2L, 3L, 4L, 5L, 6L)
        .concatMap { Observable.just(it).delay(400, TimeUnit.MILLISECONDS) }

    private fun getCollection(): () -> SingleItemMaxCollection = { SingleItemMaxCollection() }

    fun getBufferedMax(): Observable<Long> {
        return dataProvider()
            .buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 2, getCollection(), false)
            .filter { it.isNotEmpty() }
            .map { it[0] }
    }
}

最后,一些验证:

class SO65020891Test {

    @Test
    fun maxEmittedValuesReturnedWithinWindows() {
        val tested = SO65020891()

        val values = tested.getBufferedMax().blockingIterable().toList()

        assertEquals(listOf(2L, 4L, 6L), values)
    }
}