如果在 RxJava/RxKotlin 中找不到所有条目,则检查列表并超时
Checking a list and timing out if all entries not found in RxJava/RxKotlin
我有一个场景,我有一个函数 scanForTargets,returns 一个 FoundNumber 类型的 Observable。在 FoundNumber 中,我只需要一个可以从中获取的 ID 字段。当每个元素返回到 scanResults Observable 中时,我想检查名称字段是否与目标列表中的名称之一相匹配。如果是这样,那么我想发出那个。例如,如果我要查找数字 1 和 2,并且 scanForTargets() 返回 1、2、3 和 4,那么我希望 scanForValues 仅返回 1 和 2。
需要注意的是,我只想继续这样做,直到:
1)一段时间过去了(在这种情况下我抛出错误)
2) 在超时前找到String列表中的所有项。
到目前为止我所拥有的看起来像这样,但我无法让它为我工作主要是因为停止的快捷方式 once/if 所有目标都在超时之前找到。
fun scanForValues(targetList: List<String>): Observable<FoundNumber> {
val scanResult = scanForTargets()
return scanResult.doOnNext {scanResult -> Log.d(TAG, "Found potential target: " + scanResult.name) }
.filter(TargetPredicate(targetList)) //See if it's one of those we want
.timeout(5, TimeUnit.SECONDS) //Wait a max of 5 seconds to find all items
.doOnError { Log.w(TAG, "Failed to scan"}") }
.map{s->scanResult.name}
}
class TargetPredicate(private val targetList: List<String>) : Predicate<ScanResult> { override fun test(scanResult: ScanResult): Boolean {
if(scanResult == null) {
return false
}
return scanResult.name in targetList
}
}
如果找到列表中的所有项目,如何添加停止检查?我不能只添加另一个谓词吗?
谢谢。
更新:根据要求,这里有一些数据可以说明我的意思。
假设 scanForTargets() 和支持代码如下所示:
var emittedList: List<String?> = listOf(null, "0", "1", "2", "3")
fun scanForTargets(): Observable<FoundNumber> = Observable
.intervalRange(0, emittedList.size.toLong(), 0, 1, TimeUnit.SECONDS)
.map { index -> FoundNumber(emittedList[index.toInt()]) }
data class FoundNumber(val targetId: String?)
现在如果用 1 和 2 的列表调用 scanForValues,那么它应该发射回 1 和 2 的 Observable。
不,这不是添加另一个filter
那么简单。
一个可能的解决方案是使用 scan
从包含您的目标的集合中删除项目,并在集合变空时完成。
示例:
val targets = listOf("a", "b", "c")
fun scanForTarget(): Observable<String> = Observable.just("a", "b")
fun scanForValues(targets: List<String>): Completable {
val initial = targets.toMutableSet()
return scanForTarget()
.timeout(5, TimeUnit.SECONDS)
.scan(initial) { acc, next -> acc.remove(next); acc }
.filter { it.isEmpty() }
.singleOrError()
.toCompletable()
}
注意:Completable
是一种特殊类型的发布者,只能向 onComplete
或 onError
发出信号。
更新: 回复问题更新。
您问题中的新示例将不起作用,因为 null
值在 RxJava2
.
中不允许
假设您解决了这个问题,以下解决方案可能会对您有所帮助。
fun scanForValues(targets: List<String>): Observable<String> {
val accumulator: Pair<Set<String>, String?> = targets.toSet() to null
return scanForTarget()
.timeout(5, TimeUnit.SECONDS)
.scan(accumulator) { acc, next ->
val (set, previous) = acc
val item = if (next in set) next else null
(set - next) to item // return set and nullable item
}
.filter { it.second != null } // item not null
.take(initial.size) // limit to the number of items
.map { it.second } // unwrap the item from the pair
.map { FoundNumber(it) } // wrap in your class
}
不再只使用 Set<String>
作为累加器,现在我们还添加了项。
该项目可以为空,这使我们能够检查给定项目是否存在。
请注意,没有 null
值通过可观察流传递。在这种情况下,null
值被包裹在 Pair<Set<String>, String?>
中,它们永远不会是 null
本身。
我有一个场景,我有一个函数 scanForTargets,returns 一个 FoundNumber 类型的 Observable。在 FoundNumber 中,我只需要一个可以从中获取的 ID 字段。当每个元素返回到 scanResults Observable 中时,我想检查名称字段是否与目标列表中的名称之一相匹配。如果是这样,那么我想发出那个。例如,如果我要查找数字 1 和 2,并且 scanForTargets() 返回 1、2、3 和 4,那么我希望 scanForValues 仅返回 1 和 2。
需要注意的是,我只想继续这样做,直到: 1)一段时间过去了(在这种情况下我抛出错误) 2) 在超时前找到String列表中的所有项。
到目前为止我所拥有的看起来像这样,但我无法让它为我工作主要是因为停止的快捷方式 once/if 所有目标都在超时之前找到。
fun scanForValues(targetList: List<String>): Observable<FoundNumber> {
val scanResult = scanForTargets()
return scanResult.doOnNext {scanResult -> Log.d(TAG, "Found potential target: " + scanResult.name) }
.filter(TargetPredicate(targetList)) //See if it's one of those we want
.timeout(5, TimeUnit.SECONDS) //Wait a max of 5 seconds to find all items
.doOnError { Log.w(TAG, "Failed to scan"}") }
.map{s->scanResult.name}
}
class TargetPredicate(private val targetList: List<String>) : Predicate<ScanResult> { override fun test(scanResult: ScanResult): Boolean {
if(scanResult == null) {
return false
}
return scanResult.name in targetList
}
}
如果找到列表中的所有项目,如何添加停止检查?我不能只添加另一个谓词吗?
谢谢。
更新:根据要求,这里有一些数据可以说明我的意思。
假设 scanForTargets() 和支持代码如下所示:
var emittedList: List<String?> = listOf(null, "0", "1", "2", "3")
fun scanForTargets(): Observable<FoundNumber> = Observable
.intervalRange(0, emittedList.size.toLong(), 0, 1, TimeUnit.SECONDS)
.map { index -> FoundNumber(emittedList[index.toInt()]) }
data class FoundNumber(val targetId: String?)
现在如果用 1 和 2 的列表调用 scanForValues,那么它应该发射回 1 和 2 的 Observable。
不,这不是添加另一个filter
那么简单。
一个可能的解决方案是使用 scan
从包含您的目标的集合中删除项目,并在集合变空时完成。
示例:
val targets = listOf("a", "b", "c")
fun scanForTarget(): Observable<String> = Observable.just("a", "b")
fun scanForValues(targets: List<String>): Completable {
val initial = targets.toMutableSet()
return scanForTarget()
.timeout(5, TimeUnit.SECONDS)
.scan(initial) { acc, next -> acc.remove(next); acc }
.filter { it.isEmpty() }
.singleOrError()
.toCompletable()
}
注意:Completable
是一种特殊类型的发布者,只能向 onComplete
或 onError
发出信号。
更新: 回复问题更新。
您问题中的新示例将不起作用,因为 null
值在 RxJava2
.
假设您解决了这个问题,以下解决方案可能会对您有所帮助。
fun scanForValues(targets: List<String>): Observable<String> {
val accumulator: Pair<Set<String>, String?> = targets.toSet() to null
return scanForTarget()
.timeout(5, TimeUnit.SECONDS)
.scan(accumulator) { acc, next ->
val (set, previous) = acc
val item = if (next in set) next else null
(set - next) to item // return set and nullable item
}
.filter { it.second != null } // item not null
.take(initial.size) // limit to the number of items
.map { it.second } // unwrap the item from the pair
.map { FoundNumber(it) } // wrap in your class
}
不再只使用 Set<String>
作为累加器,现在我们还添加了项。
该项目可以为空,这使我们能够检查给定项目是否存在。
请注意,没有 null
值通过可观察流传递。在这种情况下,null
值被包裹在 Pair<Set<String>, String?>
中,它们永远不会是 null
本身。