Spring 集成 - 请求-回复实施

Spring Integration - Request-Reply Implementation

我是 Spring 集成的新手,也是 Stack Overflow 的新手。我正在寻求一些帮助来理解 Spring 集成,因为它与请求-回复模式相关。通过在网上阅读,我认为我应该使用服务激活器来启用此类用例。

我正在使用 JMS 来促进基于 XML 的消息的发送和接收。我们的下划线实施是 IBM Websphere MQ。

我也在使用 Spring Boot(1.3 版。6.RELEASE)并尝试使用基于纯注解的配置方法(如果可能的话)。我已经在网上搜索并看到了一些示例,但到目前为止我看不到任何可以帮助我理解它们如何组合在一起的东西。 Spring 集成文档非常好,但我仍在为如何将所有部分组合在一起而苦苦挣扎。如果有什么地方我错过了,我提前道歉。我把在这里发帖作为最后的选择。

这是我的配置:

package com.daluga.spring.integration.configuration

import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.mq.jms.MQQueue;
import com.ibm.msg.client.wmq.WMQConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;

//import com.ibm.msg.client.services.Trace;

@Configuration
public class MQConfiguration {

    private static final Logger LOGGER = LoggerFactory.getLogger(MQConfiguration.class);

    @Value("${host-name}")
    private String hostName;

    @Value("${port}")
    private int port;

    @Value("${channel}")
    private String channel;

    @Value("${time-to-live}")
    private int timeToLive;

    @Autowired
    @Qualifier("MQConnectionFactory")
    ConnectionFactory connectionFactory;

    @Bean(name = "jmsTemplate")
    public JmsTemplate provideJmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setExplicitQosEnabled(true); 
        jmsTemplate.setTimeToLive(timeToLive);
        jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);     
        return jmsTemplate;
    }

    @Bean(name = "MQConnectionFactory")
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory ccf  = new CachingConnectionFactory();

        //Trace.setOn();

        try {
            MQConnectionFactory mqcf = new MQConnectionFactory();
            mqcf.setHostName(hostName);
            mqcf.setPort(port);
            mqcf.setChannel(channel);
            mqcf.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            ccf.setTargetConnectionFactory(mqcf);
            ccf.setSessionCacheSize(2);   
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

        return ccf;
    }

    @Bean(name = "requestQueue")
    public Destination createRequestQueue() {

        Destination queue = null;

        try {
            queue = new MQQueue("REQUEST.QUEUE");
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

        return queue;
    }

    @Bean(name = "replyQueue")
    public Destination createReplyQueue() {

        Destination queue = null;

        try {
            queue = new MQQueue("REPLY.QUEUE");
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }

        return queue;
    }

    @Bean(name = "requestChannel")
    public QueueChannel createRequestChannel() {

        QueueChannel channel = new QueueChannel();

        return channel;
    }

    @Bean(name = "replyChannel")
    public QueueChannel createReplyChannel() {

        QueueChannel channel = new QueueChannel();

        return channel;
    }

}

这是我的服务 class:

package com.daluga.spring.integration.service

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.stereotype.Service;


@Service
public class MyRequestReplyService {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyRequestReplyService.class);

    @ServiceActivator(inputChannel = "replyChannel")
    public void sendAndReceive(String requestPayload) {
        // How to get replyPayload
    }

}

所以,在这一点上,我不太确定如何将所有这些粘合在一起才能完成这项工作。我不明白如何将我的请求和回复队列粘合到服务激活器以使这一切正常工作。

我正在调用的服务(JMS/Webshere 基于 MQ)正在使用典型的消息和相关 ID,以便我可以将请求正确地绑定到相应的响应。

任何人都可以为我提供有关如何让它工作的任何指导吗?请让我知道我可以提供哪些额外信息来说明这一点。

在此先感谢您的帮助!

网关提供 request/reply 语义。

您应该使用 Spring Integration's built-in JMS Support.

而不是直接使用 JmsTemplate
@Bean
@ServiceActivator(inputChannel="requestChannel")
public MessageHandler jmsOutGateway() {
    JmsOutboundGateway outGateway = new JmsOutboundGateway();
    // set properties
    outGateway.setOutputChannel(replyChannel());
    return outGateway;
}

如果您想自己推出,请将服务激活器方法更改为 return 回复类型并使用模板 sendAndReceive()convertSendAndReceive() 方法之一。

sample app 使用 XML 配置,但应该提供一些额外的指导。