如何连接嵌套在另一个 Observable 中的多个 Observable 集合
How to concat several collections of Observables nested in another Observable
我是 RxJs 的新手,我正在为一个 Angular 应用程序的复杂场景而苦苦挣扎,我在其他可观察对象中嵌套了可观察对象(下文会简化)。
简而言之,我有:
- 一个 Observable returns 一个包含对象集合的流 SearchActivity =>
Observable<SearchActivity[]>
- 每个 SearchActivity 对象都有一个 Observable 属性 'results$' =>
BehaviorSubject<SearchActivityResult[]>
我尝试将所有活动中的所有结果提取到一个 Observable 中。
所以,从下面的例子(这不是代码,而是一种数据结构的思想):
Observable<SearchActivity[]> => [
{id: 1, results$: Observable<SearchActivityResult[]> => [1, 2]},
{id: 2, results$: Observable<SearchActivityResult[]> => [3, 4, 5]}
{id: 3, results$: Observable<SearchActivityResult[]> => [6]}
]
我想提取结果得到:
Observable<SearchActivityResult[]> => [1, 2, 3, 4, 5, 6] (Edit: the order is not important)
如何使用 RxJs 实现这一点?
到目前为止我得到了类似的东西:
// This is not working properly
this.allResults$ = this.displayedActivities$.pipe(
mergeMap(activities => {
return concat(...activities?.map(activity => activity.results$ || of([])));
})
);
但它似乎没有按预期工作。
我准备了一份StackBiltz with a more complex scenario here
更新2021-02-23
我创建了一个新的 StackBlitz,您可以在其中通过单击与结果进行交互。目标是获取每个数据库的所有结果的摘要(页面底部的 table)并显示所有搜索条件下 positive/negative 结果的数量。
TL;DR => 解决方案
我刚刚创建了一个新的StackBlitz,复杂度较低,以便隔离问题。我已经包含了 Mrk Sef
提出的建议并且它正在工作。
我还有其他一些关于平面运算符的错误和对 json 的奇怪循环引用,但这更可能是由 StackBlitz 模拟器引起的。
我认为您的想法是正确的,但您需要最后一步来将您的流减少为您的价值。
您写的内容:
this.allResults$ = this.displayedActivities$.pipe(
mergeMap(activities =>
concat(...activities?.map(
activity => activity?.results$ || of([])
))
)
);
activities
中的每个条目都有一个排放,但您不想要一系列排放,您想要一个数组 merging/flattening 结果。
您可以尝试类似的方法:
this.allResults$ = this.displayedActivities$.pipe(
mergeMap(activities =>
merge(...activities?.map(
activity => activity?.results$ || of([])
))
),
toArray(),
map(res => res.flat())
);
或
this.allResults$ = this.displayedActivities$.pipe(
mergeMap(activities =>
merge(...activities?.map(
activity => activity?.results$ || of([])
))
),
reduce((acc, curr) => [...acc,...curr], [])
);
更新:
你可以运行这个最小的例子:
const displayedActivities$ = of([
{results$: of([1,2,3])},
{results$: of([10,20,30])},
{something: "else"},
{results$: of([100,200,300])}
]);
const allResults$ = displayedActivities$.pipe(
mergeMap(activities =>
merge(...activities.filter(
activity => activity?.results$ != null
).map(
activity => activity.results$
))
),
reduce((acc, curr) => [...acc,...curr], [])
).subscribe(console.log);
输出:
[1, 2, 3, 10, 20, 30, 100, 200, 300]
更新#2
这就是您处理将 属性 result$ observables 更改为 long-lived observables 的方式。和上面的toArray()
、Array.flat()
差不多,只不过现在用的是combineLatest
这是应该 运行 的代码。它将保留每个可观察到的最新发射作为最终组合输出的一部分:
模拟设置
创建 displayedActivities$
发射对象数组,每个对象都有一个 属性 result$
在完成之前发射一定次数(在本例中为 3 次)。
/****
* Pipeable Operator:
* Takes arrays emitted by the source and spaces out their
* values by the given interval time in milliseconds
****/
function intervalArray<T>(intervalTime = 1000): OperatorFunction<T[], T> {
return pipe(
concatMap((v: T[]) =>
concat(
...v.map((value: T) =>
EMPTY.pipe(
delay(intervalTime),
startWith(value)
)
)
)
)
);
}
const displayedActivities$ = of([
{ results$:
of([
[1,2,3],
[4,5,6],
[7,8,9]
]).pipe(intervalArray(250))
},
{ results$:
of([
[10,20,30],
[40,50,60],
[70,80,90]
]).pipe(intervalArray(300))
},
{ something: "else" },
{ results$:
of([
[100,200,300],
[400,500,600],
[700,800,900]
]).pipe(intervalArray(350))
}
]);
合并所有结果$
const allResults$ = displayedActivities$.pipe(
mergeMap(activities =>
combineLatest(...activities.filter(
activity => activity?.results$ != null
).map(
activity => activity.results$.pipe(startWith([]))
))
),
map(latest => latest.flat())
).subscribe(console.log);
输出:
[1,2,3]
[1,2,3,10,20,30]
[1,2,3,10,20,30,100,200,300]
[4,5,6,10,20,30,100,200,300]
[4,5,6,40,50,60,100,200,300]
[4,5,6,40,50,60,400,500,600]
[7,8,9,40,50,60,400,500,600]
[7,8,9,70,80,90,400,500,600]
[7,8,9,70,80,90,700,800,900]
您可以尝试使用 Array#map
映射数组中的可观察对象,并通过 forkJoin
传递它以获得并行请求。可以使用 switchMap
.
等高阶映射运算符从源可观察对象映射
尝试以下方法
Observable<SearchActivity[]>.pipe(
switchMap((data: any) =>
forkJoin(data.map(item =>
item.results$
))
)
).subscribe({
next: value => {
console.log(value);
},
error: error => {
// handle error
}
});
注:
Observable<SearchActivity[]>
这里是占位符。将其替换为实际的源可观察变量。
forkJoin
仅当所有源可观察对象完成时才会发出。如果您希望数据流替换为 combineLatest
或 zip
.
这样的东西可能适合你:
this.allResults$ = this.displayedActivities$.pipe(
switchMap(activities =>
merge(...activities.map(a => a.results$))
),
mergeMap(resultsArray => from(resultsArray)),
scan((acc, cur) => ([...acc, cur]), [])
);
流程如下:
scan
中的函数可能过于简单,因为它只是将所有发射组合到一个数组中,而没有考虑表示相同结果的多个发射。我看到你在返回的 SearchActivityResult
项目上有一个 id
,所以也许可以在考虑到这一点的扫描内部构建一个更复杂的处理函数。
但希望这能让您入门。
这是一个有效的 StackBlitz
我是 RxJs 的新手,我正在为一个 Angular 应用程序的复杂场景而苦苦挣扎,我在其他可观察对象中嵌套了可观察对象(下文会简化)。
简而言之,我有:
- 一个 Observable returns 一个包含对象集合的流 SearchActivity =>
Observable<SearchActivity[]>
- 每个 SearchActivity 对象都有一个 Observable 属性 'results$' =>
BehaviorSubject<SearchActivityResult[]>
我尝试将所有活动中的所有结果提取到一个 Observable 中。 所以,从下面的例子(这不是代码,而是一种数据结构的思想):
Observable<SearchActivity[]> => [
{id: 1, results$: Observable<SearchActivityResult[]> => [1, 2]},
{id: 2, results$: Observable<SearchActivityResult[]> => [3, 4, 5]}
{id: 3, results$: Observable<SearchActivityResult[]> => [6]}
]
我想提取结果得到:
Observable<SearchActivityResult[]> => [1, 2, 3, 4, 5, 6] (Edit: the order is not important)
如何使用 RxJs 实现这一点?
到目前为止我得到了类似的东西:
// This is not working properly
this.allResults$ = this.displayedActivities$.pipe(
mergeMap(activities => {
return concat(...activities?.map(activity => activity.results$ || of([])));
})
);
但它似乎没有按预期工作。
我准备了一份StackBiltz with a more complex scenario here
更新2021-02-23
我创建了一个新的 StackBlitz,您可以在其中通过单击与结果进行交互。目标是获取每个数据库的所有结果的摘要(页面底部的 table)并显示所有搜索条件下 positive/negative 结果的数量。
TL;DR => 解决方案
我刚刚创建了一个新的StackBlitz,复杂度较低,以便隔离问题。我已经包含了 Mrk Sef
提出的建议并且它正在工作。
我还有其他一些关于平面运算符的错误和对 json 的奇怪循环引用,但这更可能是由 StackBlitz 模拟器引起的。
我认为您的想法是正确的,但您需要最后一步来将您的流减少为您的价值。
您写的内容:
this.allResults$ = this.displayedActivities$.pipe(
mergeMap(activities =>
concat(...activities?.map(
activity => activity?.results$ || of([])
))
)
);
activities
中的每个条目都有一个排放,但您不想要一系列排放,您想要一个数组 merging/flattening 结果。
您可以尝试类似的方法:
this.allResults$ = this.displayedActivities$.pipe(
mergeMap(activities =>
merge(...activities?.map(
activity => activity?.results$ || of([])
))
),
toArray(),
map(res => res.flat())
);
或
this.allResults$ = this.displayedActivities$.pipe(
mergeMap(activities =>
merge(...activities?.map(
activity => activity?.results$ || of([])
))
),
reduce((acc, curr) => [...acc,...curr], [])
);
更新:
你可以运行这个最小的例子:
const displayedActivities$ = of([
{results$: of([1,2,3])},
{results$: of([10,20,30])},
{something: "else"},
{results$: of([100,200,300])}
]);
const allResults$ = displayedActivities$.pipe(
mergeMap(activities =>
merge(...activities.filter(
activity => activity?.results$ != null
).map(
activity => activity.results$
))
),
reduce((acc, curr) => [...acc,...curr], [])
).subscribe(console.log);
输出:
[1, 2, 3, 10, 20, 30, 100, 200, 300]
更新#2
这就是您处理将 属性 result$ observables 更改为 long-lived observables 的方式。和上面的toArray()
、Array.flat()
差不多,只不过现在用的是combineLatest
这是应该 运行 的代码。它将保留每个可观察到的最新发射作为最终组合输出的一部分:
模拟设置
创建 displayedActivities$
发射对象数组,每个对象都有一个 属性 result$
在完成之前发射一定次数(在本例中为 3 次)。
/****
* Pipeable Operator:
* Takes arrays emitted by the source and spaces out their
* values by the given interval time in milliseconds
****/
function intervalArray<T>(intervalTime = 1000): OperatorFunction<T[], T> {
return pipe(
concatMap((v: T[]) =>
concat(
...v.map((value: T) =>
EMPTY.pipe(
delay(intervalTime),
startWith(value)
)
)
)
)
);
}
const displayedActivities$ = of([
{ results$:
of([
[1,2,3],
[4,5,6],
[7,8,9]
]).pipe(intervalArray(250))
},
{ results$:
of([
[10,20,30],
[40,50,60],
[70,80,90]
]).pipe(intervalArray(300))
},
{ something: "else" },
{ results$:
of([
[100,200,300],
[400,500,600],
[700,800,900]
]).pipe(intervalArray(350))
}
]);
合并所有结果$
const allResults$ = displayedActivities$.pipe(
mergeMap(activities =>
combineLatest(...activities.filter(
activity => activity?.results$ != null
).map(
activity => activity.results$.pipe(startWith([]))
))
),
map(latest => latest.flat())
).subscribe(console.log);
输出:
[1,2,3]
[1,2,3,10,20,30]
[1,2,3,10,20,30,100,200,300]
[4,5,6,10,20,30,100,200,300]
[4,5,6,40,50,60,100,200,300]
[4,5,6,40,50,60,400,500,600]
[7,8,9,40,50,60,400,500,600]
[7,8,9,70,80,90,400,500,600]
[7,8,9,70,80,90,700,800,900]
您可以尝试使用 Array#map
映射数组中的可观察对象,并通过 forkJoin
传递它以获得并行请求。可以使用 switchMap
.
尝试以下方法
Observable<SearchActivity[]>.pipe(
switchMap((data: any) =>
forkJoin(data.map(item =>
item.results$
))
)
).subscribe({
next: value => {
console.log(value);
},
error: error => {
// handle error
}
});
注:
Observable<SearchActivity[]>
这里是占位符。将其替换为实际的源可观察变量。forkJoin
仅当所有源可观察对象完成时才会发出。如果您希望数据流替换为combineLatest
或zip
.
这样的东西可能适合你:
this.allResults$ = this.displayedActivities$.pipe(
switchMap(activities =>
merge(...activities.map(a => a.results$))
),
mergeMap(resultsArray => from(resultsArray)),
scan((acc, cur) => ([...acc, cur]), [])
);
流程如下:
scan
中的函数可能过于简单,因为它只是将所有发射组合到一个数组中,而没有考虑表示相同结果的多个发射。我看到你在返回的 SearchActivityResult
项目上有一个 id
,所以也许可以在考虑到这一点的扫描内部构建一个更复杂的处理函数。
但希望这能让您入门。
这是一个有效的 StackBlitz