RxSwift:如何 运行 多个 Observables 并行

RxSwift: How to Run Multiple Observables parallelly

我是秦娜。我是 rxswift 的新手。我有一个任务,就像我有一些函数一样,就像可观察的一样。目前这些所有的可观察对象都是顺序执行的。因此,我的应用程序性能正在下降。我想并行执行所有这些功能。这些功能彼此不依赖。我试过使用 zip 函数,但这些是 运行 顺序。

    return Observable.create({ (observer) -> Disposable in
_ = Observable.zip(
self.createOrAlterTable(call).subscribeOn(ConcurrentMainScheduler.instance),
self.formSchemaToSQLite(call,lastSyncTime: lastSyncTime).subscribeOn(ConcurrentMainScheduler.instance),
self.menuTableRecord(call, lastSyncTime: lastSyncTime).subscribeOn(ConcurrentMainScheduler.instance),
self.recordsTOSqlite(call,lastSyncTime: lastSyncTime).subscribeOn(ConcurrentMainScheduler.instance),
self.imagerecordsTOSqlite(call,lastSyncTime: lastSyncTime).subscribeOn(ConcurrentMainScheduler.instance),
self.syncGraphDBNode(call).subscribeOn(ConcurrentMainScheduler.instance),
self.syncSchemasToSQLServer(call).subscribeOn(ConcurrentMainScheduler.instance),
self.syncRecordsToSQLServer(call).subscribeOn(ConcurrentMainScheduler.instance)
).observeOn(MainScheduler.asyncInstance)
.subscribe(onNext: {
xdata,ydata,adata,bdata,cdata,ddata,edata,fdata in
print("All Operations Done");
})
return Disposables.create()

以上所有函数如下所示

func syncSchemasToSQLServer(_ call: CAPPluginCall) -> Observable<String>{
    return Observable.just("SYNC_SCHEMAS_TO_SQLSERVER")
}

但这些函数仍然按顺序执行。请帮我解决这个问题。

已更新

    func example() -> Observable<String>{
      var res=  Observable.zip(
            doJob(1).subscribeOn(ConcurrentDispatchQueueScheduler(qos: .default)),
            doJob(2).subscribeOn(ConcurrentDispatchQueueScheduler(qos: .default)),
            doJob(3).subscribeOn(ConcurrentDispatchQueueScheduler(qos: .default))
        )
        .debug("done")
        .subscribe()

   return Observable.just("Success");
    }
    
    func doJob(_ value: Int) -> Observable<Void> {
        return Observable.create { observer in
            print("starting job", value)
            sleep(3)
            print("done with job", value)
            observer.onNext(())
            observer.onCompleted()
            return Disposables.create()
        }
    }

ConcurrentMainScheduler.instance 不会并行处理事情。请改用 ConcurrentDispatchQueueScheduler。以下代码有效:

func example() {
    doAllJobs()
        .subscribe(onNext: { print([=10=]) })
}

func doAllJobs() -> Observable<String>{
    Observable.zip(
        doJob(1).subscribeOn(ConcurrentDispatchQueueScheduler(qos: .default)),
        doJob(2).subscribeOn(ConcurrentDispatchQueueScheduler(qos: .default)),
        doJob(3).subscribeOn(ConcurrentDispatchQueueScheduler(qos: .default))
    )
    .map { _ in "Success" }
}

func doJob(_ value: Int) -> Observable<Void> {
    Observable.create { observer in
        print("starting job", value)
        sleep(3)
        print("done with job", value)
        observer.onNext(())
        observer.onCompleted()
        return Disposables.create()
    }
}

这也行得通:

func doAllJobs() -> Observable<String> {
    let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)
    return Observable.zip(
        doJob(1).subscribeOn(scheduler),
        doJob(2).subscribeOn(scheduler),
        doJob(3).subscribeOn(scheduler)
    )
    .map { _ in "Success" }
}