如何将对象保存到通量内的反应性存储库
How to save object to reactive repository inside flux
我有问题的答案列表。我想保存这些答案,然后(在 mongo 给他们一个 id 之后)将它们添加到他们的问题中。
现在我是这样做的:
public Flux<Answer> createAnswers(List<Answer> answers) {
return answerRepository.saveAll(answers)
.map(answer -> {
questionRepository.findById(answer.getQuestionId())
.subscribe(question -> {
question.getAnswers().removeIf(ans -> Objects.equals(ans.getId(), answer.getId()));
question.getAnswers().add(answer);
questionRepository.save(question).block();
});
return answer;
});
}
我也试过 ..saveAll(answers).doOnNext()
和 doOnEach()
但是这样问题就不会被保存。
好像map是用来转换数据的,不是用来对每个元素进行操作的。另外,我对 block()
的调用有点困惑。
有没有更好的方法来达到我的目的?
你不应该在 Flux
或 Mono
上调用 subscribe
/block
returns 反应类型本身的方法。
这样做会使当前管道与您要实现的目标脱钩。最好的情况,这将打破背压支持。在许多情况下,这也可能以令人惊讶的方式中断;例如,如果您的方法正在处理 HTTP request/response 或某种会话,则 response/session 可能会关闭,而您的其他订阅者仍在尝试对其执行某些操作。
我相信这样的事情更一致(虽然我在这里遗漏了很多上下文,所以这可能不是实现这一目标的最佳方式):
public Flux<Answer> createAnswers(List<Answer> answers) {
return answerRepository.saveAll(answers)
.flatMap(answer -> {
return questionRepository
.findById(answer.getQuestionId())
.flatMap(question -> {
question.getAnswers().removeIf(ans -> Objects.equals(ans.getId(), answer.getId()));
question.getAnswers().add(answer);
return questionRepository.save(question);
})
.thenReturn(answer);
});
}
我有问题的答案列表。我想保存这些答案,然后(在 mongo 给他们一个 id 之后)将它们添加到他们的问题中。
现在我是这样做的:
public Flux<Answer> createAnswers(List<Answer> answers) {
return answerRepository.saveAll(answers)
.map(answer -> {
questionRepository.findById(answer.getQuestionId())
.subscribe(question -> {
question.getAnswers().removeIf(ans -> Objects.equals(ans.getId(), answer.getId()));
question.getAnswers().add(answer);
questionRepository.save(question).block();
});
return answer;
});
}
我也试过 ..saveAll(answers).doOnNext()
和 doOnEach()
但是这样问题就不会被保存。
好像map是用来转换数据的,不是用来对每个元素进行操作的。另外,我对 block()
的调用有点困惑。
有没有更好的方法来达到我的目的?
你不应该在 Flux
或 Mono
上调用 subscribe
/block
returns 反应类型本身的方法。
这样做会使当前管道与您要实现的目标脱钩。最好的情况,这将打破背压支持。在许多情况下,这也可能以令人惊讶的方式中断;例如,如果您的方法正在处理 HTTP request/response 或某种会话,则 response/session 可能会关闭,而您的其他订阅者仍在尝试对其执行某些操作。
我相信这样的事情更一致(虽然我在这里遗漏了很多上下文,所以这可能不是实现这一目标的最佳方式):
public Flux<Answer> createAnswers(List<Answer> answers) {
return answerRepository.saveAll(answers)
.flatMap(answer -> {
return questionRepository
.findById(answer.getQuestionId())
.flatMap(question -> {
question.getAnswers().removeIf(ans -> Objects.equals(ans.getId(), answer.getId()));
question.getAnswers().add(answer);
return questionRepository.save(question);
})
.thenReturn(answer);
});
}