确保订户获得更新的顺序
Ensure order that subscribers get updated
有没有办法确保订阅者更新的顺序得到保证?
我有一个 hot observable,我的第一个订阅者做了一些同步工作来更新一个变量,然后我的下一个订阅者必须初始化一个服务(只有一次!),并且只有在确保设置该变量之后!
看起来像这样:
import App from './App'
var appSource = App.init() // gets the hot observable
// our second subscriber
appSource.take(1).subscribe(() => {
// take 1 to only run this once
nextService.init()
})
其中 App.init
看起来像这样:
...
init() {
var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes
// first subscriber, updates the `myVar` every few minutes
source.subscribe((data) => this.myVar = data)
return source
}
...
这目前有效,但我不确定它是否会始终 100% 遵循顺序。
编辑:
据我所知,订阅者将被调用 FIFO。所以顺序有点放心。
我不知道 RxJS 是否曾明确保证观察者按订阅顺序被调用。但是,正如你所说,它通常有效。
但是,您可能会考虑对实际工作流程进行建模,而不是依赖隐式观察者顺序。
听起来您需要知道您的应用程序何时初始化,以便您可以采取进一步的行动。 App
可以不依赖于 App.init
的内部工作知识,而是为此暴露一个 API:
一种(非 Rx 方式)是让调用者向 init
:
提供回调
//...
init(callback) {
var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes
// first subscriber, updates the `myVar` every few minutes
source.subscribe((data) => {
this.myVar = data;
if (callback) {
callback();
callback = undefined;
}
})
return source
}
// elsewhere
App.init(() => nextService.init());
另一个替代回调的选项是 init
return 一个 Promise
你的决定(或者一个 Rx.AsyncSubject
你的信号)一旦初始化完成.
还有另一种选择,但需要进行一些重构,就是将 this.myVar
建模为可观察数据。即:
init() {
this.myVar = this.createObservable().replay(1);
this.myVar.connect();
// returns an observable that signals when we are initialized
return this.myVar.first();
}
// elsewhere, you end up with this pattern...
const servicesToInit = [ App, service1, service2, service3 ];
Observable
.of(servicesToInit)
.concatMap(s => Rx.Observable.defer(() => s.init()))
.toArray()
.subscribe(results => {
// all initializations complete
// results is an array containing the value returned by each service's init observable
});
现在,任何想要使用 myVar
的东西都需要以某种方式订阅它以获得当前的 and/or 未来值。他们永远不可能只是同步地询问当前值。
有没有办法确保订阅者更新的顺序得到保证?
我有一个 hot observable,我的第一个订阅者做了一些同步工作来更新一个变量,然后我的下一个订阅者必须初始化一个服务(只有一次!),并且只有在确保设置该变量之后!
看起来像这样:
import App from './App'
var appSource = App.init() // gets the hot observable
// our second subscriber
appSource.take(1).subscribe(() => {
// take 1 to only run this once
nextService.init()
})
其中 App.init
看起来像这样:
...
init() {
var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes
// first subscriber, updates the `myVar` every few minutes
source.subscribe((data) => this.myVar = data)
return source
}
...
这目前有效,但我不确定它是否会始终 100% 遵循顺序。
编辑:
据我所知,订阅者将被调用 FIFO。所以顺序有点放心。
我不知道 RxJS 是否曾明确保证观察者按订阅顺序被调用。但是,正如你所说,它通常有效。
但是,您可能会考虑对实际工作流程进行建模,而不是依赖隐式观察者顺序。
听起来您需要知道您的应用程序何时初始化,以便您可以采取进一步的行动。 App
可以不依赖于 App.init
的内部工作知识,而是为此暴露一个 API:
一种(非 Rx 方式)是让调用者向 init
:
//...
init(callback) {
var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes
// first subscriber, updates the `myVar` every few minutes
source.subscribe((data) => {
this.myVar = data;
if (callback) {
callback();
callback = undefined;
}
})
return source
}
// elsewhere
App.init(() => nextService.init());
另一个替代回调的选项是 init
return 一个 Promise
你的决定(或者一个 Rx.AsyncSubject
你的信号)一旦初始化完成.
还有另一种选择,但需要进行一些重构,就是将 this.myVar
建模为可观察数据。即:
init() {
this.myVar = this.createObservable().replay(1);
this.myVar.connect();
// returns an observable that signals when we are initialized
return this.myVar.first();
}
// elsewhere, you end up with this pattern...
const servicesToInit = [ App, service1, service2, service3 ];
Observable
.of(servicesToInit)
.concatMap(s => Rx.Observable.defer(() => s.init()))
.toArray()
.subscribe(results => {
// all initializations complete
// results is an array containing the value returned by each service's init observable
});
现在,任何想要使用 myVar
的东西都需要以某种方式订阅它以获得当前的 and/or 未来值。他们永远不可能只是同步地询问当前值。