在 Spring Webflux 中组合多个单声道
Combine multiple mono in Spring Webflux
我是 webflux 的新手,我正在尝试使用 Flux
执行多个单声道。但我认为我做错了..这是执行多个 Mono
并将其收集到列表的最佳方法吗?
这是我的代码:
mainService.getAllBranch()
.flatMapMany(branchesList -> {
List<Branch> branchesList2 = (List<Branch>) branchesList.getData();
List<Mono<Transaction>> trxMonoList= new ArrayList<>();
branchesList2.stream().forEach(branch -> {
trxMonoList.add(mainService.getAllTrxByBranchId(branch.branchId));
});
return Flux.concat(trxMonoList); // <--- is there any other way than using concat?
})
.collectList()
.flatMap(resultList -> combineAllList());
interface MainService{
Mono<RespBody> getAllBranch();
Mono<RespBody> getAllTrxByBranchId(String branchId); //will return executed url ex: http://trx.com/{branchId}
}
到目前为止,我可以用上面的代码来解释它:
- 获取所有分支
- 遍历所有
branchesList2
并将其添加到 trxMonoList
- return
Flux.concat
,这是我不确定这样做是否正确的地方。但它正在工作
- 合并所有列表
我只是想知道在我的上下文中这是使用 Flux
的正确方法吗?还是有更好的方法来实现我正在尝试做的事情?
您需要稍微重构一下您的代码以响应式。
mainService.getAllBranch()
.flatMapMany(branchesList -> Flux.fromIterable(branchesList.getData())) (1)
.flatMap(branch -> mainService.getAllTrxByBranchId(branch.branchId)) (2)
.collectList()
.flatMap(resultList -> combineAllList());
1) 从 List 创建 Flux 个分支;
2) 遍历每个元素并调用服务。
您不应在 Reactor 中使用 Stream API,因为它具有相同的方法,但具有针对多线程的适配和优化。
这里真正的问题是您不应该在 Flux
中多次点击 Mono
。那会给你带来麻烦。如果你正在设计 API 你应该修复它以正确的反应方式做你想做的事。
interface MainService{
Flux<Branch> getAllBranch();
Flux<Transaction> getAllTrxByBranchId(Flux<String> branchIds);
}
那么你的代码就会变得更简单,反应式框架就可以正常工作了。
mainService.getAllTrxByBranchId(mainService.getAllBranch().map(Branch::getId));
我是 webflux 的新手,我正在尝试使用 Flux
执行多个单声道。但我认为我做错了..这是执行多个 Mono
并将其收集到列表的最佳方法吗?
这是我的代码:
mainService.getAllBranch()
.flatMapMany(branchesList -> {
List<Branch> branchesList2 = (List<Branch>) branchesList.getData();
List<Mono<Transaction>> trxMonoList= new ArrayList<>();
branchesList2.stream().forEach(branch -> {
trxMonoList.add(mainService.getAllTrxByBranchId(branch.branchId));
});
return Flux.concat(trxMonoList); // <--- is there any other way than using concat?
})
.collectList()
.flatMap(resultList -> combineAllList());
interface MainService{
Mono<RespBody> getAllBranch();
Mono<RespBody> getAllTrxByBranchId(String branchId); //will return executed url ex: http://trx.com/{branchId}
}
到目前为止,我可以用上面的代码来解释它:
- 获取所有分支
- 遍历所有
branchesList2
并将其添加到trxMonoList
- return
Flux.concat
,这是我不确定这样做是否正确的地方。但它正在工作 - 合并所有列表
我只是想知道在我的上下文中这是使用 Flux
的正确方法吗?还是有更好的方法来实现我正在尝试做的事情?
您需要稍微重构一下您的代码以响应式。
mainService.getAllBranch()
.flatMapMany(branchesList -> Flux.fromIterable(branchesList.getData())) (1)
.flatMap(branch -> mainService.getAllTrxByBranchId(branch.branchId)) (2)
.collectList()
.flatMap(resultList -> combineAllList());
1) 从 List 创建 Flux 个分支;
2) 遍历每个元素并调用服务。
您不应在 Reactor 中使用 Stream API,因为它具有相同的方法,但具有针对多线程的适配和优化。
这里真正的问题是您不应该在 Flux
中多次点击 Mono
。那会给你带来麻烦。如果你正在设计 API 你应该修复它以正确的反应方式做你想做的事。
interface MainService{
Flux<Branch> getAllBranch();
Flux<Transaction> getAllTrxByBranchId(Flux<String> branchIds);
}
那么你的代码就会变得更简单,反应式框架就可以正常工作了。
mainService.getAllTrxByBranchId(mainService.getAllBranch().map(Branch::getId));