forEach 运算符在没有订阅的情况下被评估

forEach operator being evaluated without subscription

我正在尝试自学一些响应式函数式编程。来自 Ben Lesh 的 video 展示了一个可观察的例子。我之前的阅读表明 observable 是惰性的,即它仅在被订阅后才进行评估。奇怪的是,此代码不需要订阅即可打印到控制台。

var Rx = require('rxjs/Rx')

var source = Rx.Observable.from([1,2,3,4,5]);

var newSource = source.filter(x => x % 2 === 1)
                .map(x => x + '!')
                .forEach(x => console.log(x));

来自RxJS docs

似乎 Observable 必须积极解决 .forEach 发出的承诺,我所以对此感到困惑。

进一步的混淆源于这段代码:

var Rx = require('rxjs/Rx')

var source = Rx.Observable.from([1,2,3,4,5]);

var newSource = source.filter(x => x % 2 === 1)
      .map(x => x + '!')
      .do(x => console.log(x));

直到运行 newSource.subscribe();才计算,请帮我解释一下这两个运算符背后的区别。

Observables 默认是惰性的。如果你对一个 observable 执行一个运算符,在后台,rxjs 会为你创建一个新的 observable,它与前一个 observable 相关联。知道可观察对象是不可变的。

但是,ForEach 是一种特殊的运算符。它不是 return 一个新的 Observable,但它会订阅引擎盖下的 observable 并对该 observable 发出的每个元素执行一个函数。如果您查看 Observable class 本身上的 forEach 实现的源代码,您将看到以下内容(只是一个片段)。

const subscription = this.subscribe((value) => {
    if (subscription) {
      // if there is a subscription, then we can surmise
      // the next handling is asynchronous. Any errors thrown
      // need to be rejected explicitly and unsubscribe must be
      // called manually
      try {
        next(value);
      } catch (err) {
        reject(err);
        subscription.unsubscribe();
      }

在这里我们可以看到正在订阅 observable 并且值是 'next'-ed。下一个函数是您传递给 forEach 调用的函数。