Subject:消息监听器示例

Subjects. Messages listeners example

我需要创建一个消息序列。

目前我已经写出了如下代码:

var math = require('mathjs')
var Rx = require('rx')
var _ = require('lodash')

// Subject that every generated message is pushed into and that every
// dynamically-created subscriber listens on.
var messagesSubject = new Rx.Subject()
// Accumulated pool of messages: each message is wrapped in a one-element array
// and merged into the running array with _.union.
// The mapper must declare the message as its parameter; without it `el` is an
// undeclared identifier and throws a ReferenceError on the first message.
var messagesPool = messagesSubject.map(function(el) { return [el]}).scan([], _.union)


// Subscriber generator: on every 500 ms tick, with a small random chance
// (randomInt(10) > 8), attach a new subscriber to messagesSubject.
// x.value (the tick value) is used as the subscriber's label in the log.
Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()

  .filter(
    function() { return math.randomInt(10) > 8;}
  )
  .do(function(x) {
    messagesSubject.subscribe(function(msg) {
      console.log('subscriber ' + x.value + ' do something with ' + msg.text)
    })
  }).subscribe()

// Message generator: on every 500 ms tick, with a higher random chance
// (randomInt(10) > 2), emit a message with a random text into messagesSubject.
Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 2;}
  )
  .map(function() {
    return { text: math.pickRandom(['one', 'two', 'three'])}
  }).subscribe(messagesSubject)

我怎样才能把之前的所有消息 (messagesPool) 都通知给每个新订阅者?

附带问题:这是 Subject 的合理用例吗?还是我应该选择其他类型的 Subject?

听起来您正在寻找 ReplaySubject 而不是 Subject

[ReplaySubject is a] Subject that buffers all items it observes and replays them to any Observer that subscribes.

正如其他人指出的那样,ReplaySubject 可以成为您的朋友。

这可能意味着您可以删除 messagesPool 这部分功能。

如果您只是在组合 (compose) 查询,也可以完全不使用 Subject:

var math = require('mathjs')
var Rx = require('rx')
var _ = require('lodash')

// Message stream: on every 500 ms tick, with a random chance
// (randomInt(10) > 2), emit a message with a random text.
// .replay() wraps the stream in a connectable observable that buffers the
// values it observes and replays them to every subscriber.
var messages = Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 2;}
  )
  .map(function() {
    return { text: math.pickRandom(['one', 'two', 'three'])}
  })
  .replay();

//Randomly add subscribers (but this would only be dummy code, not suitable for prod)
var randomSubsriberAdder = Rx.Observable
  .interval(500 /* ms */)
  .timeInterval()
  .filter(
    function() { return math.randomInt(10) > 8;}
  )
  .subscribe(function(x) {
    // x.value (the tick value) labels the newly added subscriber in the log.
    messages.subscribe(function(msg) {
      console.log('subscriber ' + x.value + ' do something with ' + msg.text);
    });
  });

// Start the replayed stream. The method is lower-case `connect`; JavaScript
// is case-sensitive, so `Connect` would be undefined.
var connection = messages.connect();
//messages will now be collecting all values.
//  Late subscribers will get all previous values.
//  As new values are published, existing subscribers will get the new value.

您最好使用硬编码数据集和 Rx 测试 tools/libs。 这样您就可以控制要测试的边缘情况(早期订阅者、晚期订阅者、断开订阅者、流中的静默等)

下面是不使用 Subject 的代码示例,并针对重播 (replay) 语义和临时订阅者编写了单元测试。

在 Node.js 上使用 nodeunit 运行

(windows 命令)

REM Install RxJS and the nodeunit test runner (the npm package name is "nodeunit").
npm install rx
npm install nodeunit
REM Run every test module found in the tests directory.
.\node_modules\.bin\nodeunit.cmd tests

tests 目录中的代码:

var Rx = require('rx')

// Short aliases for the Rx.ReactiveTest factories used by the tests.
var onNext = Rx.ReactiveTest.onNext;
var onError = Rx.ReactiveTest.onError;
var onCompleted = Rx.ReactiveTest.onCompleted;
var subscribe = Rx.ReactiveTest.subscribe;

exports.testingReplayWithTransientSubscribers = function(test){
    //Declare that we expect to have 3 asserts enforced.
    test.expect(3);

    //Control time with a test scheduler
    var scheduler = new Rx.TestScheduler();
    //Create our known message that will be published at known times (all times in milliseconds).
    var messages = scheduler.createColdObservable(
        onNext(0500, 'one'),
        onNext(1000, 'two'),
        onNext(2000, 'three'),
        onNext(3500, 'four'),
        onNext(4000, 'five')
    );

    //Replay all messages, and connect the reply decorator.
    var replay = messages.replay();
    var connection = replay.connect();

    //Create 3 observers to subscribe/unsubscribe at various times.
    var observerA = scheduler.createObserver();
    var observerB = scheduler.createObserver();
    var observerC = scheduler.createObserver();

    //Subscribe immediately
    var subA = replay.subscribe(observerA);
    //Subscribe late, missing 1 message
    var subB = Rx.Disposable.empty;
    scheduler.scheduleAbsolute(null, 0800, function(){subB = replay.subscribe(observerB);});
    //Subscribe late, and dispose before any live message happen
    var subC = Rx.Disposable.empty;
    scheduler.scheduleAbsolute(null, 1100, function(){subC = replay.subscribe(observerC);});
    scheduler.scheduleAbsolute(null, 1200, function(){subC.dispose();});
    //Dispose early 
    scheduler.scheduleAbsolute(null, 3000, function(){subB.dispose();});


    //Start virutal time. Run through all the scheduled work (publishing messages, subscribing and unsubscribing)
    scheduler.start();

    //Assert our assumptions.
    test.deepEqual(observerA.messages, [
            onNext(0500, 'one'),
            onNext(1000, 'two'),
            onNext(2000, 'three'),
            onNext(3500, 'four'),
            onNext(4000, 'five')
        ], 
        "ObserverA should receive all values");
    test.deepEqual(observerB.messages, [
            onNext(0800, 'one'),
            onNext(1000, 'two'),
            onNext(2000, 'three'),
        ], 
        "ObserverB should receive initial value on subscription, and then two live values");
    test.deepEqual(observerC.messages, [
            onNext(1100, 'one'),
            onNext(1100, 'two'),
        ], 
        "ObserverC should only receive initial values on subscription");
    test.done();
};