rxjs 异步更新 Observable
rxjs async updates to Observable
在 Observable 流中间处理异步更新的最佳方式是什么。
假设有 3 个可观察值:
Obs1(从 API 获取数据)-> 管道到 Obs2
Obs2(转换数据)-> 管道到 Obs3
Obs3(发送转换后的数据)
(实际应用比较复杂,没有在单个Observable中完成是有原因的,这只是一个简单的例子)
如果是线性同步路径,一切都很好。
但是我们也有异步消息会改变 Obs2 的输出。
我要问的 3 种情况是:
- 我们获取数据,并通过 Obs1、Obs2 和 Obs3
- 我们收到一条消息进行更改,通过 Obs2 和 Obs3
- 我们收到一条不同的消息以进行更改,该消息还需要通过 Obs2 和 Obs3
应用上一条消息的更改
这里的主要问题是有不同类型的异步消息会改变 Obs2 的结果,但它们都仍然需要知道 Obs2 的先前结果是什么(因此发生的消息的任何其他变化之前仍然适用)
我试过在 Obs2 中使用 switchMap 并在 Obs1 中进行扫描,如下所示:
obs1
const obs1$ = obs1$.pipe(
// this returns a function used in the reducer.
map((data) => (prevData) => 'modifiedData',
scan((data, reducer) => reducer(betsMap), {})
)
obs2
const obs2$ = obs1$.pipe(
switchMap(data =>
someChange$.pipe(map(reducer => reducer(data)))
)
)
其中 someChange$
是一个使用另一个 reducer 函数应用更改的 BehaviorSubject。
这对于进行了一些更改的异步消息 #1 效果很好。
但是当消息 #2 出现并且需要进行不同的更改时,第一个更改将丢失。
应该在 obs1$ 中 "prevData" 的更改始终未定义,因为它发生在应用消息之前。
如何获取 obs2$ 的输出并对其应用异步更新以记住所有过去的更新内容? (如果需要,我可以清除所有更改)
所以如果我答对了这个问题,那么这个问题解决了两个问题:
第一:如何缓存流中最后 2 个发出的值。
scan
绝对是正确的方法,如果不止一个 place/file 需要这种缓存逻辑,我会选择自定义管道运算符,如下面的
function cachePipe() {
return sourceObservable =>
sourceObservable.pipe(
scan((acc, cur) => {
return acc.length === 2 ? [...acc.slice(1), cur] : [...acc, cur];
}, [])
);
}
cachePipe
将始终 return 通过流传递的最新 2 个值。
...
.pipe(
cachePipe()
)
第二:如何在流事件发生时同时从多个流访问数据
此处 rxjs 的 combineLatest
创建运算符可能会为您解决问题,
combineLatest(API$, async1$ ,async2$,async3$)
.pipe(
// here I have access to an array of the last emitted value of all streams
// and the data can be passed to `Obs2` in your case
)
在 pipe
中,我可以链接任意数量的可观察对象,这解决了第二个问题。
注:
combineLatest
需要在其中的所有流 emit once,在运算符 strats 发出它们的 组合值 之前,一个解决方法是对输入流使用 startWith
运算符,另一种方法是通过传递数据槽BehaviorSubject
-s.
这是 CodeSandbox 上的一个演示,它使用 cachePipe()
和 startWith
策略将源 (Obs1) 与将更改数据的异步可观察对象结合起来。
在 Observable 流中间处理异步更新的最佳方式是什么。
假设有 3 个可观察值:
Obs1(从 API 获取数据)-> 管道到 Obs2 Obs2(转换数据)-> 管道到 Obs3 Obs3(发送转换后的数据)
(实际应用比较复杂,没有在单个Observable中完成是有原因的,这只是一个简单的例子)
如果是线性同步路径,一切都很好。
但是我们也有异步消息会改变 Obs2 的输出。
我要问的 3 种情况是: - 我们获取数据,并通过 Obs1、Obs2 和 Obs3 - 我们收到一条消息进行更改,通过 Obs2 和 Obs3 - 我们收到一条不同的消息以进行更改,该消息还需要通过 Obs2 和 Obs3
应用上一条消息的更改这里的主要问题是有不同类型的异步消息会改变 Obs2 的结果,但它们都仍然需要知道 Obs2 的先前结果是什么(因此发生的消息的任何其他变化之前仍然适用)
我试过在 Obs2 中使用 switchMap 并在 Obs1 中进行扫描,如下所示:
obs1
const obs1$ = obs1$.pipe(
// this returns a function used in the reducer.
map((data) => (prevData) => 'modifiedData',
scan((data, reducer) => reducer(betsMap), {})
)
obs2
const obs2$ = obs1$.pipe(
switchMap(data =>
someChange$.pipe(map(reducer => reducer(data)))
)
)
其中 someChange$
是一个使用另一个 reducer 函数应用更改的 BehaviorSubject。
这对于进行了一些更改的异步消息 #1 效果很好。 但是当消息 #2 出现并且需要进行不同的更改时,第一个更改将丢失。
应该在 obs1$ 中 "prevData" 的更改始终未定义,因为它发生在应用消息之前。
如何获取 obs2$ 的输出并对其应用异步更新以记住所有过去的更新内容? (如果需要,我可以清除所有更改)
所以如果我答对了这个问题,那么这个问题解决了两个问题:
第一:如何缓存流中最后 2 个发出的值。
scan
绝对是正确的方法,如果不止一个 place/file 需要这种缓存逻辑,我会选择自定义管道运算符,如下面的
function cachePipe() {
return sourceObservable =>
sourceObservable.pipe(
scan((acc, cur) => {
return acc.length === 2 ? [...acc.slice(1), cur] : [...acc, cur];
}, [])
);
}
cachePipe
将始终 return 通过流传递的最新 2 个值。
...
.pipe(
cachePipe()
)
第二:如何在流事件发生时同时从多个流访问数据
此处 rxjs 的 combineLatest
创建运算符可能会为您解决问题,
combineLatest(API$, async1$ ,async2$,async3$)
.pipe(
// here I have access to an array of the last emitted value of all streams
// and the data can be passed to `Obs2` in your case
)
在 pipe
中,我可以链接任意数量的可观察对象,这解决了第二个问题。
注:
combineLatest
需要在其中的所有流 emit once,在运算符 strats 发出它们的 组合值 之前,一个解决方法是对输入流使用 startWith
运算符,另一种方法是通过传递数据槽BehaviorSubject
-s.
这是 CodeSandbox 上的一个演示,它使用 cachePipe()
和 startWith
策略将源 (Obs1) 与将更改数据的异步可观察对象结合起来。