使用 Spring 集成为 request/response 创建命名回复目标

Creating a named reply destination for request/response using Spring Integration

我有两个单独的应用程序 运行 在 ActiveMQ 代理的两边;应用程序 1 向应用程序 2 发送同步请求,returns 将响应返回给应用程序 1。目前,回复是通过临时队列进行的,我现在正在尝试创建一个命名的回复目标,以避免创建多个临时队列的开销。

申请 1

@MessagingGateway
public interface OrderGateway {

    @Gateway(requestChannel = "requestChannel", replyChannel = "responseChannel")
    public OrderDto fetchOrder(OrderRequest orderRequest);
}

@Bean
public IntegrationFlow outgoingRequestFlow(ConnectionFactory connectionFactory) {

    return IntegrationFlows.from("requestChannel")
                           .handle(Jms.outboundGateway(connectionFactory)
                                      .requestDestination("request.queue")
                                      .replyDestination("response.topic")
                                      .correlationKey("JMSCorrelationID"))
                           .channel("responseChannel")
                           .get();
}

应用程序 2

@Bean
public IntegrationFlow incomingRequestFlow(ConnectionFactory connectionFactory) {

    return IntegrationFlows.from(Jms.inboundGateway(connectionFactory)
                                    .destination("request.queue")
                                    .correlationKey("JMSCorrelationID"))
                           .channel("requestChannel")
                           .handle("requestServiceActivator", "handleRequest")
                           .channel("responseChannel")
                           .get();
}

@Component
public class OrderServiceActivator {

    @Autowired
    OrderService orderService;

    @ServiceActivator
    public OrderDto fetchOrder(OrderRequest orderRequest) {

        return orderService.getById(orderRequest.getId());
    }
}

当我启动两个应用程序时,request.queue 被创建并有一个消费者(应用程序 2)。 response.topic 已创建,但由于某种原因它没有消费者。因此,当我向应用程序 1 发送请求时,它会到达应用程序 2,但 5 秒后应用程序 1 未收到回复并超时,并记录以下错误:

应用程序 2

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.web.context.WebApplicationContext:/application-2.responseChannel'

申请 1

org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms

我想我犯了一些简单的配置错误,我们将不胜感激。

我没找到@ServiceActivator 是如何连接的...

通常是这样的:

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "responseChannel")
public .....

也许这就是你所缺少的。

根据您的配置,回复队列没有长期使用的消费者 - 为每个请求创建一个消费者(使用特定相关 ID 的消息选择器)。

如果添加 .replyContainer() 将有一个永久消费者。

但是,它在功能上应该没有什么区别。

我只是 运行 测试与你的类似,有和没有 replyContainer(),这对我来说都很好......

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
    return f -> f.handleWithAdapter(a ->
            a.jmsGateway(this.jmsConnectionFactory)
//                  .replyContainer()
                    .replyDestination("pipereplies")
                    .correlationKey("JmsCorrelationID")
                    .requestDestination("jmsPipelineTest"));
}

我建议您打开调试日志记录,看看是否有帮助。

@Bean
public IntegrationFlow jmsInboundGatewayFlow() {
    return IntegrationFlows.from((MessagingGateways g) ->
            g.jms(this.jmsConnectionFactory)
                    .correlationKey("JmsCorrelationID")
                    .destination("jmsPipelineTest"))
            .<String, String>transform(String::toUpperCase)
            .get();
}