抽象出站网关实现的最佳方式
best way to abstract outbound gateway implementation
我的 spring-integration 应用程序需要能够在 Kafka 和遗留消息库(tibco rendezvous,spring-integration 不提供任何默认出站网关)之间按需切换实施)通过简单的配置更改。
遗留消息传递库提供了一些基本的 request/reply 方法
Class LegacyTransport {
Object request(Object query,String topic);
}
我正在尝试找出什么是抽象消息出站网关(Kafka 和旧版)的最佳方法,以便我可以在我的主要 IntegrationFlow 中将一个交换为另一个(通过简单的配置更改)。
我目前的想法是使用以下方法作为我的主要 IntegrationFlow 的一部分:
IntegrationFlowDefinition.gateway(IntegrationFlow flow)
首先创建 2 个具有相同名称的条件子流工厂 bean 来包装我的每个消息传递网关:
@ConditionalOnProperty(name="messaging",havingValue=[TIBCO/KAFKA])
@Bean
Function<String,IntegrationFlow> gatewaySubflowFactory() {
return (String topic) -> ((IntegrationFlowDefinition<?> f) ->
f.handle(
[messaging library specific implementation here]
));
}
然后在我的主要 IntegrationFlow 中使用该 bean:
@Bean
public IntegrationFlow mainFlow(Function<String,IntegrationFlow> gatewaySubflowFactory)
return IntegrationFlows.from(INPUT_CHANNEL)
...
[do some useful processing here]
...
.gateway(gatewaySubflowFactory.apply("some_topic"))
...
[do more useful stuff with gateway output here]
...
.get()
有没有更好(更简单?)的方法?
非常感谢您的专业知识、想法和时间。
最好的问候
任何出站网关只是更通用的服务激活器模式的特定实现。因此,您的 LegacyTransport.request()
可以包装到服务激活器配置中。这是第一个。
第二:别忘了。永远不要忘记。 Spring 集成中的首批 class 公民之一是 MessageChannel
抽象:常规服务激活器,Kafka 的特定出站网关 - 与它们交互的主要点是消息通道配置端点输入。
因此,您的 Kafka 和 Tibco 流都可以从同一个通道开始。您的主流只是将其输出发送到该通道。有关详细信息,请参阅 IntegrationFlowDefinition.channel()
。
这两个特定的流程绝对可以用 @ConditionalOnProperty
标记,以免在运行时同时出现。
总结一下我的推理,这里是一些配置草稿:
@Bean
public IntegrationFlow mainFlow() {
return IntegrationFlows.from(INPUT_CHANNEL)
...
[do some useful processing here]
...
.gateway(OUTBOUND_GATEWAY_CHANNEL)
...
[do more useful stuff with gateway output here]
...
.get()
}
@ConditionalOnProperty(name="messaging",havingValue=KAFKA)
@Bean
public IntegrationFlow kafkaFlow() {
return IntegrationFlows.from(OUTBOUND_GATEWAY_CHANNEL)
.handle(Kafka.outboundGateway())
.get();
}
@ConditionalOnProperty(name="messaging",havingValue=TIBCO)
@Bean
public IntegrationFlow tibcoFlow() {
return IntegrationFlows.from(OUTBOUND_GATEWAY_CHANNEL)
.handle(legacyTransport, "request")
.get();
}
我的 spring-integration 应用程序需要能够在 Kafka 和遗留消息库(tibco rendezvous,spring-integration 不提供任何默认出站网关)之间按需切换实施)通过简单的配置更改。
遗留消息传递库提供了一些基本的 request/reply 方法
Class LegacyTransport {
Object request(Object query,String topic);
}
我正在尝试找出什么是抽象消息出站网关(Kafka 和旧版)的最佳方法,以便我可以在我的主要 IntegrationFlow 中将一个交换为另一个(通过简单的配置更改)。
我目前的想法是使用以下方法作为我的主要 IntegrationFlow 的一部分:
IntegrationFlowDefinition.gateway(IntegrationFlow flow)
首先创建 2 个具有相同名称的条件子流工厂 bean 来包装我的每个消息传递网关:
@ConditionalOnProperty(name="messaging",havingValue=[TIBCO/KAFKA])
@Bean
Function<String,IntegrationFlow> gatewaySubflowFactory() {
return (String topic) -> ((IntegrationFlowDefinition<?> f) ->
f.handle(
[messaging library specific implementation here]
));
}
然后在我的主要 IntegrationFlow 中使用该 bean:
@Bean
public IntegrationFlow mainFlow(Function<String,IntegrationFlow> gatewaySubflowFactory)
return IntegrationFlows.from(INPUT_CHANNEL)
...
[do some useful processing here]
...
.gateway(gatewaySubflowFactory.apply("some_topic"))
...
[do more useful stuff with gateway output here]
...
.get()
有没有更好(更简单?)的方法?
非常感谢您的专业知识、想法和时间。
最好的问候
任何出站网关只是更通用的服务激活器模式的特定实现。因此,您的 LegacyTransport.request()
可以包装到服务激活器配置中。这是第一个。
第二:别忘了。永远不要忘记。 Spring 集成中的首批 class 公民之一是 MessageChannel
抽象:常规服务激活器,Kafka 的特定出站网关 - 与它们交互的主要点是消息通道配置端点输入。
因此,您的 Kafka 和 Tibco 流都可以从同一个通道开始。您的主流只是将其输出发送到该通道。有关详细信息,请参阅 IntegrationFlowDefinition.channel()
。
这两个特定的流程绝对可以用 @ConditionalOnProperty
标记,以免在运行时同时出现。
总结一下我的推理,这里是一些配置草稿:
@Bean
public IntegrationFlow mainFlow() {
return IntegrationFlows.from(INPUT_CHANNEL)
...
[do some useful processing here]
...
.gateway(OUTBOUND_GATEWAY_CHANNEL)
...
[do more useful stuff with gateway output here]
...
.get()
}
@ConditionalOnProperty(name="messaging",havingValue=KAFKA)
@Bean
public IntegrationFlow kafkaFlow() {
return IntegrationFlows.from(OUTBOUND_GATEWAY_CHANNEL)
.handle(Kafka.outboundGateway())
.get();
}
@ConditionalOnProperty(name="messaging",havingValue=TIBCO)
@Bean
public IntegrationFlow tibcoFlow() {
return IntegrationFlows.from(OUTBOUND_GATEWAY_CHANNEL)
.handle(legacyTransport, "request")
.get();
}