确保订户获得更新的顺序

Ensure order that subscribers get updated

有没有办法确保订阅者更新的顺序得到保证?

我有一个 hot observable,我的第一个订阅者做了一些同步工作来更新一个变量,然后我的下一个订阅者必须初始化一个服务(只有一次!),并且只有在确保设置该变量之后!

看起来像这样:

import App from './App'

var appSource = App.init() // gets the hot observable

// our second subscriber
appSource.take(1).subscribe(() => {
  // take 1 to only run this once
  nextService.init()
})

其中 App.init 看起来像这样:

...
init() {
  var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes

  // first subscriber, updates the `myVar` every few minutes
  source.subscribe((data) => this.myVar = data)

  return source
}
...

这目前有效,但我不确定它是否会始终 100% 遵循顺序。

编辑:

据我所知,订阅者将被调用 FIFO。所以顺序有点放心。

我不知道 RxJS 是否曾明确保证观察者按订阅顺序被调用。但是,正如你所说,它通常有效。

但是,您可能会考虑对实际工作流程进行建模,而不是依赖隐式观察者顺序。

听起来您需要知道您的应用程序何时初始化,以便您可以采取进一步的行动。 App 可以不依赖于 App.init 的内部工作知识,而是为此暴露一个 API:

一种(非 Rx 方式)是让调用者向 init:

提供回调
//...
init(callback) {
  var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes

  // first subscriber, updates the `myVar` every few minutes
  source.subscribe((data) => {
    this.myVar = data;
    if (callback) {
        callback();
        callback = undefined;
    }
  })

  return source
}

// elsewhere
App.init(() => nextService.init());

另一个替代回调的选项是 init return 一个 Promise 你的决定(或者一个 Rx.AsyncSubject 你的信号)一旦初始化完成.

还有另一种选择,但需要进行一些重构,就是将 this.myVar 建模为可观察数据。即:

init() {
    this.myVar = this.createObservable().replay(1);
    this.myVar.connect();
    // returns an observable that signals when we are initialized
    return this.myVar.first();
}

// elsewhere, you end up with this pattern...
const servicesToInit = [ App, service1, service2, service3 ];
Observable
    .of(servicesToInit)
    .concatMap(s => Rx.Observable.defer(() => s.init()))
    .toArray()
    .subscribe(results => {
        // all initializations complete
        // results is an array containing the value returned by each service's init observable
    });

现在,任何想要使用 myVar 的东西都需要以某种方式订阅它以获得当前的 and/or 未来值。他们永远不可能只是同步地询问当前值。