pipe() 中的 concatMap() 行为

concatMap() behavior in a pipe()

我正在尝试了解 concatMap() 运算符的行为。我写了一些example code

import './style.css';

import { concatMap, fromEvent, tap } from 'rxjs';
import { first } from 'rxjs/operators';

const button1 = document.getElementById('button1');
const button2 = document.getElementById('button2');
const button3 = document.getElementById('button3');
const obs1 = fromEvent(button1, 'click');
const obs2 = fromEvent(button2, 'click');
const obs3 = fromEvent(button3, 'click');

obs1
  .pipe(
    concatMap((obs1value) => {
      console.log('obs1:', obs1value);
      return obs2; // wait for this to complete before merging next value
    }),
    concatMap((obs2value) => {
      console.log('obs2', obs2value);
      return obs3; // wait for this to complete before merging next value
    })
  )
  .subscribe((obs3value) => {
    console.log('obs3:', obs3value);
  });

// button2.click() // nothing
// button3.click() // nothing

button1.click(); // obs1
button2.click(); // obs2
button3.click(); // obs3

button1.click(); // nothing -???
button2.click(); // nothing -???

button3.click(); // obs3
button3.click(); // obs3
button3.click(); // obs3

最初在单击按钮 1、按钮 2 和按钮 3 后,我可以看到打印语句。但是之后对于 button1 和 button2,没有打印语句,只有在 button3 的情况下才能观察到。我看到 RxJS 文档说 concatMap() 将是“......在合并下一个之前等待每个完成”。但是obs1,obs2,obs3都还没有完成,为什么我看到打印语句一次都没有?

如果我稍微更改代码 https://stackblitz.com/edit/rxjs-zggqxc,以完成如下所示的 observable

obs3.pipe(first());

现在正在舔button1,为什么什么也没有打印出来?

button1.click(); // obs1
button2.click(); // obs2
button3.click(); // obs3

button1.click(); // nothing -???
button2.click(); // obs2
button3.click(); // obs3
button3.click(); // nothing
button3.click(); // nothing
button2.click(); // obs2

当您第二次调用 button1.click(); 时,会发出 next 通知,但 obs2 尚未完成,因此不会调用 concatMap() 中的项目函数。仅当 obs2 完成时,concatMap() 才会弹出堆栈中最旧的通知并将其传递给其项目函数。

如果您记录来自 obs1next 通知,您将看到正在发出的事件。

...

obs1
  .pipe(
    tap(console.log),
    concatMap((obs1value) => {
      console.log('obs1:', obs1value);
      return obs2; // wait for this to complete before merging next value
    }),
...

请注意,您使用的是 concatMap,重点是 map。 映射不仅会连接所有可观察对象,还会用一系列其他项目替换(映射)先前可观察对象中的项目。

对于实际上映射到另一个“内部”可观察对象的“外部”可观察对象中的每个项目,映射回调(concatMap(...) 的参数)在映射实际发生时执行一次。

这些映射是什么时候执行的? - concatMap 将从外部可观察对象中取出一项,然后推迟处理来自外部可观察对象的所有其他项目,直到内部可观察对象(由地图回调返回)完成。外部 observable 的项目确实被缓冲,直到内部 observable 完成。

旁注:在同步场景中,这大致相当于来自 C# 的 SelectMany 或来自 Java Streams API.

flatMap

让我描述一下您的第一个使用场景:

  1. obs1 中的每个项目都将替换为 obs2 中的项目序列。
    • 当通过按 button1 评估此替换时(您可能以一种取决于从 obs1 收到的值的方式实现它!),第一个日志输出变得可见。
    • button1 上的进一步点击将被忽略,直到 obs2 完成。
  2. obs2 中的每个项目都将被 obs3 中的项目序列替换。
    • 同样,当通过按 button2 对替换进行评估时,第二个日志输出变为可见。
    • obs3 完成之前,将忽略对 button2 的进一步点击。

因此,当按下 button1button2button3 时,您实际上只是订阅了 obs3。 由于 obs3 永远不会完成,您确实永远不会看到任何其他输出。

在您的第二种情况下,您映射到 obs2 的可观察对象是 obs3.pipe(take(1)),它在单击 button3 后完成。 一旦发生这种情况,管道将继续映射 obs2 中的项目。一旦 obs2 中的一个项目可用(如果 button2 被中途或稍后被点击,则立即发生)下一个映射发生,然后单击 button3 再次等待。