Spring 集成:通过配置连接到多个 MQ 服务器
Spring Integration: connection to multiple MQ servers by config
我有一个 Spring Boot 5 应用程序,我也有它 运行 针对一台 IBM MQ 服务器。
现在我们希望它连接到三个或更多的 MQ 服务器。我现在的目的是将 XY 连接信息添加到环境中,然后我得到 XY MQConnectionFactory bean 和处理所需的所有其他 bean。
目前我拥有的是:
@Bean
@Qualifier(value="MQConnection")
public MQConnectionFactory getIbmConnectionFactory() throws JMSException {
MQConnectionFactory factory = new MQConnectionFactory();
// seeting all the parameters here
return factory;
}
但这是相当静态的。有没有一种优雅的方式来做到这一点?
我偶然发现了 IntegrationFlow。这是一个可行的解决方案吗?
谢谢你的小费!
韩国
解决方案
基于 Artem Bilan's 的响应,我构建了这个 class。
@Configuration
public class ConnectionWithIntegrationFlowMulti {
protected static final Logger LOG = Logger.create();
@Value("${mq.queue.jms.sources.queue.queue-manager}")
private String queueManager;
@Autowired
private ConnectionConfig connectionConfig;
@Autowired
private SSLSocketFactory sslSocketFactory;
@Bean
public MessageChannel queureader() {
return new DirectChannel();
}
@Autowired
private IntegrationFlowContext flowContext;
@PostConstruct
public void processBeanDefinitionRegistry() throws BeansException {
Assert.notEmpty(connectionConfig.getTab().getLocations(), "At least one CCDT file locations must be provided.");
for (String tabLocation : connectionConfig.getTab().getLocations()) {
try {
IntegrationFlowRegistration theFlow = this.flowContext.registration(createFlow(tabLocation)).register();
LOG.info("Registered bean flow for %s with id = %s", queueManager, theFlow.getId());
} catch (JMSException e) {
LOG.error(e);
}
}
}
public IntegrationFlow createFlow(String tabLocation) throws JMSException {
LOG.info("creating ibmInbound");
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(getConnection(tabLocation)).destination(createDestinationBean()))
.handle(m -> LOG.info("received payload: " + m.getPayload().toString()))
.get();
}
public MQConnectionFactory getConnection(String tabLocation) throws JMSException {
MQConnectionFactory factory = new MQConnectionFactory();
// doing stuff
return factory;
}
@Bean
public MQQueue createDestinationBean() {
LOG.info("creating destination bean");
MQQueue queue = new MQQueue();
try {
queue.setBaseQueueManagerName(queueManager);
queue.setBaseQueueName(queueName);
} catch (Exception e) {
LOG.error(e, "destination bean: Error for integration flow");
}
return queue;
}
}
如果您可以静态地创建它们,您可以像现在这样创建 bean(每个都有一个唯一的限定符),但是您可以通过 [=10= 在您的服务/组件中动态访问它们] 字段或 @Autowired Map<String, MQConnectionFactory>
字段。 Spring 将使用类型为 MQConnectionFactory
的所有 bean 自动填充字段
在 Map 实现中,String
将是限定符值。
如果您还想根据某些属性等动态创建 bean,它会变得有点复杂。您需要按照
的思路进行研究
通过 Spring 集成,您可以在运行时动态创建 IntegrationFlow
个实例。为此,有一个 IntegrationFlowContext
及其 registration()
API。返回的 IntegrationFlowRegistrationBuilder
作为回调,如:
/**
* Add an object which will be registered as an {@link IntegrationFlow} dependant bean in the
* application context. Usually it is some support component, which needs an application context.
* For example dynamically created connection factories or header mappers for AMQP, JMS, TCP etc.
* @param bean an additional arbitrary bean to register into the application context.
* @return the current builder instance
*/
IntegrationFlowRegistrationBuilder addBean(Object bean);
因此,您的 MQConnectionFactory
实例可以与其他流一起填充,用作特定 JMS 组件中的引用并注册为 bean。
我有一个 Spring Boot 5 应用程序,我也有它 运行 针对一台 IBM MQ 服务器。
现在我们希望它连接到三个或更多的 MQ 服务器。我现在的目的是将 XY 连接信息添加到环境中,然后我得到 XY MQConnectionFactory bean 和处理所需的所有其他 bean。
目前我拥有的是:
@Bean
@Qualifier(value="MQConnection")
public MQConnectionFactory getIbmConnectionFactory() throws JMSException {
MQConnectionFactory factory = new MQConnectionFactory();
// seeting all the parameters here
return factory;
}
但这是相当静态的。有没有一种优雅的方式来做到这一点?
我偶然发现了 IntegrationFlow。这是一个可行的解决方案吗?
谢谢你的小费!
韩国
解决方案
基于 Artem Bilan's 的响应,我构建了这个 class。
@Configuration
public class ConnectionWithIntegrationFlowMulti {
protected static final Logger LOG = Logger.create();
@Value("${mq.queue.jms.sources.queue.queue-manager}")
private String queueManager;
@Autowired
private ConnectionConfig connectionConfig;
@Autowired
private SSLSocketFactory sslSocketFactory;
@Bean
public MessageChannel queureader() {
return new DirectChannel();
}
@Autowired
private IntegrationFlowContext flowContext;
@PostConstruct
public void processBeanDefinitionRegistry() throws BeansException {
Assert.notEmpty(connectionConfig.getTab().getLocations(), "At least one CCDT file locations must be provided.");
for (String tabLocation : connectionConfig.getTab().getLocations()) {
try {
IntegrationFlowRegistration theFlow = this.flowContext.registration(createFlow(tabLocation)).register();
LOG.info("Registered bean flow for %s with id = %s", queueManager, theFlow.getId());
} catch (JMSException e) {
LOG.error(e);
}
}
}
public IntegrationFlow createFlow(String tabLocation) throws JMSException {
LOG.info("creating ibmInbound");
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(getConnection(tabLocation)).destination(createDestinationBean()))
.handle(m -> LOG.info("received payload: " + m.getPayload().toString()))
.get();
}
public MQConnectionFactory getConnection(String tabLocation) throws JMSException {
MQConnectionFactory factory = new MQConnectionFactory();
// doing stuff
return factory;
}
@Bean
public MQQueue createDestinationBean() {
LOG.info("creating destination bean");
MQQueue queue = new MQQueue();
try {
queue.setBaseQueueManagerName(queueManager);
queue.setBaseQueueName(queueName);
} catch (Exception e) {
LOG.error(e, "destination bean: Error for integration flow");
}
return queue;
}
}
如果您可以静态地创建它们,您可以像现在这样创建 bean(每个都有一个唯一的限定符),但是您可以通过 [=10= 在您的服务/组件中动态访问它们] 字段或 @Autowired Map<String, MQConnectionFactory>
字段。 Spring 将使用类型为 MQConnectionFactory
在 Map 实现中,String
将是限定符值。
如果您还想根据某些属性等动态创建 bean,它会变得有点复杂。您需要按照
通过 Spring 集成,您可以在运行时动态创建 IntegrationFlow
个实例。为此,有一个 IntegrationFlowContext
及其 registration()
API。返回的 IntegrationFlowRegistrationBuilder
作为回调,如:
/**
* Add an object which will be registered as an {@link IntegrationFlow} dependant bean in the
* application context. Usually it is some support component, which needs an application context.
* For example dynamically created connection factories or header mappers for AMQP, JMS, TCP etc.
* @param bean an additional arbitrary bean to register into the application context.
* @return the current builder instance
*/
IntegrationFlowRegistrationBuilder addBean(Object bean);
因此,您的 MQConnectionFactory
实例可以与其他流一起填充,用作特定 JMS 组件中的引用并注册为 bean。