flatMapFirst,但延迟了最新的项目

flatMapFirst, but delaying the latest item

我正在寻找一个运算符(或一个运算符链),像 flatMapFirst,在等待当前可观察对象完成时丢弃项目,当它完成时,立即跨越另一个流以获取最新的项目。

我希望这张图能更好地解释它:

x -> [X1,X2,X3]

Input:  -a-----------b---c--d-------------------------
Output: ---A1-A2-A3----B1--B2--B3--D1-D2-D3-----------

如您所见,c 被跳过,因为 d 在它之后到达。但还要注意 d 到达时 b 仍在处理中,一旦 b 完成,d 就开始处理。

这个运算符允许我完成计算,而不是 flatMapLatest 可能从可观察到可观察切换,而不会给出完整的计算结果。它的行为类似于 flatMapFirst 但最后一个元素仍被处理,当输入元素流没有任何元素时允许在空闲期间保持一致状态。

concatMap 可能是这里的答案,但如果太多项目在 bd 之间,输出流将延迟最新项目的计算太久。

flatMapFirst (d is discarded)

Input:  -a-----------b---c--d----------------
Output: ---A1-A2-A3----B1--B2--B3------------

flatMapLatest (periods of starvation can happen)

Input:  -abcdefghijklmnop-----
Output: -------------------P1-

concatMap (too much work can get scheduled)

Input:  -a-----------b-c-d-e-f------------------------------------------
Output: ---A1-A2-A3----B1--B2--B3-C1-C2-C3--D1-D2-D3-E1-E2-E3--F1-F2-F3-

如有疑问,您可以随时编写自己的运算符。 (请注意,在下面,我只测试了快乐的路径,并没有放入任何线程保护。):

extension ObservableType {
    public func specialOp<O>(_ selector: @escaping (Self.E) throws -> O) -> RxSwift.Observable<O.E> where O : ObservableConvertibleType {
        return Observable<O.E>.create { result in
            var bag: DisposeBag! = DisposeBag()
            var current: Observable<O.E>? = nil
            var last: E? = nil
            var innerComplete: Bool = false
            func handleSub(_ element: E) {
                do {
                    current = try selector(element).asObservable()
                    current!.subscribe { subEvent in
                        switch subEvent {
                        case .completed:
                            current = nil
                            if let next = last {
                                handleSub(next)
                                last = nil
                            }
                            else if innerComplete {
                                result.onCompleted()
                                bag = nil
                            }
                        case .error(let error):
                            result.onError(error)
                        case .next(let sumElement):
                            result.onNext(sumElement)
                        }
                    }.disposed(by: bag)
                }
                catch {
                    result.onError(error)
                }
            }
            self.subscribe { event in
                switch event {
                case .completed:
                    innerComplete = true
                case .error(let error):
                    result.onError(error)
                case .next(let element):
                    if current == nil {
                        handleSub(element)
                    }
                    else {
                        last = element
                    }
                }
            }.disposed(by: bag)
            return Disposables.create {
                bag = nil
            }
        }
    }
}