如何找到 Flux(Parent) 中的所有数据都由其内部非阻塞 Flux 或 Mono(child) 处理?
how to find all the data in a Flux(Parent) are processed by its inner non-blocking Flux or Mono(child)?
我有一个聚合器实用程序 class,我必须在其中联合多个 cassandra table 数据。我的生产代码如下所示,但并不完全相同。
@Autowired FollowersRepository followersRepository;
@Autowired TopicRepository topicRepository;
@GetMapping("/info")
public Flux<FullDetails> getData(){
return Flux.create(emitter ->{
followersRepository.findAll()
.doOnNext(data -> {
List<String> all = data.getTopiclist(); //will get list of topic id
List<Alltopics> processedList = new ArrayList<Alltopics>();
all.forEach(action -> {
topicRepository.findById(action) //will get full detail about topic
.doOnSuccess(topic ->{
processedList.add(topic);
if (processedList.size() >= all.size()) {
FullDetails fulldetails = new FullDetails(action,processedList);
emitter.next(fulldetails);
//emitter.complete();
}
})
.subscribe();
});
})
.doOnComplete(() ->{
System.out.println("All the data are processed !!!");
//emitter.complete(); // executing if all the data are pushed from database not waiting for doOnNext method to complete.
})
.subscribe();
});
}
有关更多详细信息,请参阅此处的代码 CodeLink。
我已尝试对外部 Flux 使用 doOnComplete 和 doOnFinally,它不会等待所有内部非阻塞调用完成。
我想在处理完 Flux 中所有嵌套的 Mono/Flux(非阻塞)请求后调用 onComplete。
对于嵌套阻塞 flux/mono,外部通量 doOnComplete 方法在内部 Flux/Mono 完成后执行。
PostScript(PS):-
在下面的示例中,我找不到放置 emitter.complete() 的位置。
因为在所有内部 Mono 完成之前调用 doOnComplete() 方法。
请求正文:-
[{ "content":"Intro to React and operators", "author":"Josh Long", "name":"Spring WebFlux" },{ "content":"Intro to Flux", "author":"Josh Long", "name":"Spring WebFlux" },{ "content":"Intro to Mono", "author":"Josh Long", "name":"Spring WebFlux" }]
我的休息控制器:-
@PostMapping("/topics")
public Flux<?> loadTopic(@RequestBody Flux<Alltopics> data)
{
return Flux.create(emitter ->{
data
.map(topic -> {
topic.setTopicid(null ==topic.getTopicid() || topic.getTopicid().isEmpty()?UUID.randomUUID().toString():topic.getTopicid());
return topic;
})
.doOnNext(topic -> {
topicRepository.save(topic).doOnSuccess(persistedTopic ->{
emitter.next(persistedTopic);
//emitter.complete();
}).subscribe();
})
.doOnComplete(() -> {
//emitter.complete();
System.out.println(" all the data are processed!!!");
}).subscribe();
});
}
以下是编写响应式管道时应遵循的一些规则:
doOnXYZ
运算符永远不应该用于执行大量 I/O、涉及延迟的操作或任何反应性操作。这些应该用于 "side-effects" 操作,例如日志记录、指标等。
- 你永远不应该
subscribe
来自管道或 returns 反应类型的方法。这将此操作的处理与主管道分离,这意味着不能保证您会在正确的时间获得预期的结果,也不能保证您的应用程序会知道 complete/error 信号。
- 你永远不应该
block
来自管道或 returns 反应类型的方法。这将在运行时对您的应用程序造成严重问题。
现在,由于您的代码片段非常复杂,我将只为您提供大体方向,以便您遵循另一个代码片段。
@GetMapping("/info")
public Flux<FullDetails> getData(){
return followersRepository.findAll()
.flatMap(follower -> {
Mono<List<Alltopics>> topics = topicRepository.findAllById(follower.getTopiclist()).collectList();
return topics.map(topiclist -> new FullDetails(follower.getId(), topiclist));
});
}
我有一个聚合器实用程序 class,我必须在其中联合多个 cassandra table 数据。我的生产代码如下所示,但并不完全相同。
@Autowired FollowersRepository followersRepository;
@Autowired TopicRepository topicRepository;
@GetMapping("/info")
public Flux<FullDetails> getData(){
return Flux.create(emitter ->{
followersRepository.findAll()
.doOnNext(data -> {
List<String> all = data.getTopiclist(); //will get list of topic id
List<Alltopics> processedList = new ArrayList<Alltopics>();
all.forEach(action -> {
topicRepository.findById(action) //will get full detail about topic
.doOnSuccess(topic ->{
processedList.add(topic);
if (processedList.size() >= all.size()) {
FullDetails fulldetails = new FullDetails(action,processedList);
emitter.next(fulldetails);
//emitter.complete();
}
})
.subscribe();
});
})
.doOnComplete(() ->{
System.out.println("All the data are processed !!!");
//emitter.complete(); // executing if all the data are pushed from database not waiting for doOnNext method to complete.
})
.subscribe();
});
}
有关更多详细信息,请参阅此处的代码 CodeLink。
我已尝试对外部 Flux 使用 doOnComplete 和 doOnFinally,它不会等待所有内部非阻塞调用完成。
我想在处理完 Flux 中所有嵌套的 Mono/Flux(非阻塞)请求后调用 onComplete。
对于嵌套阻塞 flux/mono,外部通量 doOnComplete 方法在内部 Flux/Mono 完成后执行。
PostScript(PS):-
在下面的示例中,我找不到放置 emitter.complete() 的位置。
因为在所有内部 Mono 完成之前调用 doOnComplete() 方法。
请求正文:-
[{ "content":"Intro to React and operators", "author":"Josh Long", "name":"Spring WebFlux" },{ "content":"Intro to Flux", "author":"Josh Long", "name":"Spring WebFlux" },{ "content":"Intro to Mono", "author":"Josh Long", "name":"Spring WebFlux" }]
我的休息控制器:-
@PostMapping("/topics")
public Flux<?> loadTopic(@RequestBody Flux<Alltopics> data)
{
return Flux.create(emitter ->{
data
.map(topic -> {
topic.setTopicid(null ==topic.getTopicid() || topic.getTopicid().isEmpty()?UUID.randomUUID().toString():topic.getTopicid());
return topic;
})
.doOnNext(topic -> {
topicRepository.save(topic).doOnSuccess(persistedTopic ->{
emitter.next(persistedTopic);
//emitter.complete();
}).subscribe();
})
.doOnComplete(() -> {
//emitter.complete();
System.out.println(" all the data are processed!!!");
}).subscribe();
});
}
以下是编写响应式管道时应遵循的一些规则:
doOnXYZ
运算符永远不应该用于执行大量 I/O、涉及延迟的操作或任何反应性操作。这些应该用于 "side-effects" 操作,例如日志记录、指标等。- 你永远不应该
subscribe
来自管道或 returns 反应类型的方法。这将此操作的处理与主管道分离,这意味着不能保证您会在正确的时间获得预期的结果,也不能保证您的应用程序会知道 complete/error 信号。 - 你永远不应该
block
来自管道或 returns 反应类型的方法。这将在运行时对您的应用程序造成严重问题。
现在,由于您的代码片段非常复杂,我将只为您提供大体方向,以便您遵循另一个代码片段。
@GetMapping("/info")
public Flux<FullDetails> getData(){
return followersRepository.findAll()
.flatMap(follower -> {
Mono<List<Alltopics>> topics = topicRepository.findAllById(follower.getTopiclist()).collectList();
return topics.map(topiclist -> new FullDetails(follower.getId(), topiclist));
});
}