如何在 Reactive Spring Integration 中处理反应类型?
How to handle reactive types within Reactive Spring Integration?
我正在尝试使用 Reactive Spring 集成,我尝试执行以下基本操作:
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(someReactiveInboundChannelAdapter)
.handle(new ReactiveMessageHandlerAdapter(flux -> flux.subscribe(System.out::println)));
}
但这不起作用,因为框架无法识别 flux
是一个 Flux<?>
实例。该框架将其视为 Message<?>
,我不知道如何以及从何处开始编写 Reactor 代码。
没错。 ReactiveMessageHandlerAdapter
期望 ReactiveMessageHandler
的 lambda 是:
Mono<Void> handleMessage(Message<?> message);
我不确定是什么让您认为输入必须是 Flux
。
我假设您的 someReactiveInboundChannelAdapter
是 MessageProducerSupport
的扩展并且完全符合 subscribeToPublisher(Publisher<? extends Message<?>> publisher)
逻辑。这样做的目的是以反应方式从源中获取数据,但仍将每个事件作为消息发送到下游通道。因此,应该清楚的是 handle()
将收到一条包含单个项目的消息,而不是整个 Flux
.
如果您想将流量视为通量,请考虑使用 fluxTransform()
运算符。
另外最好不要自己订阅,而是在配置和启动结束后在框架中订阅。
从技术上讲,您不应该考虑反应类型。你只需要分别配置一个流程,只为单个项目编写逻辑:框架为你做一个反应式交互。 Project Reactor 是一个围绕 Flux
和 Mono
的库。这就是我们谈论反应类型的地方。 Spring 集成是一个消息传递框架,它的通信是通过消息完成的。最后,对于每条消息,端点之间的交互是否以反应方式完成都无关紧要。因此,您的处理逻辑可以不受反应类型的影响。
更新
如果你想从 someReactiveInboundChannelAdapter
完全控制 Flux
,那么你需要这样做:
@Bean
public Publisher<Message<Object>> reactiveFlow() {
return IntegrationFlows.from(someReactiveInboundChannelAdapter)
.toReactivePublisher();
}
然后在需要时注入 Publisher
。然后执行 Flux.from(publisher)
和您需要的任何反应运算符,包括 subscribe()
.
IntegrationFlowAdapter
不能用于此类配置,因为它不能接受 IntegrationFlow
作为 buildFlow()
的结果。
不过 fluxTransform()
也可以为您做到这一点:
.fluxTransform(flux -> flux.as(Mono::just))
因此,下游流的负载将是一个 Flux<Message<?>>
,您可以自己处理。 fluxTransform()
返回的 Mono
将由框架订阅。它的 Flux
值是您在下游流程中的责任。然后你可以使用普通的 MessageHandler
:
.handle(m -> ((Flux<Message<?>>) m.getPayload())....subscribe())
我正在尝试使用 Reactive Spring 集成,我尝试执行以下基本操作:
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(someReactiveInboundChannelAdapter)
.handle(new ReactiveMessageHandlerAdapter(flux -> flux.subscribe(System.out::println)));
}
但这不起作用,因为框架无法识别 flux
是一个 Flux<?>
实例。该框架将其视为 Message<?>
,我不知道如何以及从何处开始编写 Reactor 代码。
没错。 ReactiveMessageHandlerAdapter
期望 ReactiveMessageHandler
的 lambda 是:
Mono<Void> handleMessage(Message<?> message);
我不确定是什么让您认为输入必须是 Flux
。
我假设您的 someReactiveInboundChannelAdapter
是 MessageProducerSupport
的扩展并且完全符合 subscribeToPublisher(Publisher<? extends Message<?>> publisher)
逻辑。这样做的目的是以反应方式从源中获取数据,但仍将每个事件作为消息发送到下游通道。因此,应该清楚的是 handle()
将收到一条包含单个项目的消息,而不是整个 Flux
.
如果您想将流量视为通量,请考虑使用 fluxTransform()
运算符。
另外最好不要自己订阅,而是在配置和启动结束后在框架中订阅。
从技术上讲,您不应该考虑反应类型。你只需要分别配置一个流程,只为单个项目编写逻辑:框架为你做一个反应式交互。 Project Reactor 是一个围绕 Flux
和 Mono
的库。这就是我们谈论反应类型的地方。 Spring 集成是一个消息传递框架,它的通信是通过消息完成的。最后,对于每条消息,端点之间的交互是否以反应方式完成都无关紧要。因此,您的处理逻辑可以不受反应类型的影响。
更新
如果你想从 someReactiveInboundChannelAdapter
完全控制 Flux
,那么你需要这样做:
@Bean
public Publisher<Message<Object>> reactiveFlow() {
return IntegrationFlows.from(someReactiveInboundChannelAdapter)
.toReactivePublisher();
}
然后在需要时注入 Publisher
。然后执行 Flux.from(publisher)
和您需要的任何反应运算符,包括 subscribe()
.
IntegrationFlowAdapter
不能用于此类配置,因为它不能接受 IntegrationFlow
作为 buildFlow()
的结果。
不过 fluxTransform()
也可以为您做到这一点:
.fluxTransform(flux -> flux.as(Mono::just))
因此,下游流的负载将是一个 Flux<Message<?>>
,您可以自己处理。 fluxTransform()
返回的 Mono
将由框架订阅。它的 Flux
值是您在下游流程中的责任。然后你可以使用普通的 MessageHandler
:
.handle(m -> ((Flux<Message<?>>) m.getPayload())....subscribe())