使用 RxSwift 限制对服务 class 的并发访问
Limiting concurrent access to a service class with RxSwift
给定这样的服务class:
class Service {
let networkService = NetworkService()
func handleJobA(input: String) -> Observable<ResultA> {
return networkService
.computeA(input)
.map { [=11=].a }
}
}
当我像这样从调用方使用它时:
let service = Service()
Observable
.from(["Hello", "World"])
.flatMap {
service.handleJobA([=12=])
}
.subscribe()
然后这将同时向 service
发送多个请求。我想让流等到每个请求完成。这是可以使用 merge
运算符实现的。
Observable
.from(["Hello", "World"])
.flatMap {
Observable.just(
service.handleJobA([=13=])
)
}
.merge(maxConcurrent: 1)
.subscribe()
到目前为止一切顺利 - 该服务不会同时执行多项 handleJobA
任务。
然而,并发是一个服务细节,调用者不应该关心它。事实上,该服务在稍后阶段可能会决定允许不同的并发值。
其次,当我添加一个新方法handleJobB
时,它不能与作业A同时处于活动状态,反之亦然。
所以我的问题是:
- 如何将 maxConcurrency 限制为 handleJobA observable 作为实现细节?
- 哪个 RxSwift 模式允许对任何服务方法进行限制?
也许你想要 concat
运算符,我在下面写了一些测试代码,看看你是否想要这个:
func sleepAndPrint(label:String) -> Observable<String> {
return Observable.create { obser -> Disposable in
DispatchQueue.global().async {
sleep(3)
print("\(label) come")
obser.onNext(label)
obser.onCompleted()
}
return Disposables.create()
}
}
Observable.from(["hello","world","swift"])
// we need observable of observable sequences so just use map
// Observable<Observable<String>> in this case
.map{
sleepAndPrint(label: [=10=])
}
// Concatenates all inner observable sequences, as long as the previous observable sequence terminated successfully.
.concat()
.subscribe(onNext:{
print("subscribe: " + [=10=])
})
.addDisposableTo(disposeBag)
prints :
hello come
subscribe: hello
world come
subscribe: world
swift come
subscribe: swift
您需要一个专用于该服务的串行调度程序。这是一个可以粘贴到 playground 的示例:
/// playground
import RxSwift
class Service {
func handleJobA(input: String) -> Observable<String> {
return Observable.create { observer in
print("start job a")
sleep(3)
observer.onNext(input)
print("complete job a")
observer.onCompleted()
return Disposables.create()
}.subscribeOn(scheduler)
}
func handleJobB(input: String) -> Observable<String> {
return Observable.create { observer in
print("start job b")
sleep(3)
observer.onNext(input)
print("complete job b")
observer.onCompleted()
return Disposables.create()
return Disposables.create()
}.subscribeOn(scheduler)
}
let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
}
let service = Service()
_ = Observable.from(["hello","world","swift"])
.flatMap { service.handleJobA(input: [=10=]) }
.subscribe(onNext:{
print("result " + [=10=])
})
_ = Observable.from(["hello","world","swift"])
.flatMap { service.handleJobB(input: [=10=]) }
.subscribe(onNext:{
print("result " + [=10=])
})
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
您可以分享您的观察结果。而且一次只会执行一个,即使会从多个地方调用:
func handleJobA(input: String) -> Observable<ResultA> {
return networkService
.computeA(input)
.map { [=10=].a }
.share() // Add .share() and this observable will be shared if called multiple times
}
给定这样的服务class:
class Service {
let networkService = NetworkService()
func handleJobA(input: String) -> Observable<ResultA> {
return networkService
.computeA(input)
.map { [=11=].a }
}
}
当我像这样从调用方使用它时:
let service = Service()
Observable
.from(["Hello", "World"])
.flatMap {
service.handleJobA([=12=])
}
.subscribe()
然后这将同时向 service
发送多个请求。我想让流等到每个请求完成。这是可以使用 merge
运算符实现的。
Observable
.from(["Hello", "World"])
.flatMap {
Observable.just(
service.handleJobA([=13=])
)
}
.merge(maxConcurrent: 1)
.subscribe()
到目前为止一切顺利 - 该服务不会同时执行多项 handleJobA
任务。
然而,并发是一个服务细节,调用者不应该关心它。事实上,该服务在稍后阶段可能会决定允许不同的并发值。
其次,当我添加一个新方法handleJobB
时,它不能与作业A同时处于活动状态,反之亦然。
所以我的问题是:
- 如何将 maxConcurrency 限制为 handleJobA observable 作为实现细节?
- 哪个 RxSwift 模式允许对任何服务方法进行限制?
也许你想要 concat
运算符,我在下面写了一些测试代码,看看你是否想要这个:
func sleepAndPrint(label:String) -> Observable<String> {
return Observable.create { obser -> Disposable in
DispatchQueue.global().async {
sleep(3)
print("\(label) come")
obser.onNext(label)
obser.onCompleted()
}
return Disposables.create()
}
}
Observable.from(["hello","world","swift"])
// we need observable of observable sequences so just use map
// Observable<Observable<String>> in this case
.map{
sleepAndPrint(label: [=10=])
}
// Concatenates all inner observable sequences, as long as the previous observable sequence terminated successfully.
.concat()
.subscribe(onNext:{
print("subscribe: " + [=10=])
})
.addDisposableTo(disposeBag)
prints :
hello come
subscribe: hello
world come
subscribe: world
swift come
subscribe: swift
您需要一个专用于该服务的串行调度程序。这是一个可以粘贴到 playground 的示例:
/// playground
import RxSwift
class Service {
func handleJobA(input: String) -> Observable<String> {
return Observable.create { observer in
print("start job a")
sleep(3)
observer.onNext(input)
print("complete job a")
observer.onCompleted()
return Disposables.create()
}.subscribeOn(scheduler)
}
func handleJobB(input: String) -> Observable<String> {
return Observable.create { observer in
print("start job b")
sleep(3)
observer.onNext(input)
print("complete job b")
observer.onCompleted()
return Disposables.create()
return Disposables.create()
}.subscribeOn(scheduler)
}
let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
}
let service = Service()
_ = Observable.from(["hello","world","swift"])
.flatMap { service.handleJobA(input: [=10=]) }
.subscribe(onNext:{
print("result " + [=10=])
})
_ = Observable.from(["hello","world","swift"])
.flatMap { service.handleJobB(input: [=10=]) }
.subscribe(onNext:{
print("result " + [=10=])
})
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
您可以分享您的观察结果。而且一次只会执行一个,即使会从多个地方调用:
func handleJobA(input: String) -> Observable<ResultA> {
return networkService
.computeA(input)
.map { [=10=].a }
.share() // Add .share() and this observable will be shared if called multiple times
}