flatMapFirst,但延迟了最新的项目
flatMapFirst, but delaying the latest item
我正在寻找一个运算符(或一个运算符链),像 flatMapFirst,在等待当前可观察对象完成时丢弃项目,当它完成时,立即跨越另一个流以获取最新的项目。
我希望这张图能更好地解释它:
x -> [X1,X2,X3]
Input: -a-----------b---c--d-------------------------
Output: ---A1-A2-A3----B1--B2--B3--D1-D2-D3-----------
如您所见,c
被跳过,因为 d
在它之后到达。但还要注意 d
到达时 b
仍在处理中,一旦 b
完成,d
就开始处理。
这个运算符允许我完成计算,而不是 flatMapLatest
可能从可观察到可观察切换,而不会给出完整的计算结果。它的行为类似于 flatMapFirst
但最后一个元素仍被处理,当输入元素流没有任何元素时允许在空闲期间保持一致状态。
concatMap
可能是这里的答案,但如果太多项目在 b
和 d
之间,输出流将延迟最新项目的计算太久。
flatMapFirst (d is discarded)
Input: -a-----------b---c--d----------------
Output: ---A1-A2-A3----B1--B2--B3------------
flatMapLatest (periods of starvation can happen)
Input: -abcdefghijklmnop-----
Output: -------------------P1-
concatMap (too much work can get scheduled)
Input: -a-----------b-c-d-e-f------------------------------------------
Output: ---A1-A2-A3----B1--B2--B3-C1-C2-C3--D1-D2-D3-E1-E2-E3--F1-F2-F3-
如有疑问,您可以随时编写自己的运算符。 (请注意,在下面,我只测试了快乐的路径,并没有放入任何线程保护。):
extension ObservableType {
public func specialOp<O>(_ selector: @escaping (Self.E) throws -> O) -> RxSwift.Observable<O.E> where O : ObservableConvertibleType {
return Observable<O.E>.create { result in
var bag: DisposeBag! = DisposeBag()
var current: Observable<O.E>? = nil
var last: E? = nil
var innerComplete: Bool = false
func handleSub(_ element: E) {
do {
current = try selector(element).asObservable()
current!.subscribe { subEvent in
switch subEvent {
case .completed:
current = nil
if let next = last {
handleSub(next)
last = nil
}
else if innerComplete {
result.onCompleted()
bag = nil
}
case .error(let error):
result.onError(error)
case .next(let sumElement):
result.onNext(sumElement)
}
}.disposed(by: bag)
}
catch {
result.onError(error)
}
}
self.subscribe { event in
switch event {
case .completed:
innerComplete = true
case .error(let error):
result.onError(error)
case .next(let element):
if current == nil {
handleSub(element)
}
else {
last = element
}
}
}.disposed(by: bag)
return Disposables.create {
bag = nil
}
}
}
}
我正在寻找一个运算符(或一个运算符链),像 flatMapFirst,在等待当前可观察对象完成时丢弃项目,当它完成时,立即跨越另一个流以获取最新的项目。
我希望这张图能更好地解释它:
x -> [X1,X2,X3]
Input: -a-----------b---c--d-------------------------
Output: ---A1-A2-A3----B1--B2--B3--D1-D2-D3-----------
如您所见,c
被跳过,因为 d
在它之后到达。但还要注意 d
到达时 b
仍在处理中,一旦 b
完成,d
就开始处理。
这个运算符允许我完成计算,而不是 flatMapLatest
可能从可观察到可观察切换,而不会给出完整的计算结果。它的行为类似于 flatMapFirst
但最后一个元素仍被处理,当输入元素流没有任何元素时允许在空闲期间保持一致状态。
concatMap
可能是这里的答案,但如果太多项目在 b
和 d
之间,输出流将延迟最新项目的计算太久。
flatMapFirst (d is discarded)
Input: -a-----------b---c--d----------------
Output: ---A1-A2-A3----B1--B2--B3------------
flatMapLatest (periods of starvation can happen)
Input: -abcdefghijklmnop-----
Output: -------------------P1-
concatMap (too much work can get scheduled)
Input: -a-----------b-c-d-e-f------------------------------------------
Output: ---A1-A2-A3----B1--B2--B3-C1-C2-C3--D1-D2-D3-E1-E2-E3--F1-F2-F3-
如有疑问,您可以随时编写自己的运算符。 (请注意,在下面,我只测试了快乐的路径,并没有放入任何线程保护。):
extension ObservableType {
public func specialOp<O>(_ selector: @escaping (Self.E) throws -> O) -> RxSwift.Observable<O.E> where O : ObservableConvertibleType {
return Observable<O.E>.create { result in
var bag: DisposeBag! = DisposeBag()
var current: Observable<O.E>? = nil
var last: E? = nil
var innerComplete: Bool = false
func handleSub(_ element: E) {
do {
current = try selector(element).asObservable()
current!.subscribe { subEvent in
switch subEvent {
case .completed:
current = nil
if let next = last {
handleSub(next)
last = nil
}
else if innerComplete {
result.onCompleted()
bag = nil
}
case .error(let error):
result.onError(error)
case .next(let sumElement):
result.onNext(sumElement)
}
}.disposed(by: bag)
}
catch {
result.onError(error)
}
}
self.subscribe { event in
switch event {
case .completed:
innerComplete = true
case .error(let error):
result.onError(error)
case .next(let element):
if current == nil {
handleSub(element)
}
else {
last = element
}
}
}.disposed(by: bag)
return Disposables.create {
bag = nil
}
}
}
}