发布 observable 使其*热门*

publishing observable to make it *hot*

我正在尝试创建一个不需要订阅的热可观察对象。这是一个库,我想让用户能够调用某些方法并避免调用 subscribe() 来触发 observables。最初我有这个:

const q = new Queue();

q.add('foo bar baz').subscribe();   // <<< need to call subscribe


Queue.prototype.add = Queue.prototype.enqueue = function (lines) {

    lines = _.flattenDeep([lines]);

    var lockAcquired = false;

    return this.init()
        .flatMap(() => {
            return acquireLock(this)
        })
        .flatMap(() => {
            lockAcquired = true;
            return appendFile(this, lines)
        })
        .flatMap(() => releaseLock(this))
        .catch(err => {
            if (lockAcquired) {
                return releaseLock(this);
            }
            else {
                return makeGenericObservable();
            }
        })


};

为了让 observable 变热,我想我可以这样做:

const q = new Queue();

q.add('foo bar baz');  // <<< don't call subscribe

Queue.prototype.add = Queue.prototype.enqueue = function (lines) {

    lines = _.flattenDeep([lines]);

    var lockAcquired = false;

    return this.init()
        .flatMap(() => {
            return acquireLock(this)
        })
        .flatMap(() => {
            lockAcquired = true;
            return appendFile(this, lines)
        })
        .flatMap(() => releaseLock(this))
        .catch(err => {
            if (lockAcquired) {
                return releaseLock(this);
            }
            else {
                return makeGenericObservable();
            }
        })
        .publish()
        .share()  // this too?

};

然而问题是当我调用 publish() 时,没有任何反应,而且 add 方法似乎从未被完全调用(我假设序列中的第一个可观察对象根本不会触发,因为有效订阅不是称为)。但是我虽然 publish() 会自动调用可观察链?

如何使添加方法返回的可观察对象 hot?

你误会了。热观察确实像冷观察一样需要订阅。不同之处在于,hot 使用一些外部制作人(如 dome element)并在订阅时开始收听。另一方面,cold observable 在订阅时在内部创建生产者。

这导致您可能会错过一些具有热可观察性的事件,因为外部生产者对订阅一无所知并独立发出。有了冷观察,你就不会错过任何东西,因为生产者是在订阅时创建的。

长话短说,您可以在热或冷 Observable 之上构建任何 Observable 链,但在您订阅它之前,什么都不会发生。

PS。无需将 publishshare 一起使用,因为后者对于 .publish().refCount().

alias

如果您不想使用 subscribe,您可以在您的方法中手动 .connect() 您的流或 subscribe 它:

const q = new Queue();
q.add('foo bar baz');

Queue.prototype.add = Queue.prototype.enqueue = function (lines) {
    lines = _.flattenDeep([lines]);
    var lockAcquired = false;

    let add$ = this.init()
        .flatMap(() => {
            return acquireLock(this)
        })
        .flatMap(() => {
            lockAcquired = true;
            return appendFile(this, lines)
        })
        .flatMap(() => releaseLock(this))
        .catch(err => {
            if (lockAcquired) {
                return releaseLock(this);
            }
            else {
                return makeGenericObservable();
            }
        })
        .publish();

     add$.connect();
     return add$;  // optional, depends if you even need the stream outside of the add-method
};

或者作为使用内部 subscribe:

的替代方法
const q = new Queue();
q.add('foo bar baz');  // <<< don't call subscribe

Queue.prototype.add = Queue.prototype.enqueue = function (lines) {
    lines = _.flattenDeep([lines]);
    var lockAcquired = false;
    let add$ = this.init()
        ...
        .share();

     add$.subscribe();
     return add$;
};