RxSwift:使用 flatMap 后保持原始流的顺序
RxSwift: keeping the original order of the source stream after using flatMap
在中间应用 flatMap 后,我试图保持流的原始顺序。
下面是详细说明我的意思的图表:
----2-4-1----------------(原流)
------------1--2--------4--(网络活动 - 用带有延迟的 flatMap 表示)
----------------2--------4-1(想要的结果)
下面这段代码详细说明了这种情况:
// Demonstrates the ordering problem: every incoming number starts an inner
// observable delayed by `Double(num)`, and flatMap forwards results as soon as
// each inner observable emits, so the downstream order follows completion
// time rather than the order in which the numbers were pushed.
persistMessageEventBus
    .flatMap { value -> Observable<Int> in
        print("aaab Doing \(value)")
        return Observable
            .just(value)
            .delay(Double(value), scheduler: MainScheduler.instance)
            .do(onNext: { finished in print("aaab Done async \(finished)") })
    }
    // concatMap here only sees values in the order flatMap already produced.
    .concatMap { value -> Observable<Int> in
        print("aaab Done map \(value)")
        return Observable.just(value)
    }
    .subscribe(onNext: { value in
        print("aaab done \(value)")
    })
    .addDisposableTo(disposeBag)

// Push the sample values in the order 2, 4, 1.
persistMessageEventBus.onNext(2)
persistMessageEventBus.onNext(4)
persistMessageEventBus.onNext(1)
输出为:
aaab Doing 2
aaab Doing 4
aaab Doing 1
aaab Done async 1
aaab Done map 1
aaab done 1
aaab Done async 2
aaab Done map 2
aaab done 2
aaab Done async 4
aaab Done map 4
aaab done 4
想要的输出是:
aaab Doing 2
aaab Doing 4
aaab Doing 1
aaab Done async 1
aaab Done async 2
aaab Done map 2
aaab done 2
aaab Done async 4
aaab Done map 4
aaab done 4
aaab Done map 1
aaab done 1
RxSwift 中有类似的东西吗?
改为使用 .concatMap(),它保证原始顺序。
更新#1
如果既要让各个异步操作并发执行(concatMap 会让它们逐个串行执行),又要按原始顺序输出结果,那么显然需要给元素加上索引并做一些缓冲。
/// A value paired with its position in the source stream.
typealias Indexed = (num: Int, index: Int)

/// Restores index order for values that arrive out of order.
///
/// A value is held back until every lower index has been emitted; it is then
/// pushed to `ordered`. The first index expected is 0.
class Buffer {
    /// Emits the received values in ascending index order.
    let ordered = PublishSubject<Int>()
    /// Index of the next value `ordered` is allowed to emit.
    private var current = 0
    /// Values that arrived ahead of their turn, keyed by index.
    private var buffer: [Int: Int] = [:]

    /// Stores `indexed` and emits every value that is now in sequence.
    ///
    /// - Parameter indexed: The value together with its position in the source stream.
    func onNext(_ indexed: Indexed) {
        buffer[indexed.index] = indexed.num
        // Drain the contiguous run that starts at `current`. Looking the next
        // index up directly avoids re-sorting all keys on every event and
        // removes the force unwraps. State is updated before emitting so a
        // subscriber that calls back into `onNext` cannot see a value twice.
        while let num = buffer.removeValue(forKey: current) {
            current += 1
            ordered.onNext(num)
        }
    }
}
let buffer = Buffer()

// Print each value as the buffer releases it.
buffer.ordered
    .subscribe(onNext: { value in
        print("aaab done \(value)")
    })
    .disposed(by: disposeBag)

// Tag every element with its position, run the delayed work through flatMap,
// then hand the (possibly out-of-order) results to the buffer.
persistMessageEventBus
    .mapWithIndex { (value, position) -> Indexed in
        (num: value, index: position)
    }
    .flatMap { item -> Observable<Indexed> in
        print("aaab Doing \(item.num)")
        return Observable
            .just(item)
            .delay(Double(item.num), scheduler: MainScheduler.instance)
            .do(onNext: { finished in print("aaab Done async \(finished.num)") })
    }
    .subscribe(onNext: { item in
        buffer.onNext(item)
    })
    .disposed(by: disposeBag)

// Push the sample values in the order 2, 4, 1.
persistMessageEventBus.onNext(2)
persistMessageEventBus.onNext(4)
persistMessageEventBus.onNext(1)
aaab Done async 1
aaab done 2
aaab Done async 2
aaab done 4
aaab Done async 4
aaab done 1
在中间应用 flatMap 后,我试图保持流的原始顺序。
下面是详细说明我的意思的图表:
----2-4-1----------------(原流)
------------1--2--------4--(网络活动 - 用带有延迟的 flatMap 表示)
----------------2--------4-1(想要的结果)
下面这段代码详细说明了这种情况:
// Demonstrates the ordering problem: every incoming number starts an inner
// observable delayed by `Double(num)`, and flatMap forwards results as soon as
// each inner observable emits, so the downstream order follows completion
// time rather than the order in which the numbers were pushed.
persistMessageEventBus
    .flatMap { value -> Observable<Int> in
        print("aaab Doing \(value)")
        return Observable
            .just(value)
            .delay(Double(value), scheduler: MainScheduler.instance)
            .do(onNext: { finished in print("aaab Done async \(finished)") })
    }
    // concatMap here only sees values in the order flatMap already produced.
    .concatMap { value -> Observable<Int> in
        print("aaab Done map \(value)")
        return Observable.just(value)
    }
    .subscribe(onNext: { value in
        print("aaab done \(value)")
    })
    .addDisposableTo(disposeBag)

// Push the sample values in the order 2, 4, 1.
persistMessageEventBus.onNext(2)
persistMessageEventBus.onNext(4)
persistMessageEventBus.onNext(1)
输出为:
aaab Doing 2
aaab Doing 4
aaab Doing 1
aaab Done async 1
aaab Done map 1
aaab done 1
aaab Done async 2
aaab Done map 2
aaab done 2
aaab Done async 4
aaab Done map 4
aaab done 4
想要的输出是:
aaab Doing 2
aaab Doing 4
aaab Doing 1
aaab Done async 1
aaab Done async 2
aaab Done map 2
aaab done 2
aaab Done async 4
aaab Done map 4
aaab done 4
aaab Done map 1
aaab done 1
RxSwift 中有类似的东西吗?
改为使用 .concatMap(),它保证原始顺序。
更新#1
如果既要让各个异步操作并发执行(concatMap 会让它们逐个串行执行),又要按原始顺序输出结果,那么显然需要给元素加上索引并做一些缓冲。
/// A value paired with its position in the source stream.
typealias Indexed = (num: Int, index: Int)

/// Restores index order for values that arrive out of order.
///
/// A value is held back until every lower index has been emitted; it is then
/// pushed to `ordered`. The first index expected is 0.
class Buffer {
    /// Emits the received values in ascending index order.
    let ordered = PublishSubject<Int>()
    /// Index of the next value `ordered` is allowed to emit.
    private var current = 0
    /// Values that arrived ahead of their turn, keyed by index.
    private var buffer: [Int: Int] = [:]

    /// Stores `indexed` and emits every value that is now in sequence.
    ///
    /// - Parameter indexed: The value together with its position in the source stream.
    func onNext(_ indexed: Indexed) {
        buffer[indexed.index] = indexed.num
        // Drain the contiguous run that starts at `current`. Looking the next
        // index up directly avoids re-sorting all keys on every event and
        // removes the force unwraps. State is updated before emitting so a
        // subscriber that calls back into `onNext` cannot see a value twice.
        while let num = buffer.removeValue(forKey: current) {
            current += 1
            ordered.onNext(num)
        }
    }
}
let buffer = Buffer()

// Print each value as the buffer releases it.
buffer.ordered
    .subscribe(onNext: { value in
        print("aaab done \(value)")
    })
    .disposed(by: disposeBag)

// Tag every element with its position, run the delayed work through flatMap,
// then hand the (possibly out-of-order) results to the buffer.
persistMessageEventBus
    .mapWithIndex { (value, position) -> Indexed in
        (num: value, index: position)
    }
    .flatMap { item -> Observable<Indexed> in
        print("aaab Doing \(item.num)")
        return Observable
            .just(item)
            .delay(Double(item.num), scheduler: MainScheduler.instance)
            .do(onNext: { finished in print("aaab Done async \(finished.num)") })
    }
    .subscribe(onNext: { item in
        buffer.onNext(item)
    })
    .disposed(by: disposeBag)

// Push the sample values in the order 2, 4, 1.
persistMessageEventBus.onNext(2)
persistMessageEventBus.onNext(4)
persistMessageEventBus.onNext(1)
aaab Done async 1
aaab done 2
aaab Done async 2
aaab done 4
aaab Done async 4
aaab done 1