RxSwift - 重复可观察直到谓词

RxSwift - Repeat observable until predicate

我是 RxSwift 的新手,两天来一直在努力解决以下问题。

我包装了一个闭包,它从 API:

中读取部分 JSON 格式的字符串
func readResult() -> Observable<String> {
    return Observable<String>.create { observable -> Disposable in
        API.readValue() { (result: Result<String>) in
            switch result {
            case .success(let value): observable.onNext(value)
            case .failure(let error): observable.onError(error)
            }
            observable.onCompleted()
        }
        return Disposables.create()
    }
}

由于readValue的结果只包含了一部分JSON格式的字符串,我需要递归调用这个方法来获取完整的字符串。因此,重要的是只有在前一个阅读完成后才开始新的阅读。

我尝试使用 Observable.timerscan 来累积结果,直到我可以成功解码 json,但是使用 timer 并不能保证之前的阅读完毕。

我也考虑过使用concat但是因为我事先不知道完整的JSON字符串的长度,所以我不能这样写:

Observable.concat(readResult(), readResult())

如何确保 readResult 函数被调用,直到我能够成功解码生成的 JSON 字符串?

原则上,.reduce() 应该是完成这项工作的正确工具。

不确定为什么要用困难的方式从头开始构建 Observable,而不是使用 .from() 工厂方法。

我可能会这样做(伪代码):

let subject = PublishSubject<Observable<String>>.create()
let result = subject.switchLatest().reduce { /* update result */ }
subject.onNext(
        Observable.from( /* service call */ ).subscribeOn( /* some serial scheduler */ )
) // repeat as needed

更新

在评论中查看更具体的解决方案。