这是通过 Spring 集成流程处理结果的正确方法吗?

Is this the correct way to process results through the Spring Integration Flow?

我目前正在做一个个人项目 - 我需要我的 Spring 应用程序从 EMQX(MQTT 服务器)获取查询并查询其数据以获得相应的结果,然后将结果推送到带有查询 UUID 的主题。

这是有效的 - 经过几个小时了解 Spring 集成框架的工作原理。但我认为处理程序使用“块”的方式是不正确的 - 并且不符合集成流程应该运行的方式。虽然这项工作有效,但我确实想确保它正在正确完成 - 出于对工作的尊重 - 并避免未来出现问题。

下面的代码片段应该足以理解我要实现的目标 - 以及潜在问题所在。

    @Bean
fun mqttInFlow() : Publisher<Message<String>> {
    return IntegrationFlows.from(inbound())
        .handle<String> { payload, headers ->
            val emotionalOutput: EmotionalOutput = gson.fromJson(payload, EmotionalOutput::class.java)
            emotionalPrintService.populateEmotionalOutput(emotionalOutput).map {
                MessageBuilder.withPayload(gson.toJson(it))
                    .copyHeaders(headers)
                    .setHeader(MqttHeaders.TOPIC, "query/" + it.query_uuid).build()
            }.block()
        }
        .channel(outgoingChannel())
        .toReactivePublisher()
}

编辑 - 感谢您的建议 - 这是我所理解的 Kotlin DSL 解决方案的潜在编辑 - 现在产生错误 - 抱怨 output-channel 或 replyChannel 不可用 - 什么都没有此函数之外的内容已更改。

    @Bean
fun newMqttInFlow() =
    integrationFlow (inbound()) {
       wireTap {
            handle<String> { payload, headers ->
                gson.fromJson<EmotionalOutput>(payload, EmotionalOutput::class.java).let { emotionalOutput ->
                    emotionalPrintService.populateEmotionalOutput(emotionalOutput).map { populatedEmotionalOutput ->
                        MessageBuilder.withPayload(gson.toJson(populatedEmotionalOutput))
                            .copyHeaders(headers)
                            .setHeader(MqttHeaders.TOPIC, populatedEmotionalOutput.query_uuid)
                    }
                }
            }
        }
        channel("outgoingChannel")
    }

例外是:

例外是 org.springframework.messaging.core.DestinationResolutionException:没有 output-channel 或 replyChannel header 可用

虽然我有多年使用 Java 的经验 - 这种方法是新的 - 非常感谢您的帮助。值得赞赏。如果整个 class 有用 - 我可以 post 那。

编辑

这是配置文件 - 它可以更好地了解可能导致此次要错误的原因 -

021-03-28 21:59:48.008 ERROR 84492 --- [T Call: divnrin] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'mqttOutbound'; defined in: 'class path resource [io/divnr/appserver/configuration/MQTTConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@4a9419d7']; nested exception is java.lang.IllegalArgumentException: This default converter can only handle 'byte[]' or 'String' payloads; consider adding a transformer to your flow definition, or provide a BytesMessageMapper, or subclass this converter for reactor.core.publisher.MonoMapFuseable payloads, failedMessage=GenericMessage [payload=MonoMapFuseable, headers={mqtt_receivedRetained=false, mqtt_id=0, mqtt_duplicate=false, id=c5a75283-c0fe-ebac-4168-dabddd989da9, mqtt_receivedTopic=source/d9e50e8f-67e0-4505-7ca2-4d05b1242207, mqtt_receivedQos=0, timestamp=1616961588004}] at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) at

这里提供了完整的class。

@Configuration
@EnableIntegration
@IntegrationComponentScan
class MQTTConfiguration(val emotionalPrintService: EmotionalPrintService,
                    val gson: Gson,
                    val applicationConfiguration: ApplicationConfiguration) {

@Bean
fun mqttServiceFactory() : MqttPahoClientFactory {
    return DefaultMqttPahoClientFactory().apply {
        connectionOptions = MqttConnectOptions().apply {
            serverURIs = arrayOf<String>(applicationConfiguration.mqttServerAddress)
        }
    }
}

@Bean
fun newMqttInFlow() =
    integrationFlow (inbound()) {
        handle<String> { payload, headers ->
            gson.fromJson<EmotionalOutput>(payload, EmotionalOutput::class.java).let { emotionalOutput ->
                emotionalPrintService.populateEmotionalOutput(emotionalOutput).map { populatedEmotionalOutput ->
                    MessageBuilder.withPayload(gson.toJson(populatedEmotionalOutput))
                        .copyHeaders(headers)
                        .setHeader(MqttHeaders.TOPIC, populatedEmotionalOutput.query_uuid).build()
                }
            }
        }
        channel(outgoingChannel())
    }

@Bean
@ServiceActivator(requiresReply = "false", inputChannel = "outgoingChannel")
fun mqttOutbound(): MessageHandler {
    val messageHandler = MqttPahoMessageHandler("divnrout", mqttServiceFactory())
    messageHandler.setAsync(true)
    return messageHandler
}

@Bean
fun outgoingChannel() : FluxMessageChannel {
    return FluxMessageChannel()
}

@Bean
fun inbound(): MessageProducerSupport {
    return MqttPahoMessageDrivenChannelAdapter("divnrin", mqttServiceFactory(),
        "source/" + applicationConfiguration.sourceUuid).apply {
        setConverter(DefaultPahoMessageConverter())
        setQos(1)
    }
}
}

您确实不需要 handle() 末尾的那个 block()。您只需 return 来自 emotionalPrintService.populateEmotionalOutput()Mono,框架将为您提供正确的订阅和背压处理。

您还需要将 outgoingChannel() 设为 FluxMessageChannel。 在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#reactive-streams

另外考虑将您的 IntegrationFlow 解决方案移至正确的 Kotlin DSL:https://docs.spring.io/spring-integration/docs/current/reference/html/kotlin-dsl.html#kotlin-dsl

另外:当它是一个 FluxMessageChannel 在流的末端时,没有理由担心 toReactivePublisher() - FluxMessageChannel 是一个 Publisher<Message<?>> by本身。

更新

问题出在这里:

handle<String>( { payload, headers ->
        gson.fromJson<EmotionalOutput>(payload, EmotionalOutput::class.java).let { emotionalOutput ->
            emotionalPrintService.populateEmotionalOutput(emotionalOutput).map { populatedEmotionalOutput ->
                MessageBuilder.withPayload(gson.toJson(populatedEmotionalOutput))
                    .copyHeaders(headers)
                    .setHeader(MqttHeaders.TOPIC, populatedEmotionalOutput.query_uuid).build()
            }
        }
    }) { async(true) }

查看 async(true) 选项。不幸的是,在当前版本中,我们不允许它默认以反应方式处理反应回复。你必须说你想在这个端点是异步的。因此,您的 Publisher 回复和 FluxMessageChannel 作为输出将起到正确的作用。