发布 observable 使其*热门*
publishing observable to make it *hot*
我正在尝试创建一个不需要订阅的热可观察对象。这是一个库,我想让用户能够调用某些方法并避免调用 subscribe() 来触发 observables。最初我有这个:
const q = new Queue();
q.add('foo bar baz').subscribe(); // <<< need to call subscribe
Queue.prototype.add = Queue.prototype.enqueue = function (lines) {
lines = _.flattenDeep([lines]);
var lockAcquired = false;
return this.init()
.flatMap(() => {
return acquireLock(this)
})
.flatMap(() => {
lockAcquired = true;
return appendFile(this, lines)
})
.flatMap(() => releaseLock(this))
.catch(err => {
if (lockAcquired) {
return releaseLock(this);
}
else {
return makeGenericObservable();
}
})
};
为了让 observable 变热,我想我可以这样做:
const q = new Queue();
q.add('foo bar baz'); // <<< don't call subscribe
Queue.prototype.add = Queue.prototype.enqueue = function (lines) {
lines = _.flattenDeep([lines]);
var lockAcquired = false;
return this.init()
.flatMap(() => {
return acquireLock(this)
})
.flatMap(() => {
lockAcquired = true;
return appendFile(this, lines)
})
.flatMap(() => releaseLock(this))
.catch(err => {
if (lockAcquired) {
return releaseLock(this);
}
else {
return makeGenericObservable();
}
})
.publish()
.share() // this too?
};
然而问题是当我调用 publish()
时,没有任何反应,而且 add 方法似乎从未被完全调用(我假设序列中的第一个可观察对象根本不会触发,因为有效订阅不是称为)。但是我虽然 publish()
会自动调用可观察链?
如何使添加方法返回的可观察对象 hot?
你误会了。热观察确实像冷观察一样需要订阅。不同之处在于,hot 使用一些外部制作人(如 dome element)并在订阅时开始收听。另一方面,cold observable 在订阅时在内部创建生产者。
这导致您可能会错过一些具有热可观察性的事件,因为外部生产者对订阅一无所知并独立发出。有了冷观察,你就不会错过任何东西,因为生产者是在订阅时创建的。
长话短说,您可以在热或冷 Observable 之上构建任何 Observable 链,但在您订阅它之前,什么都不会发生。
PS。无需将 publish
与 share
一起使用,因为后者对于 .publish().refCount()
.
是 alias
如果您不想使用 subscribe
,您可以在您的方法中手动 .connect()
您的流或 subscribe
它:
const q = new Queue();
q.add('foo bar baz');
Queue.prototype.add = Queue.prototype.enqueue = function (lines) {
lines = _.flattenDeep([lines]);
var lockAcquired = false;
let add$ = this.init()
.flatMap(() => {
return acquireLock(this)
})
.flatMap(() => {
lockAcquired = true;
return appendFile(this, lines)
})
.flatMap(() => releaseLock(this))
.catch(err => {
if (lockAcquired) {
return releaseLock(this);
}
else {
return makeGenericObservable();
}
})
.publish();
add$.connect();
return add$; // optional, depends if you even need the stream outside of the add-method
};
或者作为使用内部 subscribe
:
的替代方法
const q = new Queue();
q.add('foo bar baz'); // <<< don't call subscribe
Queue.prototype.add = Queue.prototype.enqueue = function (lines) {
lines = _.flattenDeep([lines]);
var lockAcquired = false;
let add$ = this.init()
...
.share();
add$.subscribe();
return add$;
};
我正在尝试创建一个不需要订阅的热可观察对象。这是一个库,我想让用户能够调用某些方法并避免调用 subscribe() 来触发 observables。最初我有这个:
const q = new Queue();
q.add('foo bar baz').subscribe(); // <<< need to call subscribe
Queue.prototype.add = Queue.prototype.enqueue = function (lines) {
lines = _.flattenDeep([lines]);
var lockAcquired = false;
return this.init()
.flatMap(() => {
return acquireLock(this)
})
.flatMap(() => {
lockAcquired = true;
return appendFile(this, lines)
})
.flatMap(() => releaseLock(this))
.catch(err => {
if (lockAcquired) {
return releaseLock(this);
}
else {
return makeGenericObservable();
}
})
};
为了让 observable 变热,我想我可以这样做:
const q = new Queue();
q.add('foo bar baz'); // <<< don't call subscribe
Queue.prototype.add = Queue.prototype.enqueue = function (lines) {
lines = _.flattenDeep([lines]);
var lockAcquired = false;
return this.init()
.flatMap(() => {
return acquireLock(this)
})
.flatMap(() => {
lockAcquired = true;
return appendFile(this, lines)
})
.flatMap(() => releaseLock(this))
.catch(err => {
if (lockAcquired) {
return releaseLock(this);
}
else {
return makeGenericObservable();
}
})
.publish()
.share() // this too?
};
然而问题是当我调用 publish()
时,没有任何反应,而且 add 方法似乎从未被完全调用(我假设序列中的第一个可观察对象根本不会触发,因为有效订阅不是称为)。但是我虽然 publish()
会自动调用可观察链?
如何使添加方法返回的可观察对象 hot?
你误会了。热观察确实像冷观察一样需要订阅。不同之处在于,hot 使用一些外部制作人(如 dome element)并在订阅时开始收听。另一方面,cold observable 在订阅时在内部创建生产者。
这导致您可能会错过一些具有热可观察性的事件,因为外部生产者对订阅一无所知并独立发出。有了冷观察,你就不会错过任何东西,因为生产者是在订阅时创建的。
长话短说,您可以在热或冷 Observable 之上构建任何 Observable 链,但在您订阅它之前,什么都不会发生。
PS。无需将 publish
与 share
一起使用,因为后者对于 .publish().refCount()
.
如果您不想使用 subscribe
,您可以在您的方法中手动 .connect()
您的流或 subscribe
它:
const q = new Queue();
q.add('foo bar baz');
Queue.prototype.add = Queue.prototype.enqueue = function (lines) {
lines = _.flattenDeep([lines]);
var lockAcquired = false;
let add$ = this.init()
.flatMap(() => {
return acquireLock(this)
})
.flatMap(() => {
lockAcquired = true;
return appendFile(this, lines)
})
.flatMap(() => releaseLock(this))
.catch(err => {
if (lockAcquired) {
return releaseLock(this);
}
else {
return makeGenericObservable();
}
})
.publish();
add$.connect();
return add$; // optional, depends if you even need the stream outside of the add-method
};
或者作为使用内部 subscribe
:
const q = new Queue();
q.add('foo bar baz'); // <<< don't call subscribe
Queue.prototype.add = Queue.prototype.enqueue = function (lines) {
lines = _.flattenDeep([lines]);
var lockAcquired = false;
let add$ = this.init()
...
.share();
add$.subscribe();
return add$;
};