如何在 Reactive Spring Integration 中处理反应类型?

How to handle reactive types within Reactive Spring Integration?

我正在尝试使用 Reactive Spring 集成,我尝试执行以下基本操作:

    @Override
    protected IntegrationFlowDefinition<?> buildFlow() {
        return from(someReactiveInboundChannelAdapter)
                .handle(new ReactiveMessageHandlerAdapter(flux -> flux.subscribe(System.out::println)));
    }

但这不起作用,因为框架无法识别 flux 是一个 Flux<?> 实例。该框架将其视为 Message<?>,我不知道如何以及从何处开始编写 Reactor 代码。

没错。 ReactiveMessageHandlerAdapter 期望 ReactiveMessageHandler 的 lambda 是:

Mono<Void> handleMessage(Message<?> message);

我不确定是什么让您认为输入必须是 Flux

我假设您的 someReactiveInboundChannelAdapterMessageProducerSupport 的扩展并且完全符合 subscribeToPublisher(Publisher<? extends Message<?>> publisher) 逻辑。这样做的目的是以反应方式从源中获取数据,但仍将每个事件作为消息发送到下游通道。因此,应该清楚的是 handle() 将收到一条包含单个项目的消息,而不是整个 Flux.

如果您想将流量视为通量,请考虑使用 fluxTransform() 运算符。

另外最好不要自己订阅,而是在配置和启动结束后在框架中订阅。

从技术上讲,您不应该考虑反应类型。你只需要分别配置一个流程,只为单个项目编写逻辑:框架为你做一个反应式交互。 Project Reactor 是一个围绕 FluxMono 的库。这就是我们谈论反应类型的地方。 Spring 集成是一个消息传递框架,它的通信是通过消息完成的。最后,对于每条消息,端点之间的交互是否以反应方式完成都无关紧要。因此,您的处理逻辑可以不受反应类型的影响。

更新

如果你想从 someReactiveInboundChannelAdapter 完全控制 Flux,那么你需要这样做:

    @Bean
    public Publisher<Message<Object>> reactiveFlow() {
          return IntegrationFlows.from(someReactiveInboundChannelAdapter)
          .toReactivePublisher();
    }

然后在需要时注入 Publisher。然后执行 Flux.from(publisher) 和您需要的任何反应运算符,包括 subscribe().

这里有一些示例:https://github.com/spring-projects/spring-integration/blob/main/spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

IntegrationFlowAdapter 不能用于此类配置,因为它不能接受 IntegrationFlow 作为 buildFlow() 的结果。

不过 fluxTransform() 也可以为您做到这一点:

.fluxTransform(flux -> flux.as(Mono::just))

因此,下游流的负载将是一个 Flux<Message<?>>,您可以自己处理。 fluxTransform() 返回的 Mono 将由框架订阅。它的 Flux 值是您在下游流程中的责任。然后你可以使用普通的 MessageHandler:

.handle(m -> ((Flux<Message<?>>) m.getPayload())....subscribe())