如果在 RxJava/RxKotlin 中找不到所有条目,则检查列表并超时

Checking a list and timing out if all entries not found in RxJava/RxKotlin

我有一个场景,我有一个函数 scanForTargets,returns 一个 FoundNumber 类型的 Observable。在 FoundNumber 中,我只需要一个可以从中获取的 ID 字段。当每个元素返回到 scanResults Observable 中时,我想检查名称字段是否与目标列表中的名称之一相匹配。如果是这样,那么我想发出那个。例如,如果我要查找数字 1 和 2,并且 scanForTargets() 返回 1、2、3 和 4,那么我希望 scanForValues 仅返回 1 和 2。

需要注意的是,我只想继续这样做,直到: 1)一段时间过去了(在这种情况下我抛出错误) 2) 在超时前找到String列表中的所有项。

到目前为止我所拥有的看起来像这样,但我无法让它为我工作主要是因为停止的快捷方式 once/if 所有目标都在超时之前找到。

fun scanForValues(targetList: List<String>): Observable<FoundNumber> {
    val scanResult = scanForTargets()

    return scanResult.doOnNext {scanResult -> Log.d(TAG, "Found potential target: " + scanResult.name) }
            .filter(TargetPredicate(targetList)) //See if it's one of those we want
            .timeout(5, TimeUnit.SECONDS) //Wait a max of 5 seconds to find all items
            .doOnError { Log.w(TAG, "Failed to scan"}") }
            .map{s->scanResult.name}  
}

class TargetPredicate(private val targetList: List<String>) : Predicate<ScanResult> { override fun test(scanResult: ScanResult): Boolean {
        if(scanResult == null) {
            return false
        }
        return scanResult.name in targetList 
    }
}

如果找到列表中的所有项目,如何添加停止检查?我不能只添加另一个谓词吗?

谢谢。

更新:根据要求,这里有一些数据可以说明我的意思。

假设 scanForTargets() 和支持代码如下所示:

var emittedList: List<String?> = listOf(null, "0", "1", "2", "3")


fun scanForTargets(): Observable<FoundNumber> = Observable
    .intervalRange(0, emittedList.size.toLong(), 0, 1, TimeUnit.SECONDS)
    .map { index -> FoundNumber(emittedList[index.toInt()]) }

data class FoundNumber(val targetId: String?)

现在如果用 1 和 2 的列表调用 scanForValues,那么它应该发射回 1 和 2 的 Observable。

不,这不是添加另一个filter那么简单。

一个可能的解决方案是使用 scan 从包含您的目标的集合中删除项目,并在集合变空时完成。

示例:

val targets = listOf("a", "b", "c")

fun scanForTarget(): Observable<String> = Observable.just("a", "b")

fun scanForValues(targets: List<String>): Completable {
    val initial = targets.toMutableSet()
    return scanForTarget()
            .timeout(5, TimeUnit.SECONDS)
            .scan(initial) { acc, next -> acc.remove(next); acc }
            .filter { it.isEmpty() }
            .singleOrError()
            .toCompletable()
}

注意:Completable 是一种特殊类型的发布者,只能向 onCompleteonError 发出信号。


更新: 回复问题更新。

您问题中的新示例将不起作用,因为 null 值在 RxJava2.

中不允许

假设您解决了这个问题,以下解决方案可能会对您有所帮助。

fun scanForValues(targets: List<String>): Observable<String> {
    val accumulator: Pair<Set<String>, String?> = targets.toSet() to null
    return scanForTarget()
            .timeout(5, TimeUnit.SECONDS)
            .scan(accumulator) { acc, next -> 
                val (set, previous) = acc
                val item = if (next in set) next else null
                (set - next) to item     // return set and nullable item
            }
            .filter { it.second != null } // item not null
            .take(initial.size)           // limit to the number of items
            .map { it.second }            // unwrap the item from the pair
            .map { FoundNumber(it) }      // wrap in your class
}

不再只使用 Set<String> 作为累加器,现在我们还添加了项。

该项目可以为空,这使我们能够检查给定项目是否存在。

请注意,没有 null 值通过可观察流传递。在这种情况下,null 值被包裹在 Pair<Set<String>, String?> 中,它们永远不会是 null 本身。