filter(cond) 和 flatMap(x => cond? of(x) : EMPTY) 有区别吗?

Is there a difference between filter(cond) and flatMap(x => cond? of(x) : EMPTY)?

我试图了解这两个可观察值之间的区别。代码的唯一区别是:

/**
* Inside rxjs pipe
*/
if(typeof x === 'number' && x > 3) {
    return of(x);
} else {
    return EMPTY;
}

对比:

.filter(typeof x === 'number' && x > 3)

我的测试是运行:

const a$ = from([1, 6, '4']).pipe(
            tap(console.log),
            flatMap((x) => {
                if (typeof x === 'number') {
                    if (x > 3) {
                        return of(x);
                    }
                    return EMPTY;
                }
                return EMPTY;
            }),
            tap(console.log)
        );
        const sub_a = a$.subscribe(
            (x) => { console.log(x, 'success'); done(); },
            (e) => { console.log(e, 'error'); done(); },
            () => { console.log('complete'); sub_a.unsubscribe(); done(); }
        );

和:

        const b$ = from([2, 5, '8']).pipe(
            tap(console.log),
            filter(x => typeof x === 'number' && x > 3),
            tap(console.log)
        );
        const sub_b = b$.subscribe(
            (x) => { console.log(x, 'success'); done(); },
            (e) => { console.log(e, 'error'); done(); },
            () => { console.log('complete'); sub_b.unsubscribe(); done(); }
        );

对于他们两个,我得到第一个值记录一次(在 filter/flatMap 之前),第二个值从水龙头记录两次,一次使用 "complete",第三个一次。

我认为不同之处在于发射 EMPTY 会导致 observable 完全关闭,但后续值仍然可以通过管道看到。

我对 Subject 做了同样的事情,唯一的区别是 Subject 没有发出 Complete,这是预期的。

如果从 flatMap 返回的 Observable 具有不同的调度程序,则可能会有所不同,但在您的示例中,可见行为应该相同。如果您依赖副作用,通常会发生这种情况,这通常是不鼓励的。

这是 asyncScheduler 更改行为时的示例(在第二个示例中创建后打印的值):

const { of, asyncScheduler, EMPTY, from } = rxjs; // = require("rxjs")
const { filter, flatMap } = rxjs.operators; // = require("rxjs/operators")

const items$ = from([1, 2, 3, 4, 5]);

console.log("------------ SYNC");
const sync$ = items$.pipe(
  filter(v => v % 2 === 0)
);
sync$.subscribe(e => console.log(e));
console.log("after sync");


console.log("------------ ASYNC");
items$.pipe(
  flatMap(v => v % 2 === 0 ? of(v, asyncScheduler) : EMPTY)
).subscribe(e => console.log(e));
console.log("after async");
<script src="https://unpkg.com/rxjs@6.5.2/bundles/rxjs.umd.min.js"></script>