Spring WebFlux:WebClient 结合了 2 个 Reactive RESTful Web 服务

Spring WebFlux: WebClient combines 2 Reactive RESTful Web Service

我正在使用 Spring WebFlux 开发具有 Reactive 支持的微服务应用程序。让我们看看,我有一个属于某个类别的问题列表和每个问题的选项列表。我将问题和选项分离到具有 Reactive 支持的服务中,我希望有另一个服务使用 Spring WebFlux 的 WebClient 将它们组合在一起。当然也需要支持Reactive

问题服务实现:

public Flux<Question> getQuestions(String categoryId) {
    WebClient client = WebClient
        .builder()
        .baseUrl(getServiceUrl())
        .build();

    WebClient.ResponseSpec responseSpec = client
        .get()
        .uri("/questions/" + categoryId)
        .retrieve();

    return responseSpec.bodyToFlux(Question.class);
}

OptionServiceImpl:

public Flux<Option> getOptions(String questionId) {
    WebClient client = WebClient
            .builder()
            .baseUrl(getServiceUrl())
            .build();

        WebClient.ResponseSpec responseSpec = client
            .get()
            .uri("/options/" + questionId)
            .retrieve();

        return responseSpec.bodyToFlux(Option.class);
}

但我不知道如何以 Reactive 方式将问题与其选项结合起来。谁能提出一些想法?

更新的解决方案:

我添加了一个名为 CompositeQuestion

的新 class
@Data
@AllArgsConstructor 
public class CompositeQuestion {

    private String id;

    private String description;

    private String categoryId;

    private List<Option> options;

}

现在要获取问题的列表选项,我的代码如下:

Flux<CompositeQuestion> compositQuestion = questionsFromCoreQuestionService.flatMap(question ->
        optionService.getOptions(question.getId())
            .collectList()
            .map(options -> new CompositeQuestion(question.getId(), question.getDescription(), question.getCategoryId(), options)))
        .subscribeOn(Schedulers.elastic());

假设您有如下 class:

@Value
public class QuestionOptions {
     private Question question;
     private List<Option> options;
}

(@Value annotation is from Lombok)

您可以检索带有如下选项的问题:

Flux<String> categoryIds = Flux.just("1", "2", "3");
Flux<QuestionOptions> questionOptions = 
    categoryIds.flatMap(categoryId -> 
         // retrieve questions for each category
         questionService.getQuestions(categoryId)
              // get options for each question 
              .flatMap(question -> optionService.getOptions(question.getId())
              .collectList()
              .map(optionList -> new QuestionOptions(question, optionList))
         ))
    .subscribeOn(Schedulers.elastic()); // retrieve each question on a different thread.

请注意,如果类别的顺序可能与您要求的顺序不同。如果这对您来说是一个交易破坏者,您可以考虑使用 concatMap() instead of flatMap(),尽管每个请求将按顺序 运行。