Spring 使用 rest 调用在微服务中响应
Spring Reactive in Microservices using rest calls
我在使用 spring boot web flux 进行微服务项目,这里有一些服务:
- baseInfoService
- 通知服务
- 账户服务
- 订单服务
- 执行服务
我正在 OrderService 中实现一个服务,它将具有以下流程:
@PostMapping("/create")
//@PostMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<OrderDto> createOrder(@RequestParam Publisher<OrderRequestDto> orderRequest){
return service.createOrder(Mono.from(orderRequest));
}
这是我的服务:
public Mono<OrderDto> createOrder(Mono<OrderRequestDto> orderRequest){
rule.validate(orderRequest); //returns Mono<OrderRequestDto>
//====== in the same time after validation =======//
//@todo call baseInfoService to get some data
//@todo call baseInfoService to get some other data
//@todo call performService and send orderRequest to get some data
//====== after getting above request do ======//
//@todo calculate and fill some order data acording above request results
//====== from here temporary I will use blocking style ======//
OrderEntity order = transformer.transform(orderRequest);
order = repository.save(order);
OrderDto orderDto = transformer.transformToDto(order);
//======== after above calculation and save do these operation in the same time =======//
//@todo call accountService and send orderDto to serive
//failure on this service is important and I should take proper action (there should be a call back instead of continue and sending notif)
//@todo call notificationService and send orderDto to service
return Mono.just(orderDto);
}
现在我应该使用 webClient 进行服务调用,将来我会使用 kafka 和 spring 云流并将请求作为事件发送。
- 第一个问题是我应该在控制器中使用
Publisher<OrderRequestDto>
还是 OrderRequestDto
?
- 第二个问题:我已经看到 github 和其他站点中存在许多简单示例,但所有示例都只是从存储库中获取一个单声道并将其传递给控制器,再传递给 return 给用户,我做不到找到一个像这个场景这样复杂的现实世界的例子。能否请您提供我应该实现它的方式。
谢谢。
首先,如果你使用反应堆,你不应该调用阻塞 api,就像你在 save 方法中所做的那样。
当您使用 webflux 时,您的线程数量很少,如果您阻塞这些线程,您的应用程序性能将会非常差。我建议改用反应式数据库驱动程序。
1,你不应该在控制器中使用平面对象,因为你必须阻塞线程才能获取对象本身。在反应堆中,你不能调用阻塞操作。如果您不确定什么是阻塞的,也建议使用 blockhound。如果调用阻塞方法,它会在测试期间抛出异常。
2、在响应式流中,你必须使用响应式操作,如map
、flatmap
等来对你的对象进行操作。
例如,假设你想遍历一个对象列表,从网络加载一些数据到每个对象并将它们保存到数据库中(注意,这里我将使用模拟数据库和网络服务,但你可以把那些改成真正的服务,这个例子的本质是处理器。我这里也用了Kotlin,类似于Java)
//Mock repository
object ReactorDatabase {
fun saveData(car: Car): Mono<Car> = Mono.just(Car(1, car.producer,car.type))
fun loadData(): Flux<Vehicle> = Flux.just(Vehicle("Toyota"), Vehicle("Ford"))
}
//Mock webservice
object ReactiveWebService {
fun loadFromWeb(): Mono<String> = Mono.just("corolla").delayElement(Duration.ofMillis(100))
}
处理器函数:
//Load vehicles
//Map vehicles to cars
//Load car type somewhere from the web and zip with the car
//Add type to car
//Save cars into database
//Return with the saved entites
fun process(): Flux<Car>{
return ReactorDatabase.loadData()
.map { Mapper.vehicleToCar(it) }
.zipWith(ReactiveWebService.loadFromWeb())
.map { (car, type) -> Mapper.carWithDetails(car, type) }
.concatMap { ReactorDatabase.saveData(it) }
}
如您所见,所有方法都是从反应流中调用的。当你使用响应式 IO 操作时,它总是 returns 与一个 Publisher。您可以使用 flatMap
、concatMap
... 获取对象并在不阻塞的情况下推向流。此外,如果您的操作没有阻塞,那么您可以从 map
.
调用 Mappers 等方法
对于 reactor,您不能使用命令式编程风格(除了使用协程或类似的东西)。
另外 here 是响应式 spring 云流的示例。
我在使用 spring boot web flux 进行微服务项目,这里有一些服务:
- baseInfoService
- 通知服务
- 账户服务
- 订单服务
- 执行服务
我正在 OrderService 中实现一个服务,它将具有以下流程:
@PostMapping("/create")
//@PostMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<OrderDto> createOrder(@RequestParam Publisher<OrderRequestDto> orderRequest){
return service.createOrder(Mono.from(orderRequest));
}
这是我的服务:
public Mono<OrderDto> createOrder(Mono<OrderRequestDto> orderRequest){
rule.validate(orderRequest); //returns Mono<OrderRequestDto>
//====== in the same time after validation =======//
//@todo call baseInfoService to get some data
//@todo call baseInfoService to get some other data
//@todo call performService and send orderRequest to get some data
//====== after getting above request do ======//
//@todo calculate and fill some order data acording above request results
//====== from here temporary I will use blocking style ======//
OrderEntity order = transformer.transform(orderRequest);
order = repository.save(order);
OrderDto orderDto = transformer.transformToDto(order);
//======== after above calculation and save do these operation in the same time =======//
//@todo call accountService and send orderDto to serive
//failure on this service is important and I should take proper action (there should be a call back instead of continue and sending notif)
//@todo call notificationService and send orderDto to service
return Mono.just(orderDto);
}
现在我应该使用 webClient 进行服务调用,将来我会使用 kafka 和 spring 云流并将请求作为事件发送。
- 第一个问题是我应该在控制器中使用
Publisher<OrderRequestDto>
还是OrderRequestDto
? - 第二个问题:我已经看到 github 和其他站点中存在许多简单示例,但所有示例都只是从存储库中获取一个单声道并将其传递给控制器,再传递给 return 给用户,我做不到找到一个像这个场景这样复杂的现实世界的例子。能否请您提供我应该实现它的方式。
谢谢。
首先,如果你使用反应堆,你不应该调用阻塞 api,就像你在 save 方法中所做的那样。 当您使用 webflux 时,您的线程数量很少,如果您阻塞这些线程,您的应用程序性能将会非常差。我建议改用反应式数据库驱动程序。
1,你不应该在控制器中使用平面对象,因为你必须阻塞线程才能获取对象本身。在反应堆中,你不能调用阻塞操作。如果您不确定什么是阻塞的,也建议使用 blockhound。如果调用阻塞方法,它会在测试期间抛出异常。
2、在响应式流中,你必须使用响应式操作,如map
、flatmap
等来对你的对象进行操作。
例如,假设你想遍历一个对象列表,从网络加载一些数据到每个对象并将它们保存到数据库中(注意,这里我将使用模拟数据库和网络服务,但你可以把那些改成真正的服务,这个例子的本质是处理器。我这里也用了Kotlin,类似于Java)
//Mock repository
object ReactorDatabase {
fun saveData(car: Car): Mono<Car> = Mono.just(Car(1, car.producer,car.type))
fun loadData(): Flux<Vehicle> = Flux.just(Vehicle("Toyota"), Vehicle("Ford"))
}
//Mock webservice
object ReactiveWebService {
fun loadFromWeb(): Mono<String> = Mono.just("corolla").delayElement(Duration.ofMillis(100))
}
处理器函数:
//Load vehicles
//Map vehicles to cars
//Load car type somewhere from the web and zip with the car
//Add type to car
//Save cars into database
//Return with the saved entites
fun process(): Flux<Car>{
return ReactorDatabase.loadData()
.map { Mapper.vehicleToCar(it) }
.zipWith(ReactiveWebService.loadFromWeb())
.map { (car, type) -> Mapper.carWithDetails(car, type) }
.concatMap { ReactorDatabase.saveData(it) }
}
如您所见,所有方法都是从反应流中调用的。当你使用响应式 IO 操作时,它总是 returns 与一个 Publisher。您可以使用 flatMap
、concatMap
... 获取对象并在不阻塞的情况下推向流。此外,如果您的操作没有阻塞,那么您可以从 map
.
对于 reactor,您不能使用命令式编程风格(除了使用协程或类似的东西)。
另外 here 是响应式 spring 云流的示例。