处置(取消)可观察的。 SubscribeOn 和 observeOn 不同的调度器
Dispose (cancel) observable. SubscribeOn and observeOn different schedulers
修改后的问题
我修改了我的问题。对于一般情况。
我想在后台线程中用 RxSwift 生成项目(从磁盘加载,long-运行 计算等),并在 MainThread 中观察项目。而且我想确保在处理后(从主线程)不会交付任何项目。
根据文档 (https://github.com/ReactiveX/RxSwift/blob/master/Documentation/GettingStarted.md#disposing):
So can this code print something after the dispose call is executed? The answer is: it depends.
If the scheduler is a serial scheduler (ex. MainScheduler) and dispose is called on the same serial scheduler, the answer is no.
Otherwise it is yes.
但是如果使用不同调度程序的 subscribeOn 和 observerOn - 我们不能保证在 dispose 之后不会发射任何东西(手动或通过 dispose 包,这无关紧要)。
我应该如何在后台生成项目(例如图像)并确保在处置后不会使用结果?
我在实际项目中做了解决方法,但我想解决这个问题并了解在相同情况下我们应该如何避免它。
在我的测试项目中,我使用了小周期 - 它们完美地展示了问题!
import RxSwift
class TestClass {
private var disposeBag = DisposeBag()
private var isCancelled = false
init(cancelAfter: TimeInterval, longRunningTaskDuration: TimeInterval) {
assert(Thread.isMainThread)
load(longRunningTaskDuration: longRunningTaskDuration)
DispatchQueue.main.asyncAfter(deadline: .now() + cancelAfter) { [weak self] in
self?.cancel()
}
}
private func load(longRunningTaskDuration: TimeInterval) {
assert(Thread.isMainThread)
// We set task not cancelled
isCancelled = false
DataService
.shared
.longRunngingTaskEmulation(sleepFor: longRunningTaskDuration)
// We want long running task to be executed in background thread
.subscribeOn(ConcurrentDispatchQueueScheduler.init(queue: .global()))
// We want to process result in Main thread
.observeOn(MainScheduler.instance)
.subscribe(onSuccess: { [weak self] (result) in
assert(Thread.isMainThread)
guard let strongSelf = self else {
return
}
if !strongSelf.isCancelled {
print("Should not be called! Task is cancelled!")
} else {
// Do something with result, set image to UIImageView, for instance
// But if task was cancelled, this method will set invalid (old) data
print(result)
}
}, onError: nil)
.disposed(by: disposeBag)
}
// Cancel all tasks. Can be called in PreapreForReuse.
private func cancel() {
assert(Thread.isMainThread)
// For test purposes. After cancel, old task should not make any changes.
isCancelled = true
// Cancel all tasks by creating new DisposeBag (and disposing old)
disposeBag = DisposeBag()
}
}
class DataService {
static let shared = DataService()
private init() { }
func longRunngingTaskEmulation(sleepFor: TimeInterval) -> Single<String> {
return Single
.deferred {
assert(!Thread.isMainThread)
// Enulate long running task
Thread.sleep(forTimeInterval: sleepFor)
// Return dummy result for test purposes.
return .just("Success")
}
}
}
class MainClass {
static let shared = MainClass()
private init() { }
func main() {
Timer.scheduledTimer(withTimeInterval: 0.150, repeats: true) { [weak self] (_) in
assert(Thread.isMainThread)
let longRunningTaskDuration: TimeInterval = 0.050
let offset = TimeInterval(arc4random_uniform(20)) / 1000.0
let cancelAfter = 0.040 + offset
self?.executeTest(cancelAfter: cancelAfter, longRunningTaskDuration: longRunningTaskDuration)
}
}
var items: [TestClass] = []
func executeTest(cancelAfter: TimeInterval, longRunningTaskDuration: TimeInterval) {
let item = TestClass(cancelAfter: cancelAfter, longRunningTaskDuration: longRunningTaskDuration)
items.append(item)
}
}
在某处调用 MainClass.shared.main() 开始。
我们调用方法加载一些数据,然后我们调用取消(全部来自主线程)。取消后我们有时会收到结果(也在主线程中),但它已经很旧了。
在实际项目中TestClass是UITableViewCell的子类,在prepareForReuse中调用了cancel方法。然后单元格被重用并将新数据设置到单元格。后来我们得到了旧任务的结果。并将旧图像设置为单元格!
原始问题(旧):
我想在 iOS 中使用 RxSwift 加载图像。我想在后台加载图像,并在主线程中使用它。所以我 subscribeOn 后台线程,并 observeOn 主线程。函数将如下所示:
func getImage(path: String) -> Single<UIImage> {
return Single
.deferred {
if let image = UIImage(contentsOfFile: path) {
return Single.just(image)
} else {
return Single.error(SimpleError())
}
}
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.observeOn(MainScheduler.instance)
}
但是我在取消时遇到了问题。因为使用不同的调度程序来创建项目和调用 dispose(从主线程处理),所以可以在调用 dispose 后引发订阅事件。因此,在我使用 UITableViewCell 的情况下,我收到无效(旧)图像。
如果我在观察(主线程)的同一调度程序中创建项目(加载图像),一切正常!
但我想在后台加载图像,我希望它在处理后被取消(在 prepareForReuse 方法或新路径设置方法中)。这个的通用模板是什么?
编辑:
我已经创建了一个测试项目,我可以在其中模拟处理后收到事件时的问题。
我有一个简单有效的解决方案。我们应该在同一个调度程序中发出项目。所以我们应该捕获调度程序并在那里发出项目(在长时间 运行 任务完成后)。
func getImage2(path: String) -> Single<UIImage> {
return Single
.create(subscribe: { (single) -> Disposable in
// We captrure current queue to execute callback in
// TODO: It can be nil if called from background thread
let callbackQueue = OperationQueue.current
// For async calculations
OperationQueue().addOperation {
// Perform any long-running task
let image = UIImage(contentsOfFile: path)
// Emit item in captured queue
callbackQueue?.addOperation {
if let result = image {
single(.success(result))
} else {
single(.error(SimpleError()))
}
}
}
return Disposables.create()
})
.observeOn(MainScheduler.instance)
}
但不是Rx方式。而且我认为这不是最好的解决方案。
可能我应该使用 CurrentThreadScheduler 来发出项目,但我不明白如何。是否有任何使用调度程序生成项目的教程或示例?我没找到。
有趣的测试用例。有个小bug,应该是if strongSelf.isCancelled
而不是if !strongSelf.isCancelled
。除此之外,测试用例显示了问题。
我直觉地希望在发出之前检查是否已经发生处置,如果它发生在同一个线程上。
我还发现了这个:
just to make this clear, if you call dispose on one thread (like
main), you won't observe any elements on that same thread. That is a
guarantee.
看这里:https://github.com/ReactiveX/RxSwift/issues/38
所以这可能是一个错误。
为了确保我在这里打开了一个问题:
https://github.com/ReactiveX/RxSwift/issues/1778
更新
看来这实际上是一个错误。同时,RxSwift 的优秀人员已经确认并幸运地很快修复了它。请参阅上面的问题 link。
测试
错误已通过提交 bac86346087c7e267dd5a620eed90a7849fd54ff
修复。所以如果你使用的是 CocoaPods,你可以简单地使用类似下面的东西进行测试:
target 'RxSelfContained' do
use_frameworks!
pod 'RxAtomic', :git => 'https://github.com/ReactiveX/RxSwift.git', :commit => 'bac86346087c7e267dd5a620eed90a7849fd54ff'
pod 'RxSwift', :git => 'https://github.com/ReactiveX/RxSwift.git', :commit => 'bac86346087c7e267dd5a620eed90a7849fd54ff'
end
修改后的问题
我修改了我的问题。对于一般情况。
我想在后台线程中用 RxSwift 生成项目(从磁盘加载,long-运行 计算等),并在 MainThread 中观察项目。而且我想确保在处理后(从主线程)不会交付任何项目。
根据文档 (https://github.com/ReactiveX/RxSwift/blob/master/Documentation/GettingStarted.md#disposing):
So can this code print something after the dispose call is executed? The answer is: it depends.
If the scheduler is a serial scheduler (ex. MainScheduler) and dispose is called on the same serial scheduler, the answer is no.
Otherwise it is yes.
但是如果使用不同调度程序的 subscribeOn 和 observerOn - 我们不能保证在 dispose 之后不会发射任何东西(手动或通过 dispose 包,这无关紧要)。
我应该如何在后台生成项目(例如图像)并确保在处置后不会使用结果?
我在实际项目中做了解决方法,但我想解决这个问题并了解在相同情况下我们应该如何避免它。
在我的测试项目中,我使用了小周期 - 它们完美地展示了问题!
import RxSwift
class TestClass {
private var disposeBag = DisposeBag()
private var isCancelled = false
init(cancelAfter: TimeInterval, longRunningTaskDuration: TimeInterval) {
assert(Thread.isMainThread)
load(longRunningTaskDuration: longRunningTaskDuration)
DispatchQueue.main.asyncAfter(deadline: .now() + cancelAfter) { [weak self] in
self?.cancel()
}
}
private func load(longRunningTaskDuration: TimeInterval) {
assert(Thread.isMainThread)
// We set task not cancelled
isCancelled = false
DataService
.shared
.longRunngingTaskEmulation(sleepFor: longRunningTaskDuration)
// We want long running task to be executed in background thread
.subscribeOn(ConcurrentDispatchQueueScheduler.init(queue: .global()))
// We want to process result in Main thread
.observeOn(MainScheduler.instance)
.subscribe(onSuccess: { [weak self] (result) in
assert(Thread.isMainThread)
guard let strongSelf = self else {
return
}
if !strongSelf.isCancelled {
print("Should not be called! Task is cancelled!")
} else {
// Do something with result, set image to UIImageView, for instance
// But if task was cancelled, this method will set invalid (old) data
print(result)
}
}, onError: nil)
.disposed(by: disposeBag)
}
// Cancel all tasks. Can be called in PreapreForReuse.
private func cancel() {
assert(Thread.isMainThread)
// For test purposes. After cancel, old task should not make any changes.
isCancelled = true
// Cancel all tasks by creating new DisposeBag (and disposing old)
disposeBag = DisposeBag()
}
}
class DataService {
static let shared = DataService()
private init() { }
func longRunngingTaskEmulation(sleepFor: TimeInterval) -> Single<String> {
return Single
.deferred {
assert(!Thread.isMainThread)
// Enulate long running task
Thread.sleep(forTimeInterval: sleepFor)
// Return dummy result for test purposes.
return .just("Success")
}
}
}
class MainClass {
static let shared = MainClass()
private init() { }
func main() {
Timer.scheduledTimer(withTimeInterval: 0.150, repeats: true) { [weak self] (_) in
assert(Thread.isMainThread)
let longRunningTaskDuration: TimeInterval = 0.050
let offset = TimeInterval(arc4random_uniform(20)) / 1000.0
let cancelAfter = 0.040 + offset
self?.executeTest(cancelAfter: cancelAfter, longRunningTaskDuration: longRunningTaskDuration)
}
}
var items: [TestClass] = []
func executeTest(cancelAfter: TimeInterval, longRunningTaskDuration: TimeInterval) {
let item = TestClass(cancelAfter: cancelAfter, longRunningTaskDuration: longRunningTaskDuration)
items.append(item)
}
}
在某处调用 MainClass.shared.main() 开始。
我们调用方法加载一些数据,然后我们调用取消(全部来自主线程)。取消后我们有时会收到结果(也在主线程中),但它已经很旧了。
在实际项目中TestClass是UITableViewCell的子类,在prepareForReuse中调用了cancel方法。然后单元格被重用并将新数据设置到单元格。后来我们得到了旧任务的结果。并将旧图像设置为单元格!
原始问题(旧):
我想在 iOS 中使用 RxSwift 加载图像。我想在后台加载图像,并在主线程中使用它。所以我 subscribeOn 后台线程,并 observeOn 主线程。函数将如下所示:
func getImage(path: String) -> Single<UIImage> {
return Single
.deferred {
if let image = UIImage(contentsOfFile: path) {
return Single.just(image)
} else {
return Single.error(SimpleError())
}
}
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .background))
.observeOn(MainScheduler.instance)
}
但是我在取消时遇到了问题。因为使用不同的调度程序来创建项目和调用 dispose(从主线程处理),所以可以在调用 dispose 后引发订阅事件。因此,在我使用 UITableViewCell 的情况下,我收到无效(旧)图像。
如果我在观察(主线程)的同一调度程序中创建项目(加载图像),一切正常! 但我想在后台加载图像,我希望它在处理后被取消(在 prepareForReuse 方法或新路径设置方法中)。这个的通用模板是什么?
编辑:
我已经创建了一个测试项目,我可以在其中模拟处理后收到事件时的问题。
我有一个简单有效的解决方案。我们应该在同一个调度程序中发出项目。所以我们应该捕获调度程序并在那里发出项目(在长时间 运行 任务完成后)。
func getImage2(path: String) -> Single<UIImage> {
return Single
.create(subscribe: { (single) -> Disposable in
// We captrure current queue to execute callback in
// TODO: It can be nil if called from background thread
let callbackQueue = OperationQueue.current
// For async calculations
OperationQueue().addOperation {
// Perform any long-running task
let image = UIImage(contentsOfFile: path)
// Emit item in captured queue
callbackQueue?.addOperation {
if let result = image {
single(.success(result))
} else {
single(.error(SimpleError()))
}
}
}
return Disposables.create()
})
.observeOn(MainScheduler.instance)
}
但不是Rx方式。而且我认为这不是最好的解决方案。
可能我应该使用 CurrentThreadScheduler 来发出项目,但我不明白如何。是否有任何使用调度程序生成项目的教程或示例?我没找到。
有趣的测试用例。有个小bug,应该是if strongSelf.isCancelled
而不是if !strongSelf.isCancelled
。除此之外,测试用例显示了问题。
我直觉地希望在发出之前检查是否已经发生处置,如果它发生在同一个线程上。
我还发现了这个:
just to make this clear, if you call dispose on one thread (like main), you won't observe any elements on that same thread. That is a guarantee.
看这里:https://github.com/ReactiveX/RxSwift/issues/38
所以这可能是一个错误。
为了确保我在这里打开了一个问题: https://github.com/ReactiveX/RxSwift/issues/1778
更新
看来这实际上是一个错误。同时,RxSwift 的优秀人员已经确认并幸运地很快修复了它。请参阅上面的问题 link。
测试
错误已通过提交 bac86346087c7e267dd5a620eed90a7849fd54ff
修复。所以如果你使用的是 CocoaPods,你可以简单地使用类似下面的东西进行测试:
target 'RxSelfContained' do
use_frameworks!
pod 'RxAtomic', :git => 'https://github.com/ReactiveX/RxSwift.git', :commit => 'bac86346087c7e267dd5a620eed90a7849fd54ff'
pod 'RxSwift', :git => 'https://github.com/ReactiveX/RxSwift.git', :commit => 'bac86346087c7e267dd5a620eed90a7849fd54ff'
end