我不知道如何使用 webflux 创建合适的流程

I am at a loss of how to create an appropriate flow using webflux

问题陈述:我有一个 POST 请求 booking-service API 获取 BookingRecord。我映射它以提取值,以便使用 WebClient 调用另一个 fare-service API。我收到了那个电话的 Mono<Fare>。我需要检查 getFare() 类型的 BookingRecord 方法的值是否与 WebClient 编辑的 Fare 类型 return 的 getFare() 相同。如果没有,我需要引发异常,并将其传递给调用者。这里的调用者是另一个微服务,ui-service 调用预订服务 API(所以我应该如何处理这个,将错误传回,否则是什么最好的办法是什么?)否则我会将新的 BookingRecord 和 return 该记录的 ID 保存给调用者。最好的流程顺序是什么?我尽了最大努力但没有成功,所以我将代码粘贴到这里。

public HandlerFunction<ServerResponse> book = request ->
    {
request.bodyToMono(BookingRecord.class)
        .map(br ->
        {
            this.webClient.get()
                    .uri("/fares/get/{flightNumber}/{flightDate}",
                         br.getFlightNumber(),
                         br.getFlightDate())
                    .retrieve()
                    .bodyToMono(Fare.class)
                    .map(f ->
                    {
                        if (!f.getFare()
                                .equals(br.getFare()))
                        {
                            throw new RuntimeException("Fare is tampered");
                        }
                        else
                        {
                            id = bookingRepository.save(br).getId();
                        }
                        return id;
                    })
                    .subscribe();
            return id;
        });

return ServerResponse.ok()
        .body(BodyInserters.fromObject(id));
};

经过多次调整,这就是我所做的。希望这是正确的做法。 1. 我从票价服务本身引发 500 Http 错误,而不是检查预订服务。

public HandlerFunction<ServerResponse> getFare = request ->
    {
        String flightNumber = request.pathVariable("flightNumber");
        String flightDate   = request.pathVariable("flightDate");
        String fare         = request.pathVariable("fare");

        Mono<ServerResponse> notFound = ServerResponse.notFound()
                .build();

        return Mono
                .justOrEmpty(faresRepository.getFareByFlightNumberAndFlightDateAndFare(flightNumber,
                                                                                       flightDate,
                                                                                       fare))
                .flatMap(f -> ServerResponse.ok()
                        .contentType(APPLICATION_JSON)
                        .body(fromObject(f)))
                .switchIfEmpty(notFound);
    };
  1. 使用 onStatus() 方法处理预订服务中的异常

    public HandlerFunction<ServerResponse> book = request ->
    {
    
        logger.info("Inside Book function");
        return request.bodyToMono(BookingRecord.class)
                .flatMap(br ->
                {
                    logger.info("Calling fare-service");
                    return this.webClient.get()
                            .uri("/fares/get/{flightNumber}/{flightDate}/{fare}",
                                 br.getFlightNumber(),
                                 br.getFlightDate(),
                                 br.getFare())
                            .retrieve()
                            .onStatus(HttpStatus::isError,
                                      x -> Mono
                                              .error(new RuntimeException("Fare has been tampered with!!")))
                            .bodyToMono(Fare.class);
    
                })
                .map(fare ->
                {
                    logger.info("Saving a BookingRecord");
                    BookingRecord br = new BookingRecord();
                    br.setFlightNumber(fare.getFlightNumber());
                    br.setFlightDate(fare.getFlightDate());
                    br.setFare(fare.getFare());
    
                    long id = bookingRepository.save(br)
                            .getId();
    
                    return id;
                })
                .flatMap(id -> ServerResponse.ok()
                        .body(BodyInserters.fromObject(id)));
    };
    

通过这种方式,我获得了票价篡改的例外情况或获得了成功保存数据库的 ID。