RxSwift 在 Scheduler 上调用一个简单的函数

RxSwift call a simple function on a Scheduler

我有一个简单的非反应性函数,我想在 SerialDispatchQueueScheduler 上 运行。该方法没有 return 任何东西,我也不需要它发出任何事件,但它必须在调度程序 运行 上

func deleteDataOf(_ personId: Int, on scheduler: SerialDispatchQueueScheduler) {
    //Run the method body on scheduler
    try? removeItem(at: getDirectoryURL(personId: personId))
}

可能有更好的方法,但您可以这样做:

func runOnScheduler<I>(
  scheduler: SerialDispatchQueueScheduler,
  f: @escaping (I) -> Void,
  param: I
) {
  Observable.just(param)
    .observeOn(scheduler)
    .subscribe(onNext: f)
}

func deleteDataOf(_ personId: Int) {
  print(Thread.isMainThread)
}

runOnScheduler(
  scheduler: SerialDispatchQueueScheduler(qos: .background),
  f: deleteDataOf,
  param: 1
)

// false

最简单的解决方案是将副作用放在 .do(onNext:).subscribe(onNext:) 运算符中,该运算符在相关调度程序上调用。这样它就是可观察链的一部分。

下一个最简单的方法是在调度程序上安排操作:

func deleteDataOf(_ personId: Int, on scheduler: SchedulerType) -> Disposable {
    scheduler.schedule(()) {
        try? removeItem(at: getDirectoryURL(personId: personId))
        return Disposables.create()
    }
}

请注意,它 returns 是一次性的,可以取消。

我注意到你的副作用抛出,这对我来说意味着你可能对错误感兴趣。也许 Completable 会是一个更好的主意,但是你必须订阅结果。

像这样:

func deleteDataOf(_ personId: Int) -> Completable {
    Completable.deferred {
        do {
            try removeItem(at: getDirectoryURL(personId: personId))
            return .empty()
        }
        catch {
            return .error(error)
        }
    }
}

func deleteDataOf(_ personId: Int, on scheduler: SchedulerType) {
    _ = deleteDataOf(personId)
        .subscribeOn(scheduler)
        .subscribe(onError: { print("there was an error", [=11=]) })
}