如何将最新的发射添加到已经发射的集合中,创建一个包含两者的新可观察对象?

How to add the latest emission to the already emitted set, creating a new observable containing both?

我有一个 Observable,它时不时地发出值。在消费者那里,我得到了最新的价值,但我想把所说的价值作为(并包括)先前发出的价值的一部分。我知道我需要使用 switchMap 来取消当前的可观察量,并 return 一个包含所有历史排放量的新的,坚持最新的。

从下面的身份映射开始,我应该使用什么运算符?我尝试了很多不同的方法,但没有真正找到一种有条不紊的方法来缩小可用选择范围。

const source = interval(3000);
const transform = source.pipe(switchMap(_ => of(_)));
const subscribe = transform.subscribe(val =>
  console.log("unchanged: " + val)
);

目前,发出的值产生一个序列 0, 1, 2, 3, 4, ... 但我希望它保留之前发出的值并建立一个数组来修改它的最新发射。所以最后的结果会变成 [], [0], [0,1], [0,1,2], [0,1,2,3], [0,1,2,3 ,4], ....

switchMap(...) 中使用哪个 RxJs 运算符是合适的?还有比switchMap(...)更好的选择吗? pipe(...) 的方法是否合适?

StackBlitz

我以为我找到了一个 ,但这并不是我真正得到的帮助。

编辑

根据回答,我觉得 scanscanMap 可能是合适的选择。我在查文档的时候没有意识到,可能是我的困惑和不确定。

我的印象是,最好为每个新发出的值创建一个新的可观察对象,但我不知道这将如何影响性能等,因为我看不到使用 [= 的利弊15=] 与 switchScan.

您在寻找 scan 接线员吗? https://rxjs-dev.firebaseapp.com/api/operators/scan

const source = interval(3000);
const transform = source.pipe(scan((acc, curr) => [...acc, curr], []));
const subscribe = transform.subscribe(val =>
  console.log("unchanged: " + val)
);

听起来你可以使用 switchScan,这是自 RxJS 7 以来的新运算符(部分由我实现:)),但这实际上取决于你到底想做什么。

const source = interval(500);
const transform = source.pipe(
  switchScan((acc, num) => of([...acc, num]), []),
);

对于大多数情况,即使 mergeScan does the job and even scan 如果不需要 return 一个可观察对象。目前,只有 RxJs 6 被公开记录,使得普通版本和合并版本更容易识别。

const source = interval(500);
const transform = source.pipe(
  mergeScan((acc, num) => of([...acc, num]), []),
);

scan, mergeScan and switchScan. Original author's demo 的现场演示(仅限 switchScan)。