RxJS 分组发出的事件 nodejs

RxJS grouping emitted events nodejs

我正在查询数据库并将结果作为逐行事件流检索 'db_row_receieved'。我正在尝试按公司 ID 对这些结果进行分组,但我在订阅时没有得到任何输出。

数据库行格式如下所示。

 // row 1
    {
        companyId: 50,
        value: 200
    }
    // row 2   
    {
        companyId: 50,
        value: 300
    }
    // row 3 
    {
        companyId: 51,
        value: 400
    }

代码:

var source = Rx.Observable.fromEvent(eventEmitter, 'db_row_receieved');
var grouped = source.groupBy((x) => { return x.companyId; });
var selectMany = grouped.selectMany(x => x.reduce((acc, v) => {
                             return acc + v.value;
                          }, 0));

var subscription = selectMany.subscribe(function (obs) {
                        console.log("value: ", obs);
                   }

预期输出:

value: 500    // from the group with companyId 50
value: 400    // from the group with companyId 51

实际输出: 订阅不输出任何内容,但在使用 Rx.Observable.fromArray(someArray)

时有效

谁能告诉我哪里出错了?

所以问题是 reduce 仅当基础流 completed 时才会产生单个值。由于事件发射器是一种无限源,因此它始终处于活动状态。

看看下面的代码片段 - 第一个示例完成,另一个没有。

const data = [
  {k: 'A', v: 1},
  {k: 'B', v: 10},
  {k: 'A', v: 1},
  {k: 'B', v: 10},
  {k: 'A', v: 1},
  {k: 'B', v: 10},
  {k: 'A', v: 1},
  {k: 'A', v: 1},
  {k: 'A', v: 1},
];

Rx.Observable.from(data)
  .concatMap(d => Rx.Observable.of(d).delay(100))
  .groupBy(d => d.k)
  .mergeMap(group => group.reduce((acc, value) => {
    acc.sum += value.v;
    return acc;
  }, {key: group.key, sum: 0}))
  .do(d => console.log('RESULT', d.key, d.sum))
  .subscribe();
  
Rx.Observable.from(data)
  .concatMap(d => Rx.Observable.of(d).delay(100))
  .merge(Rx.Observable.never()) // MERGIN NEVER IN
  // .take(data.length) // UNCOMMENT TO MITIGATE NEVER
  .groupBy(d => d.k)
  .mergeMap(group => group.reduce((acc, value) => {
    acc.sum += value.v;
    return acc;
  }, {key: group.key, sum: 0}))
  .do(d => console.log('RESULT - NEVER - WILL NOT BE PRINTED', d))
  .subscribe();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.10/Rx.umd.js"></script>

我不知道你的具体用例,但我最常想到的两个是:

  • 使用scan(可能有去抖),
  • 如果有指示底层流结束的事件,则使用takeUntil