Reactive Spring IntegrationFlow 中间的集成访问流量

Reactive Spring Integration access flux in the middle of an IntegrationFlow

我已经看到关于 accessing flux at the middle of an IntegrationFlow 的问题,我想知道为什么我通过以下方式在 flux 中成功编写了逻辑:

public void writeToSolr(IntegrationFlowDefinition<?> flowDefinition) {
    flowDefinition
        .bridge(e -> e.reactive(flux -> a ->
           flux.log("write to solr")
               .flatMap(writeToSolr)
               .subscribe()));
}

我首先想知道为什么我从来没有将错误抛出到控制台,但在调试时却看到错误。 我还想知道这是如何工作的以及为什么我需要 a 变量(它总是产生 NullPointerException,即使流程可以继续并正常工作)。当我省略 a 变量时:

public void writeToSolr(IntegrationFlowDefinition<?> flowDefinition) {
    flowDefinition
        .bridge(e -> e.reactive(flux ->
           flux.log("write to solr")
               .flatMap(writeToSolr)
               .subscribe()));
}

我遇到异常 Bad return type in lambda expression: Disposable cannot be converted to Publisher<Message<?>> - 例如由于类型问题,代码无法编译代码。

你在e.reactive()中写的逻辑不正确。 请参阅该端点配置器选项的文档以了解其用途:

/**
 * Make the consumer endpoint as reactive independently of an input channel and
 * apply the provided function into the {@link Flux#transform(Function)} operator.
 * @param reactiveCustomizer the function to transform {@link Flux} for the input channel.
 * @return the spec
 * @since 5.5
 */
public S reactive(Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> reactiveCustomizer) {

https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-reactive

您不能在此配置器中执行 flatMap() 等“主动”操作。您完全消除了流程的目的。 flatMap 必须在下游的处理程序中完成。这样的 bridge 没有任何意义。它只是将当前的流动状态变成反应性的。当具有类似 reactive() 配置器的 handle() 会执行相同的操作时,还会应用此处理程序的目的 - 它的句柄部分。

.subscribe() 根本不正确。让框架在 Spring 集成之上处理提供的反应流!这就是为什么你用 flux -> a -> 误导自己。它确实编译并让你 运行 因为它不是编译或配置部分。它确实是在 运行 时评估的回调,当您已经发送消息时。

writeToSolr可以这样使用:

flowDefinition
    .channel(c -> c.flux())
    .handle(new ReactiveMessageHandlerAdapter((message) -> writeToSolr(message.getPayload())))

我认为我们将修改端点的 reactive() 以仅公开那些仅用于配置的 Flux 运算符。其余部分超出当前端点配置:必须在目标处理程序方法逻辑中完成。

此外,我认为我们可以引入 handleReactive(ReactiveMessageHandler) 作为 IntegrationFlow 的终端运算符,以简化 ReactiveMessageHandlerAdapter 的用法。