取消订阅 RxJS Observables
Unsubscribe from RxJS Observables
我有这两个对象,我想停止监听他们的事件。我对 observables 和 RxJS 完全陌生,只是想使用 Inquirer 库。
这里是 RxJS API 供参考:
http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html
如何取消订阅这些类型的可观察对象?
ConnectableObservable:
ConnectableObservable {
source: EventPatternObservable { _add: [Function], _del: [Function], _fn: undefined },
_connection: ConnectDisposable { _p: [Circular], _s: [Object] },
_source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] },
_subject:
Subject {
isDisposed: false,
isStopped: false,
observers: [Object],
hasError: false } },
_count: 1,
_connectableSubscription:
ConnectDisposable {
_p:
ConnectableObservable {
source: [Object],
_connection: [Circular],
_source: [Object],
_subject: [Object] },
_s: AutoDetachObserver { isStopped: false, observer: [Object], m: [Object] } } }
FilterObservable:
FilterObservable {
source:
RefCountObservable {
source:
ConnectableObservable {
source: [Object],
_connection: [Object],
_source: [Object],
_subject: [Object] },
_count: 1,
_connectableSubscription: ConnectDisposable { _p: [Object], _s: [Object] } },
predicate: [Function] }
我需要取消订阅这些对象:
'use strict';
var rx = require('rx');
function normalizeKeypressEvents(value, key) {
return {value: value, key: key || {}};
}
module.exports = function (rl) {
var keypress = rx.Observable.fromEvent(rl.input, 'keypress', normalizeKeypressEvents)
.filter(function (e) {
// Ignore `enter` key. On the readline, we only care about the `line` event.
return e.key.name !== 'enter' && e.key.name !== 'return';
});
return {
line: rx.Observable.fromEvent(rl, 'line'),
keypress: keypress,
normalizedLeftKey: keypress.filter(function (e) {
return e.key.name === 'left';
}).share(),
normalizedRightKey: keypress.filter(function (e) {
return e.key.name === 'right';
}).share(),
normalizedUpKey: keypress.filter(function (e) {
return e.key.name === 'up' || e.key.name === 'k' || (e.key.name === 'p' && e.key.ctrl);
}).share(),
normalizedDownKey: keypress.filter(function (e) {
return e.key.name === 'down' || e.key.name === 'j' || (e.key.name === 'n' && e.key.ctrl);
}).share(),
numberKey: keypress.filter(function (e) {
return e.value && '123456789'.indexOf(e.value) >= 0;
}).map(function (e) {
return Number(e.value);
}).share(),
spaceKey: keypress.filter(function (e) {
return e.key && e.key.name === 'space';
}).share(),
aKey: keypress.filter(function (e) {
return e.key && e.key.name === 'a';
}).share(),
iKey: keypress.filter(function (e) {
return e.key && e.key.name === 'i';
}).share()
};
};
我目前最好的猜测是没有像这样的显式订阅调用:
var source = Rx.Observable.fromEvent(input, 'click');
var subscription = source.subscribe(
function (x) {
console.log('Next: Clicked!');
},
function (err) {
console.log('Error: %s', err);
},
function () {
console.log('Completed');
});
但是,有这些调用:
events.normalizedUpKey.takeUntil(validation.success).forEach(this.onUpKey.bind(this));
events.normalizedDownKey.takeUntil(validation.success).forEach(this.onDownKey.bind(this));
所以我最好的猜测是我需要一种方法来 nullify/cancel takeUntil 调用。
如果您想取消订阅,您需要有 Subscription
对象。这是每个 Observable.subscribe()
调用返回的对象。例如:
let subscriber = Observable.subscribe(...);
...
subscriber.unsubscribe();
有关详细信息,请参阅:https://github.com/ReactiveX/rxjs/blob/master/doc/subscription.md
我支持第一位评论者所说的。
但是感觉代码中某处需要这样调用:
let subscription = normalizedUpKey.subscribe( data => console.log('data') );
你可以做到
subscription.unsubscribe()
上。您还怎么知道什么时候发生了什么,或者那是第 3 方库的一部分?
阅读更多关于 Rxjs 的信息。我建议你看看这本免费的书,https://www.gitbook.com/book/chrisnoring/rxjs-5-ultimate/details
没有必要也没有取消订阅 observable 的协议。实际上,我看到了你问题中的代码,尤其是 return 对象的部分,其中包含了一堆由 share 组成的可观察对象。但是,这些可观察对象仍然是可观察对象,而不是订阅对象,这意味着这些元素没有unsubscribing
这样的概念。
因此,如果您在模块外有一些新的 subscribe-like 代码并且与事件可观察对象一起工作,您显然可以取消订阅特定的订阅实例。
目前题中的代码,source observable上使用的方法都是operator,比如.filter()
.share()
.takeUntil()
,而不是订阅执行,即实际上方法 return new observables.
如Rxjs官方文档所述,虽然.share()
创建了multicasted observables,
如果使用一些方便的运算符时订阅者的数量从 1
减少到 0
,执行仍然可能会停止,其中代码中的 .share()
也被准确包含在内。
总而言之,您的问题中的代码无需担心取消订阅事件。潜在地,确实存在一个听起来像您问题中描述的问题的进一步问题:如果您使用 .connect()
而不是 .share()
。需要注意的是ConnectableObservable
的情况,手动取消事件绑定。
我有这两个对象,我想停止监听他们的事件。我对 observables 和 RxJS 完全陌生,只是想使用 Inquirer 库。
这里是 RxJS API 供参考: http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html
如何取消订阅这些类型的可观察对象?
ConnectableObservable:
ConnectableObservable {
source: EventPatternObservable { _add: [Function], _del: [Function], _fn: undefined },
_connection: ConnectDisposable { _p: [Circular], _s: [Object] },
_source: AnonymousObservable { source: [Object], __subscribe: [Function: subscribe] },
_subject:
Subject {
isDisposed: false,
isStopped: false,
observers: [Object],
hasError: false } },
_count: 1,
_connectableSubscription:
ConnectDisposable {
_p:
ConnectableObservable {
source: [Object],
_connection: [Circular],
_source: [Object],
_subject: [Object] },
_s: AutoDetachObserver { isStopped: false, observer: [Object], m: [Object] } } }
FilterObservable:
FilterObservable {
source:
RefCountObservable {
source:
ConnectableObservable {
source: [Object],
_connection: [Object],
_source: [Object],
_subject: [Object] },
_count: 1,
_connectableSubscription: ConnectDisposable { _p: [Object], _s: [Object] } },
predicate: [Function] }
我需要取消订阅这些对象:
'use strict';
var rx = require('rx');
function normalizeKeypressEvents(value, key) {
return {value: value, key: key || {}};
}
module.exports = function (rl) {
var keypress = rx.Observable.fromEvent(rl.input, 'keypress', normalizeKeypressEvents)
.filter(function (e) {
// Ignore `enter` key. On the readline, we only care about the `line` event.
return e.key.name !== 'enter' && e.key.name !== 'return';
});
return {
line: rx.Observable.fromEvent(rl, 'line'),
keypress: keypress,
normalizedLeftKey: keypress.filter(function (e) {
return e.key.name === 'left';
}).share(),
normalizedRightKey: keypress.filter(function (e) {
return e.key.name === 'right';
}).share(),
normalizedUpKey: keypress.filter(function (e) {
return e.key.name === 'up' || e.key.name === 'k' || (e.key.name === 'p' && e.key.ctrl);
}).share(),
normalizedDownKey: keypress.filter(function (e) {
return e.key.name === 'down' || e.key.name === 'j' || (e.key.name === 'n' && e.key.ctrl);
}).share(),
numberKey: keypress.filter(function (e) {
return e.value && '123456789'.indexOf(e.value) >= 0;
}).map(function (e) {
return Number(e.value);
}).share(),
spaceKey: keypress.filter(function (e) {
return e.key && e.key.name === 'space';
}).share(),
aKey: keypress.filter(function (e) {
return e.key && e.key.name === 'a';
}).share(),
iKey: keypress.filter(function (e) {
return e.key && e.key.name === 'i';
}).share()
};
};
我目前最好的猜测是没有像这样的显式订阅调用:
var source = Rx.Observable.fromEvent(input, 'click');
var subscription = source.subscribe(
function (x) {
console.log('Next: Clicked!');
},
function (err) {
console.log('Error: %s', err);
},
function () {
console.log('Completed');
});
但是,有这些调用:
events.normalizedUpKey.takeUntil(validation.success).forEach(this.onUpKey.bind(this));
events.normalizedDownKey.takeUntil(validation.success).forEach(this.onDownKey.bind(this));
所以我最好的猜测是我需要一种方法来 nullify/cancel takeUntil 调用。
如果您想取消订阅,您需要有 Subscription
对象。这是每个 Observable.subscribe()
调用返回的对象。例如:
let subscriber = Observable.subscribe(...);
...
subscriber.unsubscribe();
有关详细信息,请参阅:https://github.com/ReactiveX/rxjs/blob/master/doc/subscription.md
我支持第一位评论者所说的。 但是感觉代码中某处需要这样调用:
let subscription = normalizedUpKey.subscribe( data => console.log('data') );
你可以做到
subscription.unsubscribe()
上。您还怎么知道什么时候发生了什么,或者那是第 3 方库的一部分?
阅读更多关于 Rxjs 的信息。我建议你看看这本免费的书,https://www.gitbook.com/book/chrisnoring/rxjs-5-ultimate/details
没有必要也没有取消订阅 observable 的协议。实际上,我看到了你问题中的代码,尤其是 return 对象的部分,其中包含了一堆由 share 组成的可观察对象。但是,这些可观察对象仍然是可观察对象,而不是订阅对象,这意味着这些元素没有unsubscribing
这样的概念。
因此,如果您在模块外有一些新的 subscribe-like 代码并且与事件可观察对象一起工作,您显然可以取消订阅特定的订阅实例。
目前题中的代码,source observable上使用的方法都是operator,比如.filter()
.share()
.takeUntil()
,而不是订阅执行,即实际上方法 return new observables.
如Rxjs官方文档所述,虽然.share()
创建了multicasted observables,
如果使用一些方便的运算符时订阅者的数量从 1
减少到 0
,执行仍然可能会停止,其中代码中的 .share()
也被准确包含在内。
总而言之,您的问题中的代码无需担心取消订阅事件。潜在地,确实存在一个听起来像您问题中描述的问题的进一步问题:如果您使用 .connect()
而不是 .share()
。需要注意的是ConnectableObservable
的情况,手动取消事件绑定。