Spring 集成:通过配置连接到多个 MQ 服务器

Spring Integration: connection to multiple MQ servers by config

我有一个 Spring Boot 5 应用程序,我也有它 运行 针对一台 IBM MQ 服务器。

现在我们希望它连接到三个或更多的 MQ 服务器。我现在的目的是将 XY 连接信息添加到环境中,然后我得到 XY MQConnectionFactory bean 和处理所需的所有其他 bean。

目前我拥有的是:

    @Bean
    @Qualifier(value="MQConnection")
    public MQConnectionFactory getIbmConnectionFactory() throws JMSException {
        MQConnectionFactory factory = new MQConnectionFactory();
// seeting all the parameters here

        return factory;
    }

但这是相当静态的。有没有一种优雅的方式来做到这一点?

我偶然发现了 IntegrationFlow。这是一个可行的解决方案吗?

谢谢你的小费!

韩国

解决方案

基于 Artem Bilan's 的响应,我构建了这个 class。

@Configuration
public class ConnectionWithIntegrationFlowMulti {
    protected static final Logger LOG = Logger.create();
    
    @Value("${mq.queue.jms.sources.queue.queue-manager}")
    private String queueManager;


    @Autowired
    private ConnectionConfig connectionConfig;


    @Autowired
    private SSLSocketFactory sslSocketFactory;

    @Bean
    public MessageChannel queureader() {
        return new DirectChannel();
    }

    @Autowired
    private IntegrationFlowContext flowContext;
    
    @PostConstruct
    public void processBeanDefinitionRegistry() throws BeansException {

        Assert.notEmpty(connectionConfig.getTab().getLocations(), "At least one CCDT file locations must be provided.");
        for (String tabLocation : connectionConfig.getTab().getLocations()) {
            try {
                IntegrationFlowRegistration theFlow = this.flowContext.registration(createFlow(tabLocation)).register();
                LOG.info("Registered bean flow for %s with id = %s", queueManager, theFlow.getId());
            } catch (JMSException e) {
                LOG.error(e);
            }
        }
    }
    
    public IntegrationFlow createFlow(String tabLocation) throws JMSException {
        LOG.info("creating ibmInbound");
        return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(getConnection(tabLocation)).destination(createDestinationBean()))
                .handle(m -> LOG.info("received payload: " + m.getPayload().toString()))
                .get();
    }

    
    public MQConnectionFactory getConnection(String tabLocation) throws JMSException {
        MQConnectionFactory factory = new MQConnectionFactory();
        
        // doing stuff
        
        return factory;
    }

    @Bean
    public MQQueue createDestinationBean() {
        LOG.info("creating destination bean");
        MQQueue queue = new MQQueue();

        try {
            queue.setBaseQueueManagerName(queueManager);
            queue.setBaseQueueName(queueName);

        } catch (Exception e) {
            LOG.error(e, "destination bean: Error for integration flow");
        }
        return queue;
    }


}

如果您可以静态地创建它们,您可以像现在这样创建 bean(每个都有一个唯一的限定符),但是您可以通过 [=10= 在您的服务/组件中动态访问它们] 字段或 @Autowired Map<String, MQConnectionFactory> 字段。 Spring 将使用类型为 MQConnectionFactory

的所有 bean 自动填充字段

在 Map 实现中,String 将是限定符值。

如果您还想根据某些属性等动态创建 bean,它会变得有点复杂。您需要按照

的思路进行研究

通过 Spring 集成,您可以在运行时动态创建 IntegrationFlow 个实例。为此,有一个 IntegrationFlowContext 及其 registration() API。返回的 IntegrationFlowRegistrationBuilder 作为回调,如:

/**
     * Add an object which will be registered as an {@link IntegrationFlow} dependant bean in the
     * application context. Usually it is some support component, which needs an application context.
     * For example dynamically created connection factories or header mappers for AMQP, JMS, TCP etc.
     * @param bean an additional arbitrary bean to register into the application context.
     * @return the current builder instance
     */
    IntegrationFlowRegistrationBuilder addBean(Object bean);

因此,您的 MQConnectionFactory 实例可以与其他流一起填充,用作特定 JMS 组件中的引用并注册为 bean。

在文档中查看更多信息:https://docs.spring.io/spring-integration/docs/5.2.3.RELEASE/reference/html/dsl.html#java-dsl-runtime-flows