rxjs 异步更新 Observable

rxjs async updates to Observable

在 Observable 流中间处理异步更新的最佳方式是什么。

假设有 3 个可观察值:

Obs1(从 API 获取数据)-> 管道到 Obs2 Obs2(转换数据)-> 管道到 Obs3 Obs3(发送转换后的数据)

(实际应用比较复杂,没有在单个Observable中完成是有原因的,这只是一个简单的例子)

如果是线性同步路径,一切都很好。

但是我们也有异步消息会改变 Obs2 的输出。

我要问的 3 种情况是: - 我们获取数据,并通过 Obs1、Obs2 和 Obs3 - 我们收到一条消息进行更改,通过 Obs2 和 Obs3 - 我们收到一条不同的消息以进行更改,该消息还需要通过 Obs2 和 Obs3

应用上一条消息的更改

这里的主要问题是有不同类型的异步消息会改变 Obs2 的结果,但它们都仍然需要知道 Obs2 的先前结果是什么(因此发生的消息的任何其他变化之前仍然适用)

我试过在 Obs2 中使用 switchMap 并在 Obs1 中进行扫描,如下所示:

obs1

const obs1$ = obs1$.pipe(
  // this returns a function used in the reducer.
  map((data) => (prevData) => 'modifiedData',
  scan((data, reducer) => reducer(betsMap), {})
)

obs2

const obs2$ = obs1$.pipe(
  switchMap(data =>
    someChange$.pipe(map(reducer => reducer(data)))
  )
)

其中 someChange$ 是一个使用另一个 reducer 函数应用更改的 BehaviorSubject。

这对于进行了一些更改的异步消息 #1 效果很好。 但是当消息 #2 出现并且需要进行不同的更改时,第一个更改将丢失。

应该在 obs1$ 中 "prevData" 的更改始终未定义,因为它发生在应用消息之前。

如何获取 obs2$ 的输出并对其应用异步更新以记住所有过去的更新内容? (如果需要,我可以清除所有更改)

所以如果我答对了这个问题,那么这个问题解决了两个问题:

第一:如何缓存流中最后 2 个发出的值。

scan 绝对是正确的方法,如果不止一个 place/file 需要这种缓存逻辑,我会选择自定义管道运算符,如下面的

function cachePipe() {
  return sourceObservable =>
    sourceObservable.pipe(
      scan((acc, cur) => {
        return acc.length === 2 ? [...acc.slice(1), cur] : [...acc, cur];
      }, [])
    );
}

cachePipe 将始终 return 通过流传递的最新 2 个值。

...
.pipe(
      cachePipe()
     )

第二:如何在流事件发生时同时从多个流访问数据

此处 rxjs 的 combineLatest 创建运算符可能会为您解决问题,

combineLatest(API$, async1$ ,async2$,async3$)
  .pipe(
    // here I have access to an array of the last emitted value of all streams
    // and the data can be passed to `Obs2` in your case
   )

pipe 中,我可以链接任意数量的可观察对象,这解决了第二个问题。

注:

combineLatest 需要在其中的所有流 emit once,在运算符 strats 发出它们的 组合值 之前,一个解决方法是对输入流使用 startWith 运算符,另一种方法是通过传递数据槽BehaviorSubject-s.

这是 CodeSandbox 上的一个演示,它使用 cachePipe()startWith 策略将源 (Obs1) 与将更改数据的异步可观察对象结合起来。