Spring 具有多个顺序 API 的 webflux 调用并转换为无订阅和阻塞的通量对象

Spring webflux with multiple sequential API call and convert to flux object without subscribe and block

我正在研究 spring 反应式,需要使用 webclient 对其他 REST API 按顺序调用多个调用。问题是我可以对其他 Rest API 进行多次调用,但如果不订阅或阻止,则无法读取响应。由于非响应式编程,我无法使用订阅或阻止。有什么办法,我可以在阅读响应时合并并将其作为通量发送。 下面是我卡住的那段代码。

private Flux<SeasonsDto> getSeasonsInfo(List<HuntsSeasonsMapping> l2, String seasonsUrl) {
for (HuntsSeasonsMapping s : l2)
        {
            List<SeasonsJsonDto> list = huntsSeasonsProcessor.appendSeaosonToJson(s.getSeasonsRef());
for (SeasonsJsonDto sjdto:list)
            {
 Mono<SeasonsDto> mono =new  SeasonsAdapter("http://localhost:8087/").callToSeasonsAPI(sjdto.getSeasonsRef());
    //Not able to read stream without subscribe an return as Flux object
}

 public Mono<SeasonsDto> callToSeasonsAPI(Long long1) {
         LOGGER.debug("Seasons API call");
        
          return this.webClient.get().uri("hunts/seasonsInfo/"
          +long1).header("X-GoHunt-LoggedIn-User",
          "a4d4b427-c716-458b-9bb5-9917b6aa30ff").retrieve().bodyToMono(SeasonsDto.class);
         
}

请帮忙解决这个问题。

您需要使用 mapflatMapconcatMap 等运算符组合反应流。

private Flux<SeasonsDto> getSeasonsInfo(List<HuntsSeasonsMapping> l2, String seasonsUrl) {
   List<Mono<SeasonsDto>> monos = new ArrayList<>();
   for (HuntsSeasonsMapping s : l2) {
   
      List<SeasonsJsonDto> list = huntsSeasonsProcessor.appendSeaosonToJson(s.getSeasonsRef());
      for (SeasonsJsonDto sjdto:list) {
         Mono<SeasonsDto> mono =new  SeasonsAdapter("http://localhost:8087/").callToSeasonsAPI(sjdto.getSeasonsRef()); 
         //Not able to read stream without subscribe an return as Flux object
         monos.add(mono);
      }
    }
  return Flux.fromIterable(monos).concatMap(mono -> mono);
}

这可以使用 Steam API 进一步改进,我建议您研究一下,但我不想更改太多现有代码。

我已经想出了如何做到这一点。我已经完全重写了代码并改变了反应。这意味着所有 for 循环都已被删除。下面是相同的代码,可能对其他人有帮助。

public Flux<SeasonsDto> getAllSeasonDetails(String uuid) {

    return hunterRepository.findByUuidAndIsPrimaryAndDeleted(uuid, true, false).next().flatMapMany(h1 -> {
        return huntsMappingRepository.findByHunterIdAndDeleted(h1.getId(), false).flatMap(k -> {
            return huntsMappingRepository.findByHuntReferrenceIdAndDeleted(k.getHuntReferrenceId(), false)
                    .flatMap(l2 -> {
                        return huntsSeasonsProcessor.appendSeaosonToJsonFlux(l2.getSeasonsDtl()).flatMap(fs -> {
                            return seasonsAdapter.callSeasonsAPI(fs.getSeasonsRef(), h1.getId(), uuid).map(k->{
                                return k;
                            });
                        });
                    });
        });
    });
}