我如何检查 RxSwift 中 ReplaySubject 中的所有历史值?

How can I check all historic values in a ReplaySubject in RxSwift?

我有一个简单的结构 Foo:

struct Foo {
    let bar: String
}

现在我创建了 Foo 的无限 ReplaySubject

let subject = ReplaySubject<Foo>.createUnbounded()

我现在如何理解(未终止的)流是否有一个 Foo,其 bar 等于 abc? (这可能是第 1、第 3 或第 20 个元素。)

首先,这是一个疯狂的请求。在使用 Rx 时,你不应该考虑 "what was"。相反,您应该考虑总是如此。您应该考虑不变量...

也就是说,下面的运算符将为您发出索引。由于主体能够不断地发出事件,因此操作员被设计为实时工作。可以这样使用:

let indexes = subject.indexOfElementSatisfying { [=10=].bar == "abc" }

这里是:

extension ObservableConvertibleType {

    /**
     Emits the index of all the values in the stream that satisfy the predicate.

     - parameter pred: The predicate that determines whether the value satisfies the condition
     - returns: An observable sequence of indexes to those elements.
     */
    func indexOfElementSatisfying(_ pred: @escaping (E) throws -> Bool) -> Observable<Int> {
        return asObservable()
            .enumerated()
            .filter { try pred([=11=].element) }
            .map { [=11=].index }
    }
}