如果未完成观察,则共享订阅
Share a subscription if observable not done
我有一个 angular 应用程序,它应该在某些情况下(软件中的某些触发器或用户请求时)与服务器同步一些数据。所以我有这样的功能:
...
public createSyncObservable(): Observable<any> {
return this.retriveDataFromStorage().pipe(
switchMap(
(data) => forkJoin(this.api.sendData1(data.data1),this.api.sendData2(data.data2),this.api.sendData3(data.data3))
),
switchMap(
(data) => this.api.getDataFromServer()
),
switchMap(
(data) => this.updateLocal(data)
)
)
}
我想要的行为是:
- 如果用户(或某些触发器)请求同步并且它已经发生,我不应该再做一次,只需等待当前同步结束并且 return 相同的可观察对象(共享)。
- 如果上次同步已经完成,它应该重新开始(创建一个新的 observable)。
我现在最好的解决方案是做这样的事情(未经测试的代码):
...
public syncData(): Observable<any> {
if (this.observable_complete) {
this.observable_complete = false;
this.syncObservable$ = this.createSyncObservable().pipe(share())
this.syncObservable$.subscribe(
(data) => {this.observable_complete = true}
)
}
return this.syncObservable$;
}
这是要走的路吗?也许我缺少一些在这种情况下可以帮助我的 RxJS 运算符?这个解决方案似乎有点老套...
您应该尝试使用 takeWhile/skipWhile 运算符:
这只是一个例子,但我希望你觉得它有用。
private isResourceFree = true;
public createSyncObservable(): Observable<any> {
return of(isResourceFree).pipe(
takeWhile(val => val),
tap(_ => this.isResourceFree = false),
switchMap(_ => this.retriveDataFromStorage()),
switchMap(
(data) => forkJoin(this.api.sendData1(data.data1),this.api.sendData2(data.data2),this.api.sendData3(data.data3))
),
switchMap(
(data) => this.api.getDataFromServer()
),
switchMap(
(data) => this.updateLocal(data)
),
tap(_ => this.isResourceFree = true)
)
}
如果调用 this.createSyncObservable()
不执行任何实际工作,而只是订阅可观察对象,那么 returns 您只需要调用该函数一次。然后你可以简单地做:
public syncData$ = this.createSyncObservable().pipe(share());
如果没有订阅者,share
将从其来源取消订阅(即当 this.createSyncObservable()
完成时)。因此,this.syncData$
的订阅者将触发对从 this.createSyncObservable()
返回的可观察对象的订阅,如果它完成的话。
// The first subscribe will trigger a subscribe to createSyncObservable()
syncData$.subscribe()
// A second subscribe while the first hasn't completed won't trigger a subscribe
// to createSyncObservable() but instead just wait for its response
syncData$.subscribe()
// After some time ...
// Another subscribe after createSyncObservable() completed will trigger another
// subscribe to createSyncObservable()
syncData$.subscribe()
我有一个 angular 应用程序,它应该在某些情况下(软件中的某些触发器或用户请求时)与服务器同步一些数据。所以我有这样的功能:
...
public createSyncObservable(): Observable<any> {
return this.retriveDataFromStorage().pipe(
switchMap(
(data) => forkJoin(this.api.sendData1(data.data1),this.api.sendData2(data.data2),this.api.sendData3(data.data3))
),
switchMap(
(data) => this.api.getDataFromServer()
),
switchMap(
(data) => this.updateLocal(data)
)
)
}
我想要的行为是:
- 如果用户(或某些触发器)请求同步并且它已经发生,我不应该再做一次,只需等待当前同步结束并且 return 相同的可观察对象(共享)。
- 如果上次同步已经完成,它应该重新开始(创建一个新的 observable)。
我现在最好的解决方案是做这样的事情(未经测试的代码):
...
public syncData(): Observable<any> {
if (this.observable_complete) {
this.observable_complete = false;
this.syncObservable$ = this.createSyncObservable().pipe(share())
this.syncObservable$.subscribe(
(data) => {this.observable_complete = true}
)
}
return this.syncObservable$;
}
这是要走的路吗?也许我缺少一些在这种情况下可以帮助我的 RxJS 运算符?这个解决方案似乎有点老套...
您应该尝试使用 takeWhile/skipWhile 运算符:
这只是一个例子,但我希望你觉得它有用。
private isResourceFree = true;
public createSyncObservable(): Observable<any> {
return of(isResourceFree).pipe(
takeWhile(val => val),
tap(_ => this.isResourceFree = false),
switchMap(_ => this.retriveDataFromStorage()),
switchMap(
(data) => forkJoin(this.api.sendData1(data.data1),this.api.sendData2(data.data2),this.api.sendData3(data.data3))
),
switchMap(
(data) => this.api.getDataFromServer()
),
switchMap(
(data) => this.updateLocal(data)
),
tap(_ => this.isResourceFree = true)
)
}
如果调用 this.createSyncObservable()
不执行任何实际工作,而只是订阅可观察对象,那么 returns 您只需要调用该函数一次。然后你可以简单地做:
public syncData$ = this.createSyncObservable().pipe(share());
如果没有订阅者,share
将从其来源取消订阅(即当 this.createSyncObservable()
完成时)。因此,this.syncData$
的订阅者将触发对从 this.createSyncObservable()
返回的可观察对象的订阅,如果它完成的话。
// The first subscribe will trigger a subscribe to createSyncObservable()
syncData$.subscribe()
// A second subscribe while the first hasn't completed won't trigger a subscribe
// to createSyncObservable() but instead just wait for its response
syncData$.subscribe()
// After some time ...
// Another subscribe after createSyncObservable() completed will trigger another
// subscribe to createSyncObservable()
syncData$.subscribe()