Reactive Spring IntegrationFlow 中间的集成访问流量
Reactive Spring Integration access flux in the middle of an IntegrationFlow
我已经看到关于 accessing flux at the middle of an IntegrationFlow
的问题,我想知道为什么我通过以下方式在 flux 中成功编写了逻辑:
public void writeToSolr(IntegrationFlowDefinition<?> flowDefinition) {
flowDefinition
.bridge(e -> e.reactive(flux -> a ->
flux.log("write to solr")
.flatMap(writeToSolr)
.subscribe()));
}
我首先想知道为什么我从来没有将错误抛出到控制台,但在调试时却看到错误。
我还想知道这是如何工作的以及为什么我需要 a
变量(它总是产生 NullPointerException
,即使流程可以继续并正常工作)。当我省略 a
变量时:
public void writeToSolr(IntegrationFlowDefinition<?> flowDefinition) {
flowDefinition
.bridge(e -> e.reactive(flux ->
flux.log("write to solr")
.flatMap(writeToSolr)
.subscribe()));
}
我遇到异常 Bad return type in lambda expression: Disposable cannot be converted to Publisher<Message<?>>
- 例如由于类型问题,代码无法编译代码。
你在e.reactive()
中写的逻辑不正确。
请参阅该端点配置器选项的文档以了解其用途:
/**
* Make the consumer endpoint as reactive independently of an input channel and
* apply the provided function into the {@link Flux#transform(Function)} operator.
* @param reactiveCustomizer the function to transform {@link Flux} for the input channel.
* @return the spec
* @since 5.5
*/
public S reactive(Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> reactiveCustomizer) {
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-reactive
您不能在此配置器中执行 flatMap()
等“主动”操作。您完全消除了流程的目的。 flatMap
必须在下游的处理程序中完成。这样的 bridge
没有任何意义。它只是将当前的流动状态变成反应性的。当具有类似 reactive()
配置器的 handle()
会执行相同的操作时,还会应用此处理程序的目的 - 它的句柄部分。
.subscribe()
根本不正确。让框架在 Spring 集成之上处理提供的反应流!这就是为什么你用 flux -> a ->
误导自己。它确实编译并让你 运行 因为它不是编译或配置部分。它确实是在 运行 时评估的回调,当您已经发送消息时。
writeToSolr
可以这样使用:
flowDefinition
.channel(c -> c.flux())
.handle(new ReactiveMessageHandlerAdapter((message) -> writeToSolr(message.getPayload())))
我认为我们将修改端点的 reactive()
以仅公开那些仅用于配置的 Flux
运算符。其余部分超出当前端点配置:必须在目标处理程序方法逻辑中完成。
此外,我认为我们可以引入 handleReactive(ReactiveMessageHandler)
作为 IntegrationFlow
的终端运算符,以简化 ReactiveMessageHandlerAdapter
的用法。
我已经看到关于 accessing flux at the middle of an IntegrationFlow
的问题,我想知道为什么我通过以下方式在 flux 中成功编写了逻辑:
public void writeToSolr(IntegrationFlowDefinition<?> flowDefinition) {
flowDefinition
.bridge(e -> e.reactive(flux -> a ->
flux.log("write to solr")
.flatMap(writeToSolr)
.subscribe()));
}
我首先想知道为什么我从来没有将错误抛出到控制台,但在调试时却看到错误。
我还想知道这是如何工作的以及为什么我需要 a
变量(它总是产生 NullPointerException
,即使流程可以继续并正常工作)。当我省略 a
变量时:
public void writeToSolr(IntegrationFlowDefinition<?> flowDefinition) {
flowDefinition
.bridge(e -> e.reactive(flux ->
flux.log("write to solr")
.flatMap(writeToSolr)
.subscribe()));
}
我遇到异常 Bad return type in lambda expression: Disposable cannot be converted to Publisher<Message<?>>
- 例如由于类型问题,代码无法编译代码。
你在e.reactive()
中写的逻辑不正确。
请参阅该端点配置器选项的文档以了解其用途:
/**
* Make the consumer endpoint as reactive independently of an input channel and
* apply the provided function into the {@link Flux#transform(Function)} operator.
* @param reactiveCustomizer the function to transform {@link Flux} for the input channel.
* @return the spec
* @since 5.5
*/
public S reactive(Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> reactiveCustomizer) {
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-reactive
您不能在此配置器中执行 flatMap()
等“主动”操作。您完全消除了流程的目的。 flatMap
必须在下游的处理程序中完成。这样的 bridge
没有任何意义。它只是将当前的流动状态变成反应性的。当具有类似 reactive()
配置器的 handle()
会执行相同的操作时,还会应用此处理程序的目的 - 它的句柄部分。
.subscribe()
根本不正确。让框架在 Spring 集成之上处理提供的反应流!这就是为什么你用 flux -> a ->
误导自己。它确实编译并让你 运行 因为它不是编译或配置部分。它确实是在 运行 时评估的回调,当您已经发送消息时。
writeToSolr
可以这样使用:
flowDefinition
.channel(c -> c.flux())
.handle(new ReactiveMessageHandlerAdapter((message) -> writeToSolr(message.getPayload())))
我认为我们将修改端点的 reactive()
以仅公开那些仅用于配置的 Flux
运算符。其余部分超出当前端点配置:必须在目标处理程序方法逻辑中完成。
此外,我认为我们可以引入 handleReactive(ReactiveMessageHandler)
作为 IntegrationFlow
的终端运算符,以简化 ReactiveMessageHandlerAdapter
的用法。