RxJS 分组发出的事件 nodejs
RxJS grouping emitted events nodejs
我正在查询数据库并将结果作为逐行事件流检索 'db_row_receieved'。我正在尝试按公司 ID 对这些结果进行分组,但我在订阅时没有得到任何输出。
数据库行格式如下所示。
// row 1
{
companyId: 50,
value: 200
}
// row 2
{
companyId: 50,
value: 300
}
// row 3
{
companyId: 51,
value: 400
}
代码:
var source = Rx.Observable.fromEvent(eventEmitter, 'db_row_receieved');
var grouped = source.groupBy((x) => { return x.companyId; });
var selectMany = grouped.selectMany(x => x.reduce((acc, v) => {
return acc + v.value;
}, 0));
var subscription = selectMany.subscribe(function (obs) {
console.log("value: ", obs);
}
预期输出:
value: 500 // from the group with companyId 50
value: 400 // from the group with companyId 51
实际输出:
订阅不输出任何内容,但在使用 Rx.Observable.fromArray(someArray)
时有效
谁能告诉我哪里出错了?
所以问题是 reduce
仅当基础流 complete
d 时才会产生单个值。由于事件发射器是一种无限源,因此它始终处于活动状态。
看看下面的代码片段 - 第一个示例完成,另一个没有。
const data = [
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'A', v: 1},
{k: 'A', v: 1},
];
Rx.Observable.from(data)
.concatMap(d => Rx.Observable.of(d).delay(100))
.groupBy(d => d.k)
.mergeMap(group => group.reduce((acc, value) => {
acc.sum += value.v;
return acc;
}, {key: group.key, sum: 0}))
.do(d => console.log('RESULT', d.key, d.sum))
.subscribe();
Rx.Observable.from(data)
.concatMap(d => Rx.Observable.of(d).delay(100))
.merge(Rx.Observable.never()) // MERGIN NEVER IN
// .take(data.length) // UNCOMMENT TO MITIGATE NEVER
.groupBy(d => d.k)
.mergeMap(group => group.reduce((acc, value) => {
acc.sum += value.v;
return acc;
}, {key: group.key, sum: 0}))
.do(d => console.log('RESULT - NEVER - WILL NOT BE PRINTED', d))
.subscribe();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.10/Rx.umd.js"></script>
我不知道你的具体用例,但我最常想到的两个是:
- 使用
scan
(可能有去抖),
- 如果有指示底层流结束的事件,则使用
takeUntil
。
我正在查询数据库并将结果作为逐行事件流检索 'db_row_receieved'。我正在尝试按公司 ID 对这些结果进行分组,但我在订阅时没有得到任何输出。
数据库行格式如下所示。
// row 1
{
companyId: 50,
value: 200
}
// row 2
{
companyId: 50,
value: 300
}
// row 3
{
companyId: 51,
value: 400
}
代码:
var source = Rx.Observable.fromEvent(eventEmitter, 'db_row_receieved');
var grouped = source.groupBy((x) => { return x.companyId; });
var selectMany = grouped.selectMany(x => x.reduce((acc, v) => {
return acc + v.value;
}, 0));
var subscription = selectMany.subscribe(function (obs) {
console.log("value: ", obs);
}
预期输出:
value: 500 // from the group with companyId 50
value: 400 // from the group with companyId 51
实际输出: 订阅不输出任何内容,但在使用 Rx.Observable.fromArray(someArray)
时有效谁能告诉我哪里出错了?
所以问题是 reduce
仅当基础流 complete
d 时才会产生单个值。由于事件发射器是一种无限源,因此它始终处于活动状态。
看看下面的代码片段 - 第一个示例完成,另一个没有。
const data = [
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'B', v: 10},
{k: 'A', v: 1},
{k: 'A', v: 1},
{k: 'A', v: 1},
];
Rx.Observable.from(data)
.concatMap(d => Rx.Observable.of(d).delay(100))
.groupBy(d => d.k)
.mergeMap(group => group.reduce((acc, value) => {
acc.sum += value.v;
return acc;
}, {key: group.key, sum: 0}))
.do(d => console.log('RESULT', d.key, d.sum))
.subscribe();
Rx.Observable.from(data)
.concatMap(d => Rx.Observable.of(d).delay(100))
.merge(Rx.Observable.never()) // MERGIN NEVER IN
// .take(data.length) // UNCOMMENT TO MITIGATE NEVER
.groupBy(d => d.k)
.mergeMap(group => group.reduce((acc, value) => {
acc.sum += value.v;
return acc;
}, {key: group.key, sum: 0}))
.do(d => console.log('RESULT - NEVER - WILL NOT BE PRINTED', d))
.subscribe();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.10/Rx.umd.js"></script>
我不知道你的具体用例,但我最常想到的两个是:
- 使用
scan
(可能有去抖), - 如果有指示底层流结束的事件,则使用
takeUntil
。