RxSwift 跳过事件直到自己的序列完成
RxSwift Skip Events Until Own Sequence has finished
我有一个 observable(我们称它为 trigger)可以在短时间内发射多次。当它发出时,我正在执行网络请求,并且我正在使用扫描运算符存储结果。
我的问题是我想等到请求完成后再做。 (但是就像现在一样,如果触发器发出 2 个可观察对象,fetchData 是否完成并不重要,它会再次执行)
奖金:我也想每 X 秒只使用第一个(去抖动不是解决方案,因为它可以一直发射,我想每 X 秒获得 1 个,它也不是油门,因为如果一个 observable 发射 2 次真的很快我会得到第一个和第二个延迟 X 秒)
代码:
trigger.flatMap { [unowned self] _ in
self.fetchData()
}.scan([], accumulator: { lastValue, newValue in
return lastValue + newValue
})
和获取数据:
func fetchData() -> Observable<[ReusableCellVMContainer]>
触发器:
let trigger = Observable.of(input.viewIsLoaded, handle(input.isNearBottomEdge)).merge()
对不起,我误解了你在我下面的回答中想要完成的事情。
将实现您想要的操作符是 flatMapFirst
。这将忽略来自触发器的事件,直到 fetchData()
完成。
trigger
.flatMapFirst { [unowned self] _ in
self.fetchData()
}
.scan([], accumulator: { lastValue, newValue in
return lastValue + newValue
})
如果有帮助,我将在下面留下我之前的答案(如果有的话,它有 "bonus" 答案。)
您遇到的问题称为 "back pressure",即当可观察对象产生值的速度快于观察者可以处理的速度时。
在这种特殊情况下,我建议您不要限制数据获取请求,而是将每个请求映射到一个键,然后按顺序发出数组:
trigger
.enumerated()
.flatMap { [unowned self] count, _ in
Observable.combineLatest(Observable.just(count), self.fetchData())
}
.scan(into: [Int: Value](), accumulator: { lastValue, newValue in
lastValue[newValue.0] = newValue.1
})
.map { [=11=].sorted(by: { [=11=].key < .key }).map { [=11=].value }}
要完成上述工作,您需要:
extension ObservableType {
func enumerated() -> Observable<(Int, E)> {
let shared = share()
let counter = shared.scan(0, accumulator: { prev, _ in return prev + 1 })
return Observable.zip(counter, shared)
}
}
这样,您的网络请求会尽快开始,但您不会失去发出它们的顺序。
对于您的 "bonus",buffer
运算符将完全按照您的要求进行操作。类似于:
trigger.buffer(timeSpan: seconds, count: Int.max, scheduler: MainScheduler.instance)
.map { [=13=].first }
我有一个 observable(我们称它为 trigger)可以在短时间内发射多次。当它发出时,我正在执行网络请求,并且我正在使用扫描运算符存储结果。
我的问题是我想等到请求完成后再做。 (但是就像现在一样,如果触发器发出 2 个可观察对象,fetchData 是否完成并不重要,它会再次执行)
奖金:我也想每 X 秒只使用第一个(去抖动不是解决方案,因为它可以一直发射,我想每 X 秒获得 1 个,它也不是油门,因为如果一个 observable 发射 2 次真的很快我会得到第一个和第二个延迟 X 秒)
代码:
trigger.flatMap { [unowned self] _ in
self.fetchData()
}.scan([], accumulator: { lastValue, newValue in
return lastValue + newValue
})
和获取数据:
func fetchData() -> Observable<[ReusableCellVMContainer]>
触发器:
let trigger = Observable.of(input.viewIsLoaded, handle(input.isNearBottomEdge)).merge()
对不起,我误解了你在我下面的回答中想要完成的事情。
将实现您想要的操作符是 flatMapFirst
。这将忽略来自触发器的事件,直到 fetchData()
完成。
trigger
.flatMapFirst { [unowned self] _ in
self.fetchData()
}
.scan([], accumulator: { lastValue, newValue in
return lastValue + newValue
})
如果有帮助,我将在下面留下我之前的答案(如果有的话,它有 "bonus" 答案。)
您遇到的问题称为 "back pressure",即当可观察对象产生值的速度快于观察者可以处理的速度时。
在这种特殊情况下,我建议您不要限制数据获取请求,而是将每个请求映射到一个键,然后按顺序发出数组:
trigger
.enumerated()
.flatMap { [unowned self] count, _ in
Observable.combineLatest(Observable.just(count), self.fetchData())
}
.scan(into: [Int: Value](), accumulator: { lastValue, newValue in
lastValue[newValue.0] = newValue.1
})
.map { [=11=].sorted(by: { [=11=].key < .key }).map { [=11=].value }}
要完成上述工作,您需要:
extension ObservableType {
func enumerated() -> Observable<(Int, E)> {
let shared = share()
let counter = shared.scan(0, accumulator: { prev, _ in return prev + 1 })
return Observable.zip(counter, shared)
}
}
这样,您的网络请求会尽快开始,但您不会失去发出它们的顺序。
对于您的 "bonus",buffer
运算符将完全按照您的要求进行操作。类似于:
trigger.buffer(timeSpan: seconds, count: Int.max, scheduler: MainScheduler.instance)
.map { [=13=].first }