使用运算符扩展 RxJS Observable class

Extend RxJS Observable class with operators

Observable class 如何通过应用内置的 RxJS 运算符对其进行扩展?

我想做这样的事情:

class TruthyObservable extends Observable {
  constructor(subscriber) {
    super(subscriber);

    return this.filter(x => x);
  }
}

class TruthyMappedObservable extends TruthyObservable {
  constructor(subscriber) {
    super(subscriber);

    return this.map(x => `'${x}'`);
  }
}

没有构造函数可以完成吗return?

我认为您可以通过自定义运算符获得所需的内容:

    Observable.prototype.truthy = function truthy() {
        return this.filter(x => x);
    }

这在很大程度上取决于你想做什么,但假设你想制作一个 TruthyObservable ,它的行为与默认 Observable.create(...) 非常相似,但只传递偶数:

import { Observable, Observer, Subscriber, Subject, Subscription } from 'rxjs';
import 'rxjs/add/operator/filter';

class TruthyObservable<T> extends Observable<T> {

    constructor(subscribe?: <R>(this: Observable<T>, subscriber: Subscriber<R>) => any) {
        if (subscribe) {
            let oldSubscribe = subscribe;
            subscribe = (obs: Subscriber<any>) => {
                obs = this.appendOperators(obs);
                return oldSubscribe.call(this, obs);
            };
        }

        super(subscribe);
    }

    private appendOperators(obs: Subscriber<any>) {
        let subject = new Subject();

        subject
            .filter((val: number) => val % 2 == 0)
            .subscribe(obs);

        return new Subscriber(subject);
    }

}

let o = new TruthyObservable<number>((obs: Observer<number>) => {
    obs.next(3);
    obs.next(6);
    obs.next(7);
    obs.next(8);
});

o.subscribe(val => console.log(val));

这将打印到控制台:

6
8

观看现场演示:https://jsbin.com/recuto/3/edit?js,console

通常 类 继承 Observable 覆盖实际上在内部进行订阅的 _subscribe() 方法,但在我们的例子中,我们想使用回调,我们可以自己发出值(因为这个 Observable 本身不发射任何东西)。如果方法 _subscribe() 存在,它会被 _subscribe 属性 掩盖,因此如果我们只是覆盖此方法,我们将无法向其添加任何运算符。这就是为什么我用另一个函数将 _subscribe 包装在构造函数中,然后通过 appendOperators() 方法中与 filter() 链接的 Subject 传递所有值。请注意,我在 obs = this.appendOperators(obs).

处用 Subject 替换了原来的 Observer

最后,当我打电话时。 obs.next(3); 我实际上是将值推送到 Subject 过滤它们并将它们传递给原始 Observer