如何在 Webflux 中的另一个异步方法中进行异步调用?

How to make async call inside another async method in Webflux?

问题的解释有点长。请花一点时间帮忙!

我有两个 http 调用将提供以下数据。

第一个 http 请求调用将 return <Mono<List<Chips>>

[
  {
    "id": 1,
    "name": "redlays"
  },
  {
    "id": 2,
    "name": "yellowlays"
  },
  {
    "id": 3,
    "name": "kurkure"
  }
]

Chips型号为

@Data
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class Chips {
    private int id;
    private String name;
}

第二次 http 请求调用将 return Mono<ChipsDetails> 基于 Id

{
    "id": 1,
    "color": "red",
    "grams": "50"
}

ChipsDetails型号如下,

@Data
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class ChipsDetails {
    private int id;
    private String color;
    private String grams;
}

我已经使用 Webflux 完成了实施。这里我使用了三个模型,分别是ChipsChipsDetailsChipsFullDetails

模型 Chips 将具有两个属性 idname 然后模型 ChipsDetails 将具有三个属性 id,colorgrams 而模型 ChipsFullDetails 将具有 ChipsChipsDetails 属性的组合,即 idnamecolorgrams

@RestController
@RequestMapping("/chips")
public class ChipsController {

    @Autowired
    ChipsService chipsService;

    @GetMapping
    public Mono<List<ChipsFullDetails>> getAllChips() {
        return chipsService.getChips()
                .map(f -> {
                            List<ChipsFullDetails> chipsFullDetails = new ArrayList<>();
                            f.forEach(a -> {
                                ChipsFullDetails chipsFullDetail = new ChipsFullDetails();
                                chipsFullDetail.setId(a.getId());
                                chipsFullDetail.setName(a.getName());

                                chipsService.getChipsDetails(a.getId())
                                        .subscribe(b -> {
                                            chipsFullDetail.setColor(b.getColor());
                                            chipsFullDetail.setGrams(b.getGrams());
                                        });
                                chipsFullDetails.add(chipsFullDetail);

                            });
                            return chipsFullDetails;
                        }
                );
    }
}

这里 chipsService.getChips() 将 return Mono<List<Chips>> 这是第一次调用 chipsService.getChipsDetails(a.getId()) 将 return Mono<ChipsDetails> 这是第二次 http 请求打电话。

执行结果会是ChipsFullDetails

@Data
@Setter
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class ChipsFullDetails {
    private int id;
    private String name;
    private String color;
    private String grams;
}

问题是 ChipsFullDetails return 的 colorgrams 属性为 null,我们从第二次 http 调用中获取这些属性,即使它已在内部订阅。

如何以异步方式实现第二次 Http 调用,即 chipsService.getChipsDetails(a.getId()) 取决于第一次 http 调用 (chipsService.getChips()) 的结果?

这是否可以在不阻塞两个调用的情况下实现?

我首先将初始 Mono<List<Chips>> 转换为 Flux<Chips>,这样您就可以 flatMap 每个元素,例如类似的东西:

public Mono<List<ChipsFullDetails>> getAllChips() {
    return chipsService
            .getChips()
            // Mono<List> to Flux:
            .flatMapIterable(Function.identity())
            // flat map on each element:
            .flatMap(this::buildChipsFullDetails)
            // Flux back to Mono<List>:
            .collectList();
}

private Mono<ChipsFullDetails> buildChipsFullDetails(Chips chips) {
    return chipsService
            .getChipsDetails(chips.getId())
            // once you get both chips and details, aggregate:
            .map(details -> buildChipsFullDetails(chips, details));
}

private ChipsFullDetails buildChipsFullDetails(Chips chips, ChipsDetails details) {
    // straightforward synchronous code:
    ChipsFullDetails chipsFullDetail = new ChipsFullDetails();
    chipsFullDetail.setId(chips.getId());
    chipsFullDetail.setName(chips.getName());
    chipsFullDetail.setColor(details.getColor());
    chipsFullDetail.setGrams(details.getGrams());
    return chipsFullDetail;
}

我基本上不同意使用 Flux 的想法,尽管我承认我也有。

我想说的是,如果您想获取芯片列表的详细信息,那么您应该创建一个端点来执行此操作。那就单调了。

对于你原来的问题,有一种方法可以不用去Flux,但它读起来有点滑稽:

ParameterizedTypeReference<List<Chip>> chipList = new ParameterizedTypeReference<List<Chip>>() {};

public Mono<List<ChipDetails>> getChipDetails() {
    return webClient.get().uri("chips").retrieve().bodyToMono(chipList).flatMap(chips -> {
        return Mono.zip(chips.stream().map(chip -> webClient.get().uri("chipDetails?id="+chip.getId()).retrieve().bodyToMono(ChipDetails.class)).collect(Collectors.toList()), details -> {
            List<ChipDetails> chipDetails = new ArrayList<>();
            for (Object o : details) {
                chipDetails.add((ChipDetails) o);
            }
            return chipDetails;
        });
    });
}

这使用 Mono.zip 从列表中的每个 Chip 条目中创建一种批处理请求,同时执行它们。 Flux 最终可能会或多或少地做同样的事情,但事实并非如此。

如果你只是制作你需要的端点,那么:

ParameterizedTypeReference<List<ChipDetails>> detailsList = new ParameterizedTypeReference<List<ChipDetails>>() {};

public Mono<List<ChipDetails>> getChipDetailsReal() {
    return webClient.post().uri("chipDetails").body(webClient.get().uri("chips").retrieve().bodyToMono(chipList), chipList).retrieve().bodyToMono(detailsList);
}

这种方法避免了对同一端点的重复调用,并且正在做你想做的事情。

我不喜欢使用 Flux,而你真正的意思是 ListFlux 是一种具有背压和复杂功能的流媒体,而列表只是一个列表。