Rxjs - 如何提取数组中的多个值并将它们同步反馈给可观察流

Rxjs - How can I extract multiple values inside an array and feed them back to the observable stream synchronously

我从事件流中创建了一个 Rx.Observable

Rx.Observable.fromEvent(recognizeStream, 'data')

其中每个数据事件如下所示:

{ error: null, alternatives: [result1, result2, result3] }

我想提取 alternatives 数组中的每个值并将它们合并到流中。我需要看什么 operators

据我所知,flatMapconcatMap 可以胜任这项工作,但我无法从他们的示例中得到启发。

有人可以解释我应该使用哪个运算符并提供示例吗?

你应该 pluck "alternatives" 然后迭代数组使用 from,由于 from 创建了一个新的可观察对象,您需要将其展平,因此需要使用 flatMap 来完成这项工作。所以最后你有 备选方案 回到流中。

试一试:

var data = {
  error: null,
  alternatives: [{
    result: 1
  }, {
    result: 2
  }, {
    result: 3
  }]
}

var input$ = Rx.Observable.of(data);
input$.pluck('alternatives').flatMap(alternatives => Rx.Observable.from(alternatives)).subscribe(alternative => console.log(alternative));
<script src="https://npmcdn.com/@reactivex/rxjs@5.0.0-beta.7/dist/global/Rx.umd.js"></script>

运算符flatMap()concatMap()都是不错的选择。您可以将 alternatives 属性 转换为另一个 Observable,然后将数组项合并到流中。

const Observable = Rx.Observable;

Observable.of({ error: null, alternatives: ['result1', 'result2', 'result3'] })
  .concatMap(val => {
    return Observable.from(val['alternatives']);
  })
  .subscribe(val => console.log(val));

这会打印到控制台:

result1
result2
result3

观看现场演示:https://jsbin.com/foqutab/2/edit?js,console

xxxMap() 运算符家族都处理 higher-order Observables。这意味着它们允许您在主 Observable 中创建 Observables 并将结果值内联到主流中。所以你可以将类型签名读作 Observable<Observable<T>> => Observable<T>

给定一个流,每个发射 x 是一个包含 4 个发射值的 Observable:

input:  --x----------x 
flatMap   a-a-a-a-|  b-b-b-b-|
result: --a-a-a-a----b-b-b-b-|

类型转换 xxxMap(myFnc) return 值

xxxMap() 运算符都使用 ObservablePromiseArray 类型的结果。根据您放入的内容,如果需要,它会转换为 Observable。

Rx.Observable.of('')
  .flatMap(() => [1,2,3,4])
  .subscribe(val => console.log('array value: ' + val));

Rx.Observable.of('')
  .flatMap(() => Promise.resolve(1))
  .subscribe(val => console.log('promise value: ' + val));

Rx.Observable.of('')
  .flatMap(() => Promise.resolve([1,2,3,4]))
  .subscribe(val => console.log('promise array value: ' + val));

Rx.Observable.of('')
  .flatMap(() => Rx.Observable.from([1,2,3,4]))
  .subscribe(val => console.log('Observable value: ' + val));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

在您的情况下,您可以轻松地 flatMap 对象和 return 数组:

Rx.Observable.of({ error: null, alternatives: ['result1', 'result2', 'result3'] })
  .flatMap(val => val.alternatives)
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

所有xxxMap运算符的区别

mergeMap有什么作用

flatMap, better known as mergeMap will merge 进入主流时的所有排放量。

concatMap

concatMap 将在 concat 下一个流之前等待所有发射完成:

switchMap

但是 switchMap 将在新的发射可用时放弃 先前的流,并且切换 以从新流发射值: