Spring WebFlux:WebClient 结合了 2 个 Reactive RESTful Web 服务
Spring WebFlux: WebClient combines 2 Reactive RESTful Web Service
我正在使用 Spring WebFlux 开发具有 Reactive 支持的微服务应用程序。让我们看看,我有一个属于某个类别的问题列表和每个问题的选项列表。我将问题和选项分离到具有 Reactive 支持的服务中,我希望有另一个服务使用 Spring WebFlux 的 WebClient 将它们组合在一起。当然也需要支持Reactive
问题服务实现:
public Flux<Question> getQuestions(String categoryId) {
WebClient client = WebClient
.builder()
.baseUrl(getServiceUrl())
.build();
WebClient.ResponseSpec responseSpec = client
.get()
.uri("/questions/" + categoryId)
.retrieve();
return responseSpec.bodyToFlux(Question.class);
}
OptionServiceImpl:
public Flux<Option> getOptions(String questionId) {
WebClient client = WebClient
.builder()
.baseUrl(getServiceUrl())
.build();
WebClient.ResponseSpec responseSpec = client
.get()
.uri("/options/" + questionId)
.retrieve();
return responseSpec.bodyToFlux(Option.class);
}
但我不知道如何以 Reactive 方式将问题与其选项结合起来。谁能提出一些想法?
更新的解决方案:
我添加了一个名为 CompositeQuestion
的新 class
@Data
@AllArgsConstructor
public class CompositeQuestion {
private String id;
private String description;
private String categoryId;
private List<Option> options;
}
现在要获取问题的列表选项,我的代码如下:
Flux<CompositeQuestion> compositQuestion = questionsFromCoreQuestionService.flatMap(question ->
optionService.getOptions(question.getId())
.collectList()
.map(options -> new CompositeQuestion(question.getId(), question.getDescription(), question.getCategoryId(), options)))
.subscribeOn(Schedulers.elastic());
假设您有如下 class:
@Value
public class QuestionOptions {
private Question question;
private List<Option> options;
}
(@Value annotation is from Lombok)
您可以检索带有如下选项的问题:
Flux<String> categoryIds = Flux.just("1", "2", "3");
Flux<QuestionOptions> questionOptions =
categoryIds.flatMap(categoryId ->
// retrieve questions for each category
questionService.getQuestions(categoryId)
// get options for each question
.flatMap(question -> optionService.getOptions(question.getId())
.collectList()
.map(optionList -> new QuestionOptions(question, optionList))
))
.subscribeOn(Schedulers.elastic()); // retrieve each question on a different thread.
请注意,如果类别的顺序可能与您要求的顺序不同。如果这对您来说是一个交易破坏者,您可以考虑使用 concatMap()
instead of flatMap()
,尽管每个请求将按顺序 运行。
我正在使用 Spring WebFlux 开发具有 Reactive 支持的微服务应用程序。让我们看看,我有一个属于某个类别的问题列表和每个问题的选项列表。我将问题和选项分离到具有 Reactive 支持的服务中,我希望有另一个服务使用 Spring WebFlux 的 WebClient 将它们组合在一起。当然也需要支持Reactive
问题服务实现:
public Flux<Question> getQuestions(String categoryId) {
WebClient client = WebClient
.builder()
.baseUrl(getServiceUrl())
.build();
WebClient.ResponseSpec responseSpec = client
.get()
.uri("/questions/" + categoryId)
.retrieve();
return responseSpec.bodyToFlux(Question.class);
}
OptionServiceImpl:
public Flux<Option> getOptions(String questionId) {
WebClient client = WebClient
.builder()
.baseUrl(getServiceUrl())
.build();
WebClient.ResponseSpec responseSpec = client
.get()
.uri("/options/" + questionId)
.retrieve();
return responseSpec.bodyToFlux(Option.class);
}
但我不知道如何以 Reactive 方式将问题与其选项结合起来。谁能提出一些想法?
更新的解决方案:
我添加了一个名为 CompositeQuestion
的新 class@Data
@AllArgsConstructor
public class CompositeQuestion {
private String id;
private String description;
private String categoryId;
private List<Option> options;
}
现在要获取问题的列表选项,我的代码如下:
Flux<CompositeQuestion> compositQuestion = questionsFromCoreQuestionService.flatMap(question ->
optionService.getOptions(question.getId())
.collectList()
.map(options -> new CompositeQuestion(question.getId(), question.getDescription(), question.getCategoryId(), options)))
.subscribeOn(Schedulers.elastic());
假设您有如下 class:
@Value
public class QuestionOptions {
private Question question;
private List<Option> options;
}
(@Value annotation is from Lombok)
您可以检索带有如下选项的问题:
Flux<String> categoryIds = Flux.just("1", "2", "3");
Flux<QuestionOptions> questionOptions =
categoryIds.flatMap(categoryId ->
// retrieve questions for each category
questionService.getQuestions(categoryId)
// get options for each question
.flatMap(question -> optionService.getOptions(question.getId())
.collectList()
.map(optionList -> new QuestionOptions(question, optionList))
))
.subscribeOn(Schedulers.elastic()); // retrieve each question on a different thread.
请注意,如果类别的顺序可能与您要求的顺序不同。如果这对您来说是一个交易破坏者,您可以考虑使用 concatMap()
instead of flatMap()
,尽管每个请求将按顺序 运行。