如何将 Observable 数组与 RxJS 6.x 和 Node.js 合并?
How do I merge an array of Observables with RxJS 6.x and Node.js?
出于学习目的,我正在创建一个 Node 应用程序,它需要从数组中获取 x 个 RxJS 可观察对象并将其组合成一个事件流。我想知道事件何时以任何顺序(不是以任何顺序或完全完成)在任何可观察对象中发生。我觉得它应该在一个单一的合并事件流中。基本上,来自任何可观察对象的第一个事件都会完成。
为此,我觉得 merge() 可以解决问题。由于合并不直接将数组作为参数,所以我目前使用 reduce 来帮助合并。
然而,最终的结果不是一个可观察的,而是一个函数。我也无法订阅它。代码的简化版本如下所示。
我怎样才能将这个 Node 10.14.2,RxJS 6.4.x 代码更改为 return 可观察的而不是我可以将 .subscribe() 添加到的“[函数]” ?
const { Observable } = require('rxjs');
const { merge } = require('rxjs/operators');
const observables = [
Observable.create(observer => observer.next('Hello')),
Observable.create(observer => observer.next('Hello')),
Observable.create(observer => observer.next('Hello'))
];
const mergedObservables = observables.reduce((merged, observable) => {
console.log(observable);
return merge(merged, observable);
});
// outputs:
// Observable { _isScalar: false, _subscribe: [Function] }
// Observable { _isScalar: false, _subscribe: [Function] }
console.log(mergedObservables);
// outputs:
// [Function]
mergedObservables.subscribe();
// error:
// TypeError: mergedObservables.subscribe is not a function
编辑:您正在导入 merge
operator 而不是静态 merge
函数。前者对来自源可观察对象的事件进行操作,而后者则从一个或多个源可观察对象创建新的可观察对象。虽然考虑下面的扩展语法可以简化您的代码,但这不是您的真正问题。
似乎 Node.js >= 5.0 支持函数调用中数组的扩展运算符(不过您没有指定使用的 Node.js 的哪个版本)。如果您使用的是现代版本的 Node.js:
,则以下内容应该有效
const { Observable, merge } = require('rxjs')
const observables = [
Observable.create(observer => observer.next('Hello')),
Observable.create(observer => observer.next('Hello')),
Observable.create(observer => observer.next('Hello'))
]
const mergedObservables = merge(...observables)
mergedObservables.subscribe(event => { console.log(event) })
出于学习目的,我正在创建一个 Node 应用程序,它需要从数组中获取 x 个 RxJS 可观察对象并将其组合成一个事件流。我想知道事件何时以任何顺序(不是以任何顺序或完全完成)在任何可观察对象中发生。我觉得它应该在一个单一的合并事件流中。基本上,来自任何可观察对象的第一个事件都会完成。
为此,我觉得 merge() 可以解决问题。由于合并不直接将数组作为参数,所以我目前使用 reduce 来帮助合并。
然而,最终的结果不是一个可观察的,而是一个函数。我也无法订阅它。代码的简化版本如下所示。
我怎样才能将这个 Node 10.14.2,RxJS 6.4.x 代码更改为 return 可观察的而不是我可以将 .subscribe() 添加到的“[函数]” ?
const { Observable } = require('rxjs');
const { merge } = require('rxjs/operators');
const observables = [
Observable.create(observer => observer.next('Hello')),
Observable.create(observer => observer.next('Hello')),
Observable.create(observer => observer.next('Hello'))
];
const mergedObservables = observables.reduce((merged, observable) => {
console.log(observable);
return merge(merged, observable);
});
// outputs:
// Observable { _isScalar: false, _subscribe: [Function] }
// Observable { _isScalar: false, _subscribe: [Function] }
console.log(mergedObservables);
// outputs:
// [Function]
mergedObservables.subscribe();
// error:
// TypeError: mergedObservables.subscribe is not a function
编辑:您正在导入 merge
operator 而不是静态 merge
函数。前者对来自源可观察对象的事件进行操作,而后者则从一个或多个源可观察对象创建新的可观察对象。虽然考虑下面的扩展语法可以简化您的代码,但这不是您的真正问题。
似乎 Node.js >= 5.0 支持函数调用中数组的扩展运算符(不过您没有指定使用的 Node.js 的哪个版本)。如果您使用的是现代版本的 Node.js:
,则以下内容应该有效const { Observable, merge } = require('rxjs')
const observables = [
Observable.create(observer => observer.next('Hello')),
Observable.create(observer => observer.next('Hello')),
Observable.create(observer => observer.next('Hello'))
]
const mergedObservables = merge(...observables)
mergedObservables.subscribe(event => { console.log(event) })