RxSwift subscribeOn 和 observeOn 不在预期的后台线程上
RxSwift subscribeOn and observeOn not on the expected background Thread
我有以下调度程序:
let scheduler = ConcurrentDispatchQueueScheduler(queue: .global(qos: .background))
尝试过:observeOn(scheduler)
还有:subscribeOn(scheduler)
由于调度程序的原因,我预计来自订阅的回调将在某些后台线程上执行。
func signIn(withExternalUserID userID: UserID, authenticationToken: String) -> Single<QBUUser> {
let authenticationToken = "Bearer \(authenticationToken)"
return .create { observable in
let request = QBRequest.logIn(
withUserLogin: "\(userID)",
password: authenticationToken,
successBlock: { _, user in
user.password = QBSession.current.sessionDetails?.token
observable(.success(user)) // <- Here we are on the main thread
}, errorBlock: { [unowned self] in
observable(.error(ChatError.signInError(self.resolve(errorResponse: [=11=])))) // <- same here: main thread
}
)
return Disposables.create {
request.cancel()
}
}
}
因为内部的回调API是在主线程执行的,所以我所有的subcribes而不是后台线程也是在主线程执行
tokenRequest
.flatMap { [unowned self] token -> Single<QBUUser> in
return self.chatService.signIn(withExternalUserID: currentUser.id, authenticationToken: token)
}
.flatMap { [unowned self] user -> Single<QBUUser> in
return self.chatService.connect(user: user).andThen(Single.just(user))
}
.observeOn(self.scheduler)
.subscribe(onNext: { [unowned self] user in
self.chatStorage.set(currentQBUser: user, currentUser: currentUser)
completable(.completed)
self.loginSempahore.signal()
}, onError: { [unowned self] error in
self.chatStorage.unsetCurrentQBUser()
completable(.error(error))
self.loginSempahore.signal()
})
.disposed(by: self.disposeBag)
从 flatMap 强制所有回调并由后台线程执行订阅的正确方法是什么?
无法强制函数在特定线程上调用其回调,除非库为您提供了这样做的选项。您的 successBlock
和 errorBlock
闭包将在 QBRequest.logIn
想要调用它们的任何线程上调用,对此您无能为力。
也就是说,.observeOn(_:)
运算符会将执行转移到不同的线程,因此在您的最后一个代码示例中,您的 onNext
和 onError
闭包 将 在 self.scheduler
线程上执行。
我有以下调度程序:
let scheduler = ConcurrentDispatchQueueScheduler(queue: .global(qos: .background))
尝试过:observeOn(scheduler)
还有:subscribeOn(scheduler)
由于调度程序的原因,我预计来自订阅的回调将在某些后台线程上执行。
func signIn(withExternalUserID userID: UserID, authenticationToken: String) -> Single<QBUUser> {
let authenticationToken = "Bearer \(authenticationToken)"
return .create { observable in
let request = QBRequest.logIn(
withUserLogin: "\(userID)",
password: authenticationToken,
successBlock: { _, user in
user.password = QBSession.current.sessionDetails?.token
observable(.success(user)) // <- Here we are on the main thread
}, errorBlock: { [unowned self] in
observable(.error(ChatError.signInError(self.resolve(errorResponse: [=11=])))) // <- same here: main thread
}
)
return Disposables.create {
request.cancel()
}
}
}
因为内部的回调API是在主线程执行的,所以我所有的subcribes而不是后台线程也是在主线程执行
tokenRequest
.flatMap { [unowned self] token -> Single<QBUUser> in
return self.chatService.signIn(withExternalUserID: currentUser.id, authenticationToken: token)
}
.flatMap { [unowned self] user -> Single<QBUUser> in
return self.chatService.connect(user: user).andThen(Single.just(user))
}
.observeOn(self.scheduler)
.subscribe(onNext: { [unowned self] user in
self.chatStorage.set(currentQBUser: user, currentUser: currentUser)
completable(.completed)
self.loginSempahore.signal()
}, onError: { [unowned self] error in
self.chatStorage.unsetCurrentQBUser()
completable(.error(error))
self.loginSempahore.signal()
})
.disposed(by: self.disposeBag)
从 flatMap 强制所有回调并由后台线程执行订阅的正确方法是什么?
无法强制函数在特定线程上调用其回调,除非库为您提供了这样做的选项。您的 successBlock
和 errorBlock
闭包将在 QBRequest.logIn
想要调用它们的任何线程上调用,对此您无能为力。
也就是说,.observeOn(_:)
运算符会将执行转移到不同的线程,因此在您的最后一个代码示例中,您的 onNext
和 onError
闭包 将 在 self.scheduler
线程上执行。