RxSwift。依次执行单独的 Observables
RxSwift. Execute separate Observables sequently
我试图让我的 Observable 仅在前一个 Observable 完成时执行。我不能使用 flatMap,因为订阅可以从不同的地方调用,而且这个 Observables 之间没有联系。具体来说:我让我的 CollectionView 从服务器加载更多内容,并在该用户单击 "Send comment" 按钮后 2 秒,而 CollectionView 仍在加载其批处理。所以我想等到 CollectionView 更新完成,然后才执行我的评论的发布请求。我创建了一个名为 ObservableQueue 的 class,它工作得很好。但我需要知道它是否存在内存泄漏、死锁等问题,或者我只是遗漏了什么。这是:
extension CompositeDisposable {
@discardableResult
func insert(disposeAction: @escaping () -> ()) -> DisposeKey? {
return insert(Disposables.create(with: disposeAction))
}
}
class ObservableQueue {
private let lock = NSRecursiveLock()
private let relay = BehaviorRelay(value: 0)
private let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "ObservableQueue.scheduler")
func enqueue<T>(_ observable: Observable<T>) -> Observable<T> {
return Observable.create({ observer -> Disposable in
let disposable = CompositeDisposable()
let relayDisposable = self
.relay
.observeOn(self.scheduler)
.filter({ value -> Bool in
if value > 0 {
return false
}
self.lock.lock(); defer { self.lock.unlock() }
if self.relay.value > 0 {
return false
}
self.relay.accept(self.relay.value + 1)
disposable.insert {
self.lock.lock(); defer { self.lock.unlock() }
self.relay.accept(self.relay.value - 1)
}
return true
})
.take(1)
.flatMapLatest { _ in observable }
.subscribe { observer.on([=10=]) }
_ = disposable.insert(relayDisposable)
return disposable
})
}
}
然后我可以这样使用它:
let queue = ObservableQueue()
...
// first observable
let observable1 = Observable
.just(0)
.delay(5, scheduler: MainScheduler.instance)
queue
.enqueue(observable1)
.subscribe(onNext: { _ in
print("here1")
})
.disposed(by: rx.disposeBag)
// second observable
let observable2 = Observable
.just(0)
.delay(5, scheduler: MainScheduler.instance)
queue
.enqueue(observable2)
.subscribe(onNext: { _ in
print("here2")
})
.disposed(by: rx.disposeBag)
// third observable
let observable3 = Observable
.just(0)
.delay(5, scheduler: MainScheduler.instance)
queue
.enqueue(observable3)
.subscribe(onNext: { _ in
print("here3")
})
.disposed(by: rx.disposeBag)
我会给你一些我认为对你以后有帮助的建议。
尽可能避免 Observable.create
,这是 "brute force" 可观察对象的创建,它根本不处理背压,你必须自己实现,不是一件容易的事
通常对于 HTTP api 调用,你不需要 Observable,你应该使用 Single
或 Completable
因为你只希望从你的服务器得到一个响应,而不是响应流。
你应该小心 onNext/on...
里面的 strong self
,根据经验,如果订阅观察者的 class 有处理包,你应该使用 weak self
.
现在对于你的特殊情况,如果你只需要这对观察者(获取并发送评论),我认为队列有点过分了。您可以简单地在 "fetch" 观察者的 do(onNext:)
方法上调用 post 评论观察者(如果可用)。每次触发 "onNext" 事件时调用 Do on next。
如果您仍然需要队列,我会选择 OperationQueue
,它只对操作进行排队,并且有一个类似于 observeOperationchanges() -> Observeble<Operation>
的方法,每次操作完成时都会触发该方法。这样你订阅一次,多次入队,但这可能不符合你的需求。
我会使用 .combineLatest() 在两个 observables 都发出一些东西后产生一个事件。参见 http://rxmarbles.com/#combineLatest
CLGeocoder 也有同样的问题。根据文档,您不能在处理先前的请求时调用其中一种地理编码器方法,这与您尝试执行的操作非常相似。在此要点 (https://gist.github.com/danielt1263/64bda2a32c18b8c28e1e22085a05df5a) 中,您会发现我在后台线程上进行可观察调用并使用信号量保护作业。这就是钥匙,你需要的是信号量,而不是锁。
像这样的东西应该适合你:
class ObservableQueue {
private let semaphore = DispatchSemaphore(value: 1)
private let scheduler = ConcurrentDispatchQueueScheduler(qos: .userInitiated)
func enqueue<T>(_ observable: Observable<T>) -> Observable<T> {
let _semaphore = semaphore // To avoid the use of self in the block below
return Observable.create { observer in
_semaphore.wait()
let disposable = observable.subscribe { event in
switch event {
case .next:
observer.on(event)
case .error, .completed:
observer.on(event)
}
}
return Disposables.create {
disposable.dispose()
_semaphore.signal()
}
}
.subscribeOn(scheduler)
}
}
我试图让我的 Observable 仅在前一个 Observable 完成时执行。我不能使用 flatMap,因为订阅可以从不同的地方调用,而且这个 Observables 之间没有联系。具体来说:我让我的 CollectionView 从服务器加载更多内容,并在该用户单击 "Send comment" 按钮后 2 秒,而 CollectionView 仍在加载其批处理。所以我想等到 CollectionView 更新完成,然后才执行我的评论的发布请求。我创建了一个名为 ObservableQueue 的 class,它工作得很好。但我需要知道它是否存在内存泄漏、死锁等问题,或者我只是遗漏了什么。这是:
extension CompositeDisposable {
@discardableResult
func insert(disposeAction: @escaping () -> ()) -> DisposeKey? {
return insert(Disposables.create(with: disposeAction))
}
}
class ObservableQueue {
private let lock = NSRecursiveLock()
private let relay = BehaviorRelay(value: 0)
private let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "ObservableQueue.scheduler")
func enqueue<T>(_ observable: Observable<T>) -> Observable<T> {
return Observable.create({ observer -> Disposable in
let disposable = CompositeDisposable()
let relayDisposable = self
.relay
.observeOn(self.scheduler)
.filter({ value -> Bool in
if value > 0 {
return false
}
self.lock.lock(); defer { self.lock.unlock() }
if self.relay.value > 0 {
return false
}
self.relay.accept(self.relay.value + 1)
disposable.insert {
self.lock.lock(); defer { self.lock.unlock() }
self.relay.accept(self.relay.value - 1)
}
return true
})
.take(1)
.flatMapLatest { _ in observable }
.subscribe { observer.on([=10=]) }
_ = disposable.insert(relayDisposable)
return disposable
})
}
}
然后我可以这样使用它:
let queue = ObservableQueue()
...
// first observable
let observable1 = Observable
.just(0)
.delay(5, scheduler: MainScheduler.instance)
queue
.enqueue(observable1)
.subscribe(onNext: { _ in
print("here1")
})
.disposed(by: rx.disposeBag)
// second observable
let observable2 = Observable
.just(0)
.delay(5, scheduler: MainScheduler.instance)
queue
.enqueue(observable2)
.subscribe(onNext: { _ in
print("here2")
})
.disposed(by: rx.disposeBag)
// third observable
let observable3 = Observable
.just(0)
.delay(5, scheduler: MainScheduler.instance)
queue
.enqueue(observable3)
.subscribe(onNext: { _ in
print("here3")
})
.disposed(by: rx.disposeBag)
我会给你一些我认为对你以后有帮助的建议。
尽可能避免
Observable.create
,这是 "brute force" 可观察对象的创建,它根本不处理背压,你必须自己实现,不是一件容易的事通常对于 HTTP api 调用,你不需要 Observable,你应该使用
Single
或Completable
因为你只希望从你的服务器得到一个响应,而不是响应流。你应该小心
onNext/on...
里面的strong self
,根据经验,如果订阅观察者的 class 有处理包,你应该使用weak self
.
现在对于你的特殊情况,如果你只需要这对观察者(获取并发送评论),我认为队列有点过分了。您可以简单地在 "fetch" 观察者的 do(onNext:)
方法上调用 post 评论观察者(如果可用)。每次触发 "onNext" 事件时调用 Do on next。
如果您仍然需要队列,我会选择 OperationQueue
,它只对操作进行排队,并且有一个类似于 observeOperationchanges() -> Observeble<Operation>
的方法,每次操作完成时都会触发该方法。这样你订阅一次,多次入队,但这可能不符合你的需求。
我会使用 .combineLatest() 在两个 observables 都发出一些东西后产生一个事件。参见 http://rxmarbles.com/#combineLatest
CLGeocoder 也有同样的问题。根据文档,您不能在处理先前的请求时调用其中一种地理编码器方法,这与您尝试执行的操作非常相似。在此要点 (https://gist.github.com/danielt1263/64bda2a32c18b8c28e1e22085a05df5a) 中,您会发现我在后台线程上进行可观察调用并使用信号量保护作业。这就是钥匙,你需要的是信号量,而不是锁。
像这样的东西应该适合你:
class ObservableQueue {
private let semaphore = DispatchSemaphore(value: 1)
private let scheduler = ConcurrentDispatchQueueScheduler(qos: .userInitiated)
func enqueue<T>(_ observable: Observable<T>) -> Observable<T> {
let _semaphore = semaphore // To avoid the use of self in the block below
return Observable.create { observer in
_semaphore.wait()
let disposable = observable.subscribe { event in
switch event {
case .next:
observer.on(event)
case .error, .completed:
observer.on(event)
}
}
return Disposables.create {
disposable.dispose()
_semaphore.signal()
}
}
.subscribeOn(scheduler)
}
}