如何 return 高阶可观察量中的外部可观察量而不是内部可观察量

How to return outer observable and not inner in a High-order observables

让我们用以下代码澄清问题:

    this.rates$ = this._glbRateService.getRates(params); // 1
    this.rates$.pipe(
        mergeMap(rates => {
            const priceByRates: Observable<any>[] = rates.map(rate => {
                const paramsRatingItemProduct = {
                    idItem: product.idItem,
                    idRate: rate.idRate
                };
                return this._glbRatingItemProduct.getPrice(paramsRatingItemProduct); // 2
            });
            return priceByRates;
        })
    ).subscribe(response => {
        console.log(response); // 3
    });

在该代码中:

  1. 我从服务器获取费率
  2. 对于每个费率,我都会得到价格(地图)
  3. 我的 console.log returns 来自内部订阅的值 (this._glbRatingItemProduct.getPr...)

我想要的是对映射值和内部订阅进行逻辑处理。

像这样:

this.rates$ = this._glbRateService.getRates(params);
this.rates$.pipe(
    mergeMap(rates => {
        const priceByRates: Observable<any>[] = rates.map(rate => {
            const paramsRatingItemProduct = {
                idItem: product.idItem,
                idRate: rate.idRate
            };
            return this._glbRatingItemProduct.getPrice(paramsRatingItemProduct);
            // WITH THE SUBSCRIPTION OF THIS RETURN I WANT TO MAKE LOGIC
            // WITH rates.map, and then return rates, NOT THE INNER SUBSCRIPTION

        });
        return priceByRates;
    })
).subscribe(response => {
    console.log(response);
});

你首先需要先用 maybe forkJoin 执行内部可观察数组 然后运行你的映射函数与数组

mergeMap(rates => {
    const priceByRates: Observable<any>[] = rates.map(rate => {
        const paramsRatingItemProduct = {
            idItem: product.idItem,
            idRate: rate.idRate
        };
        return this._glbRatingItemProduct.getPrice(paramsRatingItemProduct);

    });
    return forkJoin(...priceByRates).pipe((values)=>values.map....your logic ));
})

https://www.learnrxjs.io/learn-rxjs/operators/combination/forkjoin

有时将映射和展平 higher-order 可观察对象的逻辑分开是有帮助的。这里应该更清楚一点,map() returns 一组可观察值和 forkJoin() 将所有这些可观察值加入一个流。

this.rates$ = this._glbRateService.getRates(params);
this.rates$.pipe(
  map(rates => rates.map(
    rate => this._glbRatingItemProduct.getPrice({
      idItem: product.idItem,
      idRate: rate.idRate
    })
  ),
  mergeMap(priceByRates => forkJoin(priceByRates))
).subscribe(console.log);

另一方面,forkJoin() 仅在所有源可观察对象完成后才发出。如果您不需要将所有响应放在一起,则可以使用更简单的 merge() 保留源流 de-coupled。只有一行需要更改:

mergeMap(priceByRates => merge(...priceByRates))

要记住的是 mergeMap 期望返回单个流。它将数组转换为值流。所以 mergeMap(num => [10,9,8,7,num]) 不会将 num 映射到数组中,它会创建一个新流,一次一个地发出这些数字。

这就是为什么 mergeMap(_ => val : Observable[]) 一次只发出每个可观察值(作为高阶可观察值)。

有了这些知识,您实际上可以将流更改为合并,而无需使用上面的静态合并功能。可能看起来像这样:

this.rates$ = this._glbRateService.getRates(params);
this.rates$.pipe(
  mergeMap(rates => rates.map(
    rate => this._glbRatingItemProduct.getPrice({
      idItem: product.idItem,
      idRate: rate.idRate
    })
  ),
  mergeAll()
).subscribe(console.log);

mergeAll() 将在每个 higher-order 可观察对象到达时接收并订阅+合并它们的输出。