Rxjava单元素缓冲区
Rxjava single element buffer
我有一个案例,有一个对象在随机时间发布对象,我想每秒将其收集到缓冲区中并通过一些策略(例如最大分数)进行过滤以确保只有一个对象每秒在缓冲区中。
subject
.buffer(1L, TimeUnit.SECONDS)
.filter {
isNotEmpty
}
.doOnNext {
// I get all object in the one second
// That waste too much memory, the non-max object shouldn't be put into the buffer
_.asScala.max(byScore)
}
.ignoreElements
.subscribeOn(Schedulers.io)
.subscribe
此代码将在一秒钟内保存所有对象,return 我。
这不是我想要的。
有什么解决办法吗?
您可以使用以下版本的 buffer
运算符:
.buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count,
Callable<U> bufferSupplier,
boolean restartTimerOnMaxSize)
它允许您定义自定义 bufferSupplier
- 用于存储缓冲值的集合。然后你可以创建你的自定义版本的集合,你最多存储一个项目,在我们的例子中,如果有新的,更大的,替换现有的值:
class SingleItemMaxCollection : ArrayList<Long>() {
override fun add(element: Long): Boolean {
return when {
size == 1 && get(0) < element -> { super.set(0, element); true }
size == 0 -> { super.add(element); true }
else -> false
}
}
}
演示,如何在一些模拟数据(每 400 毫秒发射一次的项目)上使用它:
class SO65020891 {
private fun dataProvider() = Observable.just(1L, 2L, 3L, 4L, 5L, 6L)
.concatMap { Observable.just(it).delay(400, TimeUnit.MILLISECONDS) }
private fun getCollection(): () -> SingleItemMaxCollection = { SingleItemMaxCollection() }
fun getBufferedMax(): Observable<Long> {
return dataProvider()
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 2, getCollection(), false)
.filter { it.isNotEmpty() }
.map { it[0] }
}
}
最后,一些验证:
class SO65020891Test {
@Test
fun maxEmittedValuesReturnedWithinWindows() {
val tested = SO65020891()
val values = tested.getBufferedMax().blockingIterable().toList()
assertEquals(listOf(2L, 4L, 6L), values)
}
}
我有一个案例,有一个对象在随机时间发布对象,我想每秒将其收集到缓冲区中并通过一些策略(例如最大分数)进行过滤以确保只有一个对象每秒在缓冲区中。
subject
.buffer(1L, TimeUnit.SECONDS)
.filter {
isNotEmpty
}
.doOnNext {
// I get all object in the one second
// That waste too much memory, the non-max object shouldn't be put into the buffer
_.asScala.max(byScore)
}
.ignoreElements
.subscribeOn(Schedulers.io)
.subscribe
此代码将在一秒钟内保存所有对象,return 我。
这不是我想要的。
有什么解决办法吗?
您可以使用以下版本的 buffer
运算符:
.buffer(long timespan, TimeUnit unit, Scheduler scheduler, int count,
Callable<U> bufferSupplier,
boolean restartTimerOnMaxSize)
它允许您定义自定义 bufferSupplier
- 用于存储缓冲值的集合。然后你可以创建你的自定义版本的集合,你最多存储一个项目,在我们的例子中,如果有新的,更大的,替换现有的值:
class SingleItemMaxCollection : ArrayList<Long>() {
override fun add(element: Long): Boolean {
return when {
size == 1 && get(0) < element -> { super.set(0, element); true }
size == 0 -> { super.add(element); true }
else -> false
}
}
}
演示,如何在一些模拟数据(每 400 毫秒发射一次的项目)上使用它:
class SO65020891 {
private fun dataProvider() = Observable.just(1L, 2L, 3L, 4L, 5L, 6L)
.concatMap { Observable.just(it).delay(400, TimeUnit.MILLISECONDS) }
private fun getCollection(): () -> SingleItemMaxCollection = { SingleItemMaxCollection() }
fun getBufferedMax(): Observable<Long> {
return dataProvider()
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 2, getCollection(), false)
.filter { it.isNotEmpty() }
.map { it[0] }
}
}
最后,一些验证:
class SO65020891Test {
@Test
fun maxEmittedValuesReturnedWithinWindows() {
val tested = SO65020891()
val values = tested.getBufferedMax().blockingIterable().toList()
assertEquals(listOf(2L, 4L, 6L), values)
}
}