RxSwift subscribeOn 和 observeOn 不在预期的后台线程上

RxSwift subscribeOn and observeOn not on the expected background Thread

我有以下调度程序:

let scheduler = ConcurrentDispatchQueueScheduler(queue: .global(qos: .background))

尝试过:observeOn(scheduler) 还有:subscribeOn(scheduler)

由于调度程序的原因,我预计来自订阅的回调将在某些后台线程上执行。

func signIn(withExternalUserID userID: UserID, authenticationToken: String) -> Single<QBUUser> {
        let authenticationToken = "Bearer \(authenticationToken)"
        return .create { observable in
            let request = QBRequest.logIn(
                withUserLogin: "\(userID)",
                password: authenticationToken,
                successBlock: { _, user in
                    user.password = QBSession.current.sessionDetails?.token
                    observable(.success(user)) // <- Here we are on the main thread
                }, errorBlock: { [unowned self] in
                    observable(.error(ChatError.signInError(self.resolve(errorResponse: [=11=]))))  // <- same here: main thread
                }
            )
            return Disposables.create {
                request.cancel()
            }
        }
    }

因为内部的回调API是在主线程执行的,所以我所有的subcribes而不是后台线程也是在主线程执行

tokenRequest
      .flatMap { [unowned self] token -> Single<QBUUser> in
           return self.chatService.signIn(withExternalUserID: currentUser.id, authenticationToken: token)
      }
      .flatMap { [unowned self] user -> Single<QBUUser> in
           return self.chatService.connect(user: user).andThen(Single.just(user))
      }
      .observeOn(self.scheduler)
      .subscribe(onNext: { [unowned self] user in
            self.chatStorage.set(currentQBUser: user, currentUser: currentUser)
            completable(.completed)
            self.loginSempahore.signal()
       }, onError: { [unowned self] error in
            self.chatStorage.unsetCurrentQBUser()
            completable(.error(error))
            self.loginSempahore.signal()
       })
       .disposed(by: self.disposeBag)

从 flatMap 强制所有回调并由后台线程执行订阅的正确方法是什么?

无法强制函数在特定线程上调用其回调,除非库为您提供了这样做的选项。您的 successBlockerrorBlock 闭包将在 QBRequest.logIn 想要调用它们的任何线程上调用,对此您无能为力。

也就是说,.observeOn(_:) 运算符会将执行转移到不同的线程,因此在您的最后一个代码示例中,您的 onNextonError 闭包 self.scheduler 线程上执行。