这是通过 Spring 集成流程处理结果的正确方法吗?
Is this the correct way to process results through the Spring Integration Flow?
我目前正在做一个个人项目 - 我需要我的 Spring 应用程序从 EMQX(MQTT 服务器)获取查询并查询其数据以获得相应的结果,然后将结果推送到带有查询 UUID 的主题。
这是有效的 - 经过几个小时了解 Spring 集成框架的工作原理。但我认为处理程序使用“块”的方式是不正确的 - 并且不符合集成流程应该运行的方式。虽然这项工作有效,但我确实想确保它正在正确完成 - 出于对工作的尊重 - 并避免未来出现问题。
下面的代码片段应该足以理解我要实现的目标 - 以及潜在问题所在。
@Bean
fun mqttInFlow() : Publisher<Message<String>> {
return IntegrationFlows.from(inbound())
.handle<String> { payload, headers ->
val emotionalOutput: EmotionalOutput = gson.fromJson(payload, EmotionalOutput::class.java)
emotionalPrintService.populateEmotionalOutput(emotionalOutput).map {
MessageBuilder.withPayload(gson.toJson(it))
.copyHeaders(headers)
.setHeader(MqttHeaders.TOPIC, "query/" + it.query_uuid).build()
}.block()
}
.channel(outgoingChannel())
.toReactivePublisher()
}
编辑 - 感谢您的建议 - 这是我所理解的 Kotlin DSL 解决方案的潜在编辑 - 现在产生错误 - 抱怨 output-channel 或 replyChannel 不可用 - 什么都没有此函数之外的内容已更改。
@Bean
fun newMqttInFlow() =
integrationFlow (inbound()) {
wireTap {
handle<String> { payload, headers ->
gson.fromJson<EmotionalOutput>(payload, EmotionalOutput::class.java).let { emotionalOutput ->
emotionalPrintService.populateEmotionalOutput(emotionalOutput).map { populatedEmotionalOutput ->
MessageBuilder.withPayload(gson.toJson(populatedEmotionalOutput))
.copyHeaders(headers)
.setHeader(MqttHeaders.TOPIC, populatedEmotionalOutput.query_uuid)
}
}
}
}
channel("outgoingChannel")
}
例外是:
例外是 org.springframework.messaging.core.DestinationResolutionException:没有 output-channel 或 replyChannel header 可用
虽然我有多年使用 Java 的经验 - 这种方法是新的 - 非常感谢您的帮助。值得赞赏。如果整个 class 有用 - 我可以 post 那。
编辑
这是配置文件 - 它可以更好地了解可能导致此次要错误的原因 -
021-03-28 21:59:48.008 ERROR 84492 --- [T Call: divnrin] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'mqttOutbound'; defined in: 'class path resource [io/divnr/appserver/configuration/MQTTConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@4a9419d7']; nested exception is java.lang.IllegalArgumentException: This default converter can only handle 'byte[]' or 'String' payloads; consider adding a transformer to your flow definition, or provide a BytesMessageMapper, or subclass this converter for reactor.core.publisher.MonoMapFuseable payloads, failedMessage=GenericMessage [payload=MonoMapFuseable, headers={mqtt_receivedRetained=false, mqtt_id=0, mqtt_duplicate=false, id=c5a75283-c0fe-ebac-4168-dabddd989da9, mqtt_receivedTopic=source/d9e50e8f-67e0-4505-7ca2-4d05b1242207, mqtt_receivedQos=0, timestamp=1616961588004}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
at
这里提供了完整的class。
@Configuration
@EnableIntegration
@IntegrationComponentScan
class MQTTConfiguration(val emotionalPrintService: EmotionalPrintService,
val gson: Gson,
val applicationConfiguration: ApplicationConfiguration) {
@Bean
fun mqttServiceFactory() : MqttPahoClientFactory {
return DefaultMqttPahoClientFactory().apply {
connectionOptions = MqttConnectOptions().apply {
serverURIs = arrayOf<String>(applicationConfiguration.mqttServerAddress)
}
}
}
@Bean
fun newMqttInFlow() =
integrationFlow (inbound()) {
handle<String> { payload, headers ->
gson.fromJson<EmotionalOutput>(payload, EmotionalOutput::class.java).let { emotionalOutput ->
emotionalPrintService.populateEmotionalOutput(emotionalOutput).map { populatedEmotionalOutput ->
MessageBuilder.withPayload(gson.toJson(populatedEmotionalOutput))
.copyHeaders(headers)
.setHeader(MqttHeaders.TOPIC, populatedEmotionalOutput.query_uuid).build()
}
}
}
channel(outgoingChannel())
}
@Bean
@ServiceActivator(requiresReply = "false", inputChannel = "outgoingChannel")
fun mqttOutbound(): MessageHandler {
val messageHandler = MqttPahoMessageHandler("divnrout", mqttServiceFactory())
messageHandler.setAsync(true)
return messageHandler
}
@Bean
fun outgoingChannel() : FluxMessageChannel {
return FluxMessageChannel()
}
@Bean
fun inbound(): MessageProducerSupport {
return MqttPahoMessageDrivenChannelAdapter("divnrin", mqttServiceFactory(),
"source/" + applicationConfiguration.sourceUuid).apply {
setConverter(DefaultPahoMessageConverter())
setQos(1)
}
}
}
您确实不需要 handle()
末尾的那个 block()
。您只需 return 来自 emotionalPrintService.populateEmotionalOutput()
的 Mono
,框架将为您提供正确的订阅和背压处理。
您还需要将 outgoingChannel()
设为 FluxMessageChannel
。
在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#reactive-streams
另外考虑将您的 IntegrationFlow
解决方案移至正确的 Kotlin DSL:https://docs.spring.io/spring-integration/docs/current/reference/html/kotlin-dsl.html#kotlin-dsl
另外:当它是一个 FluxMessageChannel
在流的末端时,没有理由担心 toReactivePublisher()
- FluxMessageChannel
是一个 Publisher<Message<?>>
by本身。
更新
问题出在这里:
handle<String>( { payload, headers ->
gson.fromJson<EmotionalOutput>(payload, EmotionalOutput::class.java).let { emotionalOutput ->
emotionalPrintService.populateEmotionalOutput(emotionalOutput).map { populatedEmotionalOutput ->
MessageBuilder.withPayload(gson.toJson(populatedEmotionalOutput))
.copyHeaders(headers)
.setHeader(MqttHeaders.TOPIC, populatedEmotionalOutput.query_uuid).build()
}
}
}) { async(true) }
查看 async(true)
选项。不幸的是,在当前版本中,我们不允许它默认以反应方式处理反应回复。你必须说你想在这个端点是异步的。因此,您的 Publisher
回复和 FluxMessageChannel
作为输出将起到正确的作用。
我目前正在做一个个人项目 - 我需要我的 Spring 应用程序从 EMQX(MQTT 服务器)获取查询并查询其数据以获得相应的结果,然后将结果推送到带有查询 UUID 的主题。
这是有效的 - 经过几个小时了解 Spring 集成框架的工作原理。但我认为处理程序使用“块”的方式是不正确的 - 并且不符合集成流程应该运行的方式。虽然这项工作有效,但我确实想确保它正在正确完成 - 出于对工作的尊重 - 并避免未来出现问题。
下面的代码片段应该足以理解我要实现的目标 - 以及潜在问题所在。
@Bean
fun mqttInFlow() : Publisher<Message<String>> {
return IntegrationFlows.from(inbound())
.handle<String> { payload, headers ->
val emotionalOutput: EmotionalOutput = gson.fromJson(payload, EmotionalOutput::class.java)
emotionalPrintService.populateEmotionalOutput(emotionalOutput).map {
MessageBuilder.withPayload(gson.toJson(it))
.copyHeaders(headers)
.setHeader(MqttHeaders.TOPIC, "query/" + it.query_uuid).build()
}.block()
}
.channel(outgoingChannel())
.toReactivePublisher()
}
编辑 - 感谢您的建议 - 这是我所理解的 Kotlin DSL 解决方案的潜在编辑 - 现在产生错误 - 抱怨 output-channel 或 replyChannel 不可用 - 什么都没有此函数之外的内容已更改。
@Bean
fun newMqttInFlow() =
integrationFlow (inbound()) {
wireTap {
handle<String> { payload, headers ->
gson.fromJson<EmotionalOutput>(payload, EmotionalOutput::class.java).let { emotionalOutput ->
emotionalPrintService.populateEmotionalOutput(emotionalOutput).map { populatedEmotionalOutput ->
MessageBuilder.withPayload(gson.toJson(populatedEmotionalOutput))
.copyHeaders(headers)
.setHeader(MqttHeaders.TOPIC, populatedEmotionalOutput.query_uuid)
}
}
}
}
channel("outgoingChannel")
}
例外是:
例外是 org.springframework.messaging.core.DestinationResolutionException:没有 output-channel 或 replyChannel header 可用
虽然我有多年使用 Java 的经验 - 这种方法是新的 - 非常感谢您的帮助。值得赞赏。如果整个 class 有用 - 我可以 post 那。
编辑
这是配置文件 - 它可以更好地了解可能导致此次要错误的原因 -
021-03-28 21:59:48.008 ERROR 84492 --- [T Call: divnrin] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'mqttOutbound'; defined in: 'class path resource [io/divnr/appserver/configuration/MQTTConfiguration.class]'; from source: 'org.springframework.core.type.classreading.SimpleMethodMetadata@4a9419d7']; nested exception is java.lang.IllegalArgumentException: This default converter can only handle 'byte[]' or 'String' payloads; consider adding a transformer to your flow definition, or provide a BytesMessageMapper, or subclass this converter for reactor.core.publisher.MonoMapFuseable payloads, failedMessage=GenericMessage [payload=MonoMapFuseable, headers={mqtt_receivedRetained=false, mqtt_id=0, mqtt_duplicate=false, id=c5a75283-c0fe-ebac-4168-dabddd989da9, mqtt_receivedTopic=source/d9e50e8f-67e0-4505-7ca2-4d05b1242207, mqtt_receivedQos=0, timestamp=1616961588004}] at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) at
这里提供了完整的class。
@Configuration
@EnableIntegration
@IntegrationComponentScan
class MQTTConfiguration(val emotionalPrintService: EmotionalPrintService,
val gson: Gson,
val applicationConfiguration: ApplicationConfiguration) {
@Bean
fun mqttServiceFactory() : MqttPahoClientFactory {
return DefaultMqttPahoClientFactory().apply {
connectionOptions = MqttConnectOptions().apply {
serverURIs = arrayOf<String>(applicationConfiguration.mqttServerAddress)
}
}
}
@Bean
fun newMqttInFlow() =
integrationFlow (inbound()) {
handle<String> { payload, headers ->
gson.fromJson<EmotionalOutput>(payload, EmotionalOutput::class.java).let { emotionalOutput ->
emotionalPrintService.populateEmotionalOutput(emotionalOutput).map { populatedEmotionalOutput ->
MessageBuilder.withPayload(gson.toJson(populatedEmotionalOutput))
.copyHeaders(headers)
.setHeader(MqttHeaders.TOPIC, populatedEmotionalOutput.query_uuid).build()
}
}
}
channel(outgoingChannel())
}
@Bean
@ServiceActivator(requiresReply = "false", inputChannel = "outgoingChannel")
fun mqttOutbound(): MessageHandler {
val messageHandler = MqttPahoMessageHandler("divnrout", mqttServiceFactory())
messageHandler.setAsync(true)
return messageHandler
}
@Bean
fun outgoingChannel() : FluxMessageChannel {
return FluxMessageChannel()
}
@Bean
fun inbound(): MessageProducerSupport {
return MqttPahoMessageDrivenChannelAdapter("divnrin", mqttServiceFactory(),
"source/" + applicationConfiguration.sourceUuid).apply {
setConverter(DefaultPahoMessageConverter())
setQos(1)
}
}
}
您确实不需要 handle()
末尾的那个 block()
。您只需 return 来自 emotionalPrintService.populateEmotionalOutput()
的 Mono
,框架将为您提供正确的订阅和背压处理。
您还需要将 outgoingChannel()
设为 FluxMessageChannel
。
在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#reactive-streams
另外考虑将您的 IntegrationFlow
解决方案移至正确的 Kotlin DSL:https://docs.spring.io/spring-integration/docs/current/reference/html/kotlin-dsl.html#kotlin-dsl
另外:当它是一个 FluxMessageChannel
在流的末端时,没有理由担心 toReactivePublisher()
- FluxMessageChannel
是一个 Publisher<Message<?>>
by本身。
更新
问题出在这里:
handle<String>( { payload, headers ->
gson.fromJson<EmotionalOutput>(payload, EmotionalOutput::class.java).let { emotionalOutput ->
emotionalPrintService.populateEmotionalOutput(emotionalOutput).map { populatedEmotionalOutput ->
MessageBuilder.withPayload(gson.toJson(populatedEmotionalOutput))
.copyHeaders(headers)
.setHeader(MqttHeaders.TOPIC, populatedEmotionalOutput.query_uuid).build()
}
}
}) { async(true) }
查看 async(true)
选项。不幸的是,在当前版本中,我们不允许它默认以反应方式处理反应回复。你必须说你想在这个端点是异步的。因此,您的 Publisher
回复和 FluxMessageChannel
作为输出将起到正确的作用。