使用 RxSwift 限制对服务 class 的并发访问

Limiting concurrent access to a service class with RxSwift

给定这样的服务class:

class Service {
    let networkService = NetworkService()

    func handleJobA(input: String) -> Observable<ResultA> {
        return networkService
            .computeA(input)
            .map { [=11=].a }
    }
}

当我像这样从调用方使用它时:

let service = Service()

Observable
    .from(["Hello", "World"])
    .flatMap {
        service.handleJobA([=12=])
    }
    .subscribe()

然后这将同时向 service 发送多个请求。我想让流等到每个请求完成。这是可以使用 merge 运算符实现的。

Observable
    .from(["Hello", "World"])
    .flatMap {
        Observable.just(
            service.handleJobA([=13=])
        )
    }
    .merge(maxConcurrent: 1)
    .subscribe()

到目前为止一切顺利 - 该服务不会同时执行多项 handleJobA 任务。

然而,并发是一个服务细节,调用者不应该关心它。事实上,该服务在稍后阶段可能会决定允许不同的并发值。

其次,当我添加一个新方法handleJobB时,它不能与作业A同时处于活动状态,反之亦然。

所以我的问题是:

  1. 如何将 maxConcurrency 限制为 handleJobA observable 作为实现细节?
  2. 哪个 RxSwift 模式允许对任何服务方法进行限制?

也许你想要 concat 运算符,我在下面写了一些测试代码,看看你是否想要这个:

 func sleepAndPrint(label:String) -> Observable<String> {


        return Observable.create { obser -> Disposable in
            DispatchQueue.global().async {
                sleep(3)
                print("\(label) come")
                obser.onNext(label)
                obser.onCompleted()
            }

            return Disposables.create()
        }
    }


Observable.from(["hello","world","swift"])
    // we need observable of observable sequences so just use map
    // Observable<Observable<String>> in this case
    .map{
        sleepAndPrint(label: [=10=])
    }
    // Concatenates all inner observable sequences, as long as the previous observable sequence terminated successfully.
    .concat()

    .subscribe(onNext:{
        print("subscribe: " + [=10=])
    })
    .addDisposableTo(disposeBag)

    prints : 
      hello come
      subscribe: hello
      world come
      subscribe: world
      swift come
      subscribe: swift

您需要一个专用于该服务的串行调度程序。这是一个可以粘贴到 playground 的示例:

/// playground

import RxSwift

class Service {

    func handleJobA(input: String) -> Observable<String> {

        return Observable.create { observer in
            print("start job a")
            sleep(3)
            observer.onNext(input)
            print("complete job a")
            observer.onCompleted()
            return Disposables.create()
        }.subscribeOn(scheduler)
    }

    func handleJobB(input: String) -> Observable<String> {
        return Observable.create { observer in
            print("start job b")
            sleep(3)
            observer.onNext(input)
            print("complete job b")
            observer.onCompleted()
            return Disposables.create()
            return Disposables.create()
        }.subscribeOn(scheduler)
    }

    let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
}


let service = Service()

_ = Observable.from(["hello","world","swift"])
    .flatMap { service.handleJobA(input: [=10=]) }
    .subscribe(onNext:{
        print("result " + [=10=])
    })

_ = Observable.from(["hello","world","swift"])
    .flatMap { service.handleJobB(input: [=10=]) }
    .subscribe(onNext:{
        print("result " + [=10=])
    })

import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

您可以分享您的观察结果。而且一次只会执行一个,即使会从多个地方调用:

    func handleJobA(input: String) -> Observable<ResultA> {
        return networkService
            .computeA(input)
            .map { [=10=].a }
            .share() // Add .share() and this observable will be shared if called multiple times
    }