Spring 使用 rest 调用在微服务中响应

Spring Reactive in Microservices using rest calls

我在使用 spring boot web flux 进行微服务项目,这里有一些服务:

我正在 OrderService 中实现一个服务,它将具有以下流程:

@PostMapping("/create")
//@PostMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<OrderDto> createOrder(@RequestParam Publisher<OrderRequestDto> orderRequest){
    return service.createOrder(Mono.from(orderRequest));
}

这是我的服务:

public Mono<OrderDto> createOrder(Mono<OrderRequestDto> orderRequest){
    rule.validate(orderRequest); //returns Mono<OrderRequestDto>

    //====== in the same time after validation =======//

    //@todo call baseInfoService to get some data

    //@todo call baseInfoService to get some other data

    //@todo call performService and send orderRequest to get some data

    //====== after getting above request do ======//

    //@todo calculate and fill some order data acording above request results        

    //====== from here temporary I will use blocking style ======//

    OrderEntity order = transformer.transform(orderRequest);

    order = repository.save(order); 

    OrderDto orderDto = transformer.transformToDto(order);

    //======== after above calculation and save do these operation in the same time =======// 

    //@todo call accountService and send orderDto to serive
    //failure on this service is important and I should take proper action (there should be a call back instead of continue and sending notif)

    //@todo call notificationService and send orderDto to service

    return Mono.just(orderDto);
}

现在我应该使用 webClient 进行服务调用,将来我会使用 kafka 和 spring 云流并将请求作为事件发送。

谢谢。

首先,如果你使用反应堆,你不应该调用阻塞 api,就像你在 save 方法中所做的那样。 当您使用 webflux 时,您的线程数量很少,如果您阻塞这些线程,您的应用程序性能将会非常差。我建议改用反应式数据库驱动程序。

1,你不应该在控制器中使用平面对象,因为你必须阻塞线程才能获取对象本身。在反应堆中,你不能调用阻塞操作。如果您不确定什么是阻塞的,也建议使用 blockhound。如果调用阻塞方法,它会在测试期间抛出异常。

2、在响应式流中,你必须使用响应式操作,如mapflatmap等来对你的对象进行操作。

例如,假设你想遍历一个对象列表,从网络加载一些数据到每个对象并将它们保存到数据库中(注意,这里我将使用模拟数据库和网络服务,但你可以把那些改成真正的服务,这个例子的本质是处理器。我这里也用了Kotlin,类似于Java)

//Mock repository
object ReactorDatabase {
    fun saveData(car: Car): Mono<Car> = Mono.just(Car(1, car.producer,car.type))
    fun loadData(): Flux<Vehicle> = Flux.just(Vehicle("Toyota"), Vehicle("Ford"))
}  

//Mock webservice
object ReactiveWebService {
    fun loadFromWeb(): Mono<String> = Mono.just("corolla").delayElement(Duration.ofMillis(100))
}

处理器函数:

//Load vehicles
//Map vehicles to cars
//Load car type somewhere from the web and zip with the car
//Add type to car
//Save cars into database
//Return with the saved entites
fun process(): Flux<Car>{
        return ReactorDatabase.loadData()
            .map { Mapper.vehicleToCar(it) }           
            .zipWith(ReactiveWebService.loadFromWeb())
            .map { (car, type) -> Mapper.carWithDetails(car, type) }
            .concatMap { ReactorDatabase.saveData(it) }
    }

如您所见,所有方法都是从反应流中调用的。当你使用响应式 IO 操作时,它总是 returns 与一个 Publisher。您可以使用 flatMapconcatMap... 获取对象并在不阻塞的情况下推向流。此外,如果您的操作没有阻塞,那么您可以从 map.

调用 Mappers 等方法

对于 reactor,您不能使用命令式编程风格(除了使用协程或类似的东西)。

另外 here 是响应式 spring 云流的示例。