我如何启动特定队列上的可观察链?

How do I kick off a chain of observable on a particular queue?

我的理解是 SubscribeOn 告诉 Rx 序列应该从哪个队列开始,而 observeOn 会切换调度程序。

好像不是这样,看来我缺乏一些基本的了解。

请考虑以下示例。

override func viewDidLoad() {
        super.viewDidLoad()
        // Do any additional setup after loading the view.
        operation1().observeOn(realmReadScheduler)
            .observeOn(realmWriteScheduler)
            .flatMap(self.operation2)
            .observeOn(realmReadScheduler)
            .flatMap(operation3)
            .observeOn(realmSignalScheduler)
            .flatMap(operation4)
            .observeOn(realmConvertScheduler)
            .flatMap(operation5).subscribeOn(realmConvertScheduler)
            .subscribe(onNext: {success in
                print(success)
            })


    }

打印出:

> Operation 1 com.apple.main-thread Operation 2 com.jci.xaap.realm.write
> Operation 3 com.jci.xaap.realm.read Operation 4
> rxswift.queue.DispatchQoS(qosClass:
> Dispatch.DispatchQoS.QoSClass.background, relativePriority: 0)
> Operation 5 com.jci.xaap.realm.convert

我原以为 Operation1 会在 'realmConvertScheduler' 而不是主线程上执行..

如何确保链中的第一个可观察对象在我想要的队列上执行?

我怀疑你的 operaton1() 写得不正确。请注意,该函数是在主线程上调用的。 Observable 所做的工作是在 subscribeOn 告诉它使用的任何线程上执行的(或者如果不存在 subscribeOn 则调用 subscribe 的线程。

这是一个例子:

let schedulerA = ConcurrentDispatchQueueScheduler(queue: DispatchQueue(label: "schedulerA"))

func operation1() -> Observable<Void> {
    return Observable.create { observer in
        print("do your work on:", Thread.current)
        observer.onNext(())
        observer.onCompleted()
        return Disposables.create()
    }
}

operation1()
    .observeOn(MainScheduler.instance)
    .subscribeOn(schedulerA)
    .subscribe(onNext: {
        print("subscribe(onNext:) on:", Thread.current)
    })

另一种选择是这样写:

func operation1() -> Observable<Void> {
    return Observable.deferred {
        print("do your work on:", Thread.current)
        return .just(())
    }
}