Spring 集成 + 向 kafka 发送 2 个事件
Spring Integration + send 2 events to kafka
我有一个消息网关,它在输入通道中接收一个 http Json 请求。
我想将 2 个事件作为同一请求的一部分发送到 kafka,一个事件用于接收,一个事件用于处理。
通过 Spring 集成实现此目的的最佳方法是什么?
我就是这样做的,而且有效,但不确定是否有更好的方法:
@Bean
public IntegrationFlow processMessage() {
return IntegrationFlows
.from("inputChannel")
.routeToRecipients(r -> r.recipient("inputChannel2")
.recipient("inputChannel3"))
.get();
}
@Bean
public IntegrationFlow sendReceived(MessageTransformer messageTransformer) {
return IntegrationFlows
.from("inputChannel2")
.transform(messageTransformer)
.handle( this.kafkaMessageHandler() )
.get();
}
@Bean
public IntegrationFlow sendProcessed(MessageTransformer2 messageTransformer) {
return IntegrationFlows
.from("inputChannel3")
.transform(messageTransformer)
.handle( this.kafkaMessageHandler() )
.get();
}
谢谢。
嗯,你做的还可以。实现相同目标的另一种方法是使用 poublishSubscribeChannel
代替:
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-subflows
您也可以考虑对 Kafka 消息处理程序进行单一配置,并从这两个分发流中为其重用一个通道。
这是它完成工作的另一种方式:
@Bean
public IntegrationFlow subscribersFlow(MessageTransformer messageTransformer, MessageTransformer2 messageTransformer2) {
return IntegrationFlows
.from("inputChannel")
.publishSubscribeChannel(s -> s
.subscribe(f -> f
.transform(messageTransformer)
.handle( this.kafkaMessageHandler() )
)
.subscribe(f -> f
.transform(messageTransformer2)
.handle( this.kafkaMessageHandler() )
)
)
.get();
}
我有一个消息网关,它在输入通道中接收一个 http Json 请求。
我想将 2 个事件作为同一请求的一部分发送到 kafka,一个事件用于接收,一个事件用于处理。
通过 Spring 集成实现此目的的最佳方法是什么?
我就是这样做的,而且有效,但不确定是否有更好的方法:
@Bean
public IntegrationFlow processMessage() {
return IntegrationFlows
.from("inputChannel")
.routeToRecipients(r -> r.recipient("inputChannel2")
.recipient("inputChannel3"))
.get();
}
@Bean
public IntegrationFlow sendReceived(MessageTransformer messageTransformer) {
return IntegrationFlows
.from("inputChannel2")
.transform(messageTransformer)
.handle( this.kafkaMessageHandler() )
.get();
}
@Bean
public IntegrationFlow sendProcessed(MessageTransformer2 messageTransformer) {
return IntegrationFlows
.from("inputChannel3")
.transform(messageTransformer)
.handle( this.kafkaMessageHandler() )
.get();
}
谢谢。
嗯,你做的还可以。实现相同目标的另一种方法是使用 poublishSubscribeChannel
代替:
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-subflows
您也可以考虑对 Kafka 消息处理程序进行单一配置,并从这两个分发流中为其重用一个通道。
这是它完成工作的另一种方式:
@Bean
public IntegrationFlow subscribersFlow(MessageTransformer messageTransformer, MessageTransformer2 messageTransformer2) {
return IntegrationFlows
.from("inputChannel")
.publishSubscribeChannel(s -> s
.subscribe(f -> f
.transform(messageTransformer)
.handle( this.kafkaMessageHandler() )
)
.subscribe(f -> f
.transform(messageTransformer2)
.handle( this.kafkaMessageHandler() )
)
)
.get();
}