pipe() 中的 concatMap() 行为
concatMap() behavior in a pipe()
我正在尝试了解 concatMap() 运算符的行为。我写了一些example code
import './style.css';
import { concatMap, fromEvent, tap } from 'rxjs';
import { first } from 'rxjs/operators';
const button1 = document.getElementById('button1');
const button2 = document.getElementById('button2');
const button3 = document.getElementById('button3');
const obs1 = fromEvent(button1, 'click');
const obs2 = fromEvent(button2, 'click');
const obs3 = fromEvent(button3, 'click');
obs1
.pipe(
concatMap((obs1value) => {
console.log('obs1:', obs1value);
return obs2; // wait for this to complete before merging next value
}),
concatMap((obs2value) => {
console.log('obs2', obs2value);
return obs3; // wait for this to complete before merging next value
})
)
.subscribe((obs3value) => {
console.log('obs3:', obs3value);
});
// button2.click() // nothing
// button3.click() // nothing
button1.click(); // obs1
button2.click(); // obs2
button3.click(); // obs3
button1.click(); // nothing -???
button2.click(); // nothing -???
button3.click(); // obs3
button3.click(); // obs3
button3.click(); // obs3
最初在单击按钮 1、按钮 2 和按钮 3 后,我可以看到打印语句。但是之后对于 button1 和 button2,没有打印语句,只有在 button3 的情况下才能观察到。我看到 RxJS 文档说 concatMap() 将是“......在合并下一个之前等待每个完成”。但是obs1,obs2,obs3都还没有完成,为什么我看到打印语句一次都没有?
如果我稍微更改代码 https://stackblitz.com/edit/rxjs-zggqxc,以完成如下所示的 observable
obs3.pipe(first());
现在正在舔button1,为什么什么也没有打印出来?
button1.click(); // obs1
button2.click(); // obs2
button3.click(); // obs3
button1.click(); // nothing -???
button2.click(); // obs2
button3.click(); // obs3
button3.click(); // nothing
button3.click(); // nothing
button2.click(); // obs2
当您第二次调用 button1.click();
时,会发出 next
通知,但 obs2
尚未完成,因此不会调用 concatMap()
中的项目函数。仅当 obs2
完成时,concatMap()
才会弹出堆栈中最旧的通知并将其传递给其项目函数。
如果您记录来自 obs1
的 next
通知,您将看到正在发出的事件。
...
obs1
.pipe(
tap(console.log),
concatMap((obs1value) => {
console.log('obs1:', obs1value);
return obs2; // wait for this to complete before merging next value
}),
...
请注意,您使用的是 concatMap
,重点是 map
。
映射不仅会连接所有可观察对象,还会用一系列其他项目替换(映射)先前可观察对象中的项目。
对于实际上映射到另一个“内部”可观察对象的“外部”可观察对象中的每个项目,映射回调(concatMap(...)
的参数)在映射实际发生时执行一次。
这些映射是什么时候执行的? - concatMap
将从外部可观察对象中取出一项,然后推迟处理来自外部可观察对象的所有其他项目,直到内部可观察对象(由地图回调返回)完成。外部 observable 的项目确实被缓冲,直到内部 observable 完成。
旁注:在同步场景中,这大致相当于来自 C# 的 SelectMany
或来自 Java Streams API.
的 flatMap
让我描述一下您的第一个使用场景:
obs1
中的每个项目都将替换为 obs2
中的项目序列。
- 当通过按
button1
评估此替换时(您可能以一种取决于从 obs1 收到的值的方式实现它!),第一个日志输出变得可见。
- 在
button1
上的进一步点击将被忽略,直到 obs2
完成。
obs2
中的每个项目都将被 obs3
中的项目序列替换。
- 同样,当通过按
button2
对替换进行评估时,第二个日志输出变为可见。
- 在
obs3
完成之前,将忽略对 button2
的进一步点击。
因此,当按下 button1
、button2
和 button3
时,您实际上只是订阅了 obs3
。
由于 obs3
永远不会完成,您确实永远不会看到任何其他输出。
在您的第二种情况下,您映射到 obs2
的可观察对象是 obs3.pipe(take(1))
,它在单击 button3
后完成。
一旦发生这种情况,管道将继续映射 obs2
中的项目。一旦 obs2
中的一个项目可用(如果 button2
被中途或稍后被点击,则立即发生)下一个映射发生,然后单击 button3
再次等待。
我正在尝试了解 concatMap() 运算符的行为。我写了一些example code
import './style.css';
import { concatMap, fromEvent, tap } from 'rxjs';
import { first } from 'rxjs/operators';
const button1 = document.getElementById('button1');
const button2 = document.getElementById('button2');
const button3 = document.getElementById('button3');
const obs1 = fromEvent(button1, 'click');
const obs2 = fromEvent(button2, 'click');
const obs3 = fromEvent(button3, 'click');
obs1
.pipe(
concatMap((obs1value) => {
console.log('obs1:', obs1value);
return obs2; // wait for this to complete before merging next value
}),
concatMap((obs2value) => {
console.log('obs2', obs2value);
return obs3; // wait for this to complete before merging next value
})
)
.subscribe((obs3value) => {
console.log('obs3:', obs3value);
});
// button2.click() // nothing
// button3.click() // nothing
button1.click(); // obs1
button2.click(); // obs2
button3.click(); // obs3
button1.click(); // nothing -???
button2.click(); // nothing -???
button3.click(); // obs3
button3.click(); // obs3
button3.click(); // obs3
最初在单击按钮 1、按钮 2 和按钮 3 后,我可以看到打印语句。但是之后对于 button1 和 button2,没有打印语句,只有在 button3 的情况下才能观察到。我看到 RxJS 文档说 concatMap() 将是“......在合并下一个之前等待每个完成”。但是obs1,obs2,obs3都还没有完成,为什么我看到打印语句一次都没有?
如果我稍微更改代码 https://stackblitz.com/edit/rxjs-zggqxc,以完成如下所示的 observable
obs3.pipe(first());
现在正在舔button1,为什么什么也没有打印出来?
button1.click(); // obs1
button2.click(); // obs2
button3.click(); // obs3
button1.click(); // nothing -???
button2.click(); // obs2
button3.click(); // obs3
button3.click(); // nothing
button3.click(); // nothing
button2.click(); // obs2
当您第二次调用 button1.click();
时,会发出 next
通知,但 obs2
尚未完成,因此不会调用 concatMap()
中的项目函数。仅当 obs2
完成时,concatMap()
才会弹出堆栈中最旧的通知并将其传递给其项目函数。
如果您记录来自 obs1
的 next
通知,您将看到正在发出的事件。
...
obs1
.pipe(
tap(console.log),
concatMap((obs1value) => {
console.log('obs1:', obs1value);
return obs2; // wait for this to complete before merging next value
}),
...
请注意,您使用的是 concatMap
,重点是 map
。
映射不仅会连接所有可观察对象,还会用一系列其他项目替换(映射)先前可观察对象中的项目。
对于实际上映射到另一个“内部”可观察对象的“外部”可观察对象中的每个项目,映射回调(concatMap(...)
的参数)在映射实际发生时执行一次。
这些映射是什么时候执行的? - concatMap
将从外部可观察对象中取出一项,然后推迟处理来自外部可观察对象的所有其他项目,直到内部可观察对象(由地图回调返回)完成。外部 observable 的项目确实被缓冲,直到内部 observable 完成。
旁注:在同步场景中,这大致相当于来自 C# 的 SelectMany
或来自 Java Streams API.
flatMap
让我描述一下您的第一个使用场景:
obs1
中的每个项目都将替换为obs2
中的项目序列。- 当通过按
button1
评估此替换时(您可能以一种取决于从 obs1 收到的值的方式实现它!),第一个日志输出变得可见。 - 在
button1
上的进一步点击将被忽略,直到obs2
完成。
- 当通过按
obs2
中的每个项目都将被obs3
中的项目序列替换。- 同样,当通过按
button2
对替换进行评估时,第二个日志输出变为可见。 - 在
obs3
完成之前,将忽略对button2
的进一步点击。
- 同样,当通过按
因此,当按下 button1
、button2
和 button3
时,您实际上只是订阅了 obs3
。
由于 obs3
永远不会完成,您确实永远不会看到任何其他输出。
在您的第二种情况下,您映射到 obs2
的可观察对象是 obs3.pipe(take(1))
,它在单击 button3
后完成。
一旦发生这种情况,管道将继续映射 obs2
中的项目。一旦 obs2
中的一个项目可用(如果 button2
被中途或稍后被点击,则立即发生)下一个映射发生,然后单击 button3
再次等待。