使用 Spring Integration DSL 时,如果响应来自 RabbitMQ 回复队列,如何实现 HTTP 请求/回复(request/reply)?

How to implement HTTP request/reply when the response comes from a rabbitMQ reply queue using Spring Integration DSL?

我正在尝试使用 Spring Integration DSL,通过两个独立的 RabbitMQ 队列(请求队列和响应队列)来实现 HTTP request/reply。它与另一个已有的问题类似(原文此处的链接已丢失),不同之处在于我希望把响应返回给最初的 HTTP 调用者。我可以看到测试用的 HTTP POST 消息被成功投递到请求队列,经过转换(转为大写)后进入响应队列。该消息也被从响应队列中消费了,但始终没有返回给调用者(http://localhost:8080/tunnel)。最终调用超时,并返回 500 错误。我刚接触这方面,所以可能完全漏掉了某些东西。有人可以提供建议吗?代码如下:

/**
 * Application/configuration class from the question: wires an HTTP inbound gateway to
 * RabbitMQ through three integration flows ({@code sendReceiveFlow}, {@code rabbitToWeb}
 * and {@code replyBackToHttp}).
 *
 * NOTE(review): this excerpt shows no class-level annotation and no closing brace for the
 * class; both were presumably lost when the snippet was pasted - confirm against the real file.
 */
public class TunnelApplication
{
    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command-line arguments, passed through to {@code SpringApplication.run}
     */
    public static void main(String[] args)
    {
        SpringApplication.run(TunnelApplication.class, args);
    }

    // Queue name injected from the "outboundQueue" property. Used for targetQueue(), as the
    // template's default receive queue, and as the routing key at the end of rabbitToWeb().
    @Value("${outboundQueue}")
    private String outboundQueue;

    // Queue name injected from the "inboundQueue" property. Used for requestQueue() and as
    // the routing key of the outbound gateway in sendReceiveFlow().
    @Value("${inboundQueue}")
    private String inboundQueue;

    // Connection factory handed in through the constructor; used to build the RabbitTemplate.
    private ConnectionFactory rabbitConnectionFactory;

    /**
     * Stores the injected connection factory for later use by {@link #amqpTemplate()}.
     *
     * @param factory the RabbitMQ connection factory supplied by the container
     */
    @Autowired
    public TunnelApplication(ConnectionFactory factory) {
        rabbitConnectionFactory = factory;
    }

    /**
     * Declares the queue named by {@code outboundQueue}.
     *
     * NOTE(review): the three flags are presumably (durable, exclusive, autoDelete) as in
     * Spring AMQP's {@code Queue} constructor - confirm.
     */
    @Bean
    public Queue targetQueue()
    {
        return new Queue(outboundQueue, true, false, true);
    }

    /**
     * Declares the queue named by {@code inboundQueue}, with the same flags as
     * {@link #targetQueue()}.
     */
    @Bean
    public Queue requestQueue()
    {
        return new Queue(inboundQueue, true, false, true);
    }

    /** Provides the JSON message converter installed on the template in {@link #amqpTemplate()}. */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Builds the {@code RabbitTemplate} used by the outbound gateways: JSON converter,
     * {@code outboundQueue} as default receive queue, and a reply timeout of 60000
     * (presumably milliseconds - confirm).
     *
     * NOTE(review): the declared return type is {@code AmqpTemplate}, while the flow methods
     * below ask for a {@code RabbitTemplate} parameter - verify that injection resolves as intended.
     */
    @Bean
    public AmqpTemplate amqpTemplate()
    {
        RabbitTemplate result = new RabbitTemplate(rabbitConnectionFactory);
        result.setMessageConverter(jsonMessageConverter());
        result.setDefaultReceiveQueue(outboundQueue);
        //result.setReplyAddress(outboundQueue);
        result.setReplyTimeout(60000);
        return result;
    }

    /**
     * HTTP entry flow: requests received on "/tunnel" are handed to an AMQP outbound gateway
     * that publishes with routing key {@code inboundQueue}; the result is logged and bridged.
     *
     * NOTE(review): {@code returnChannel} is set to {@link #amqpOutboundChannel()} here -
     * check whether that option carries replies or only messages returned by the broker.
     */
    @Bean
    public IntegrationFlow sendReceiveFlow(RabbitTemplate amqpTemplate) {
        return IntegrationFlows
                .from(Http.inboundGateway("/tunnel"))
                .handle(Amqp.outboundGateway(amqpTemplate)
                        .routingKey(inboundQueue)
                        .returnChannel(amqpOutboundChannel()))
                .log()
                .bridge(null)
                .get();
    }

    /**
     * Worker flow: consumes from {@link #requestQueue()}, upper-cases the String payload,
     * logs it, and sends it through an AMQP outbound gateway with routing key
     * {@code outboundQueue}; the result is logged and bridged.
     */
    @Bean
    public IntegrationFlow rabbitToWeb(RabbitTemplate amqpTemplate, ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, requestQueue()))
                .transform(String.class, String::toUpperCase)
                .log()
                .handle(Amqp.outboundGateway(amqpTemplate).routingKey(outboundQueue))
                .log()
                .bridge(null)
                .get();
    }

    /**
     * Consumes from {@link #targetQueue()} and calls the HTTP endpoint
     * "http://localhost:8080/tunnel" expecting a String response, then logs, bridges and
     * forwards to {@link #amqpOutboundChannel()}.
     *
     * NOTE(review): the target path "/tunnel" is the same path served by the inbound gateway
     * in sendReceiveFlow(); if port 8080 is this application, this issues a new request into
     * that flow rather than answering the original caller - confirm the intent.
     * The {@code amqpTemplate} parameter is not used in this method.
     */
    @Bean
    public IntegrationFlow replyBackToHttp(RabbitTemplate amqpTemplate, ConnectionFactory connectionFactory) {
        return IntegrationFlows.from(Amqp.inboundGateway(connectionFactory, targetQueue()))
                .handle(Http.outboundGateway("http://localhost:8080/tunnel")
                       .expectedResponseType(String.class))
                .log()
                .bridge(null)
                .channel(amqpOutboundChannel())
                .get();
    }

    /**
     * Channel referenced as the return channel in sendReceiveFlow() and as the final
     * channel of replyBackToHttp().
     */
    @Bean
    public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}

我的同事还尝试了下面这段代码,但同样没有得到响应:

/**
 * Second attempt from the question: an HTTP inbound gateway on "/sendreceive" feeding an
 * AMQP outbound gateway, with an AMQP-backed channel used both as the gateway's return
 * channel and as the next channel in the flow.
 */
@Configuration
@EnableIntegration
public class FlowConfig {

   /** Routing key used by the AMQP outbound gateway; injected from the "routingKey" property. */
   @Value("${routingKey}")
   private String routingKey;

   /** Name given to the AMQP-backed channel; injected from the "rabbitSinkChannel" property. */
   @Value("${rabbitSinkChannel}")
   private String rabbitSinkChannel;

   /**
    * Creates the AMQP-backed message channel named by the {@code rabbitSinkChannel} property.
    *
    * @param connectionFactory RabbitMQ connection factory backing the channel
    * @return the channel built by the {@code Amqp.channel} spec
    */
   @Bean
   public MessageChannel rabbitSinkChannel(ConnectionFactory connectionFactory) {
      return Amqp.channel(rabbitSinkChannel, connectionFactory).get();
   }

   /**
    * Creates a plain {@code RabbitTemplate} with no further customisation.
    *
    * @param connectionFactory RabbitMQ connection factory for the template
    * @return the new template
    */
   @Bean
   public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
      final RabbitTemplate template = new RabbitTemplate(connectionFactory);
      return template;
   }

   /**
    * Defines the flow: HTTP "/sendreceive" -> AMQP outbound gateway -> sink channel.
    *
    * @param rabbitTemplate    template used by the AMQP outbound gateway
    * @param connectionFactory RabbitMQ connection factory used to obtain the sink channel
    * @return the assembled integration flow
    */
   @Bean
   public IntegrationFlow httpFlow(RabbitTemplate rabbitTemplate, ConnectionFactory connectionFactory) {
      // Local named differently from the String field of the same name to avoid shadowing.
      final MessageChannel sinkChannel = rabbitSinkChannel(connectionFactory);

      return IntegrationFlows
         .from(Http.inboundGateway("/sendreceive"))
         .handle(Amqp.outboundGateway(rabbitTemplate).routingKey(routingKey).returnChannel(sinkChannel))
         // Open question from the author: should this be .handle(...) instead, and if so, with what?
         .channel(sinkChannel)
         .get();
   }
}

您可能误解了 Amqp.outboundGateway 上 returnChannel 的用途,并把自己的逻辑建立在了它之上。returnChannel 用于接收被 broker 退回(无法路由)的消息,而不是用于接收回复。请先了解发布者确认(Publisher Confirms)和退回(Returns)功能:https://docs.spring.io/spring-amqp/docs/current/reference/html/#cf-pub-conf-ret

另外,replyBackToHttp 流的目的也不清楚;目前它混杂地引用了其他 bean,令人困惑。

您可能需要先研究一下 Spring AMQP 中的请求-回复(request-reply)配置;了解之后,您可能就不会再尝试使用另一个单独的队列来接收回复了。不过这样做仍然是可行的:请参见 RabbitTemplate 的 replyAddress 属性:https://docs.spring.io/spring-amqp/docs/current/reference/html/#request-reply

以下更新有效(我还删除了 replyBackToHttp() 方法):

/**
 * Builds the template used for request/reply: JSON conversion, replies addressed to
 * {@code outboundQueue}, a reply timeout of 60000, and the direct reply-to container
 * switched off.
 *
 * @return the configured template, exposed as an {@code AmqpTemplate}
 */
@Bean
public AmqpTemplate amqpTemplate()
{
    final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory);
    // Replies are addressed to the outbound queue; no default receive queue is configured.
    template.setReplyAddress(outboundQueue);
    template.setUseDirectReplyToContainer(false);
    template.setReplyTimeout(60000);
    template.setMessageConverter(jsonMessageConverter());
    return template;
}

/**
 * Listener container that consumes the reply queue and hands each message to the
 * template returned by {@code amqpTemplate()}, which is registered as the listener.
 *
 * @return the configured reply listener container
 */
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
    final SimpleMessageListenerContainer replyContainer = new SimpleMessageListenerContainer();
    replyContainer.setConnectionFactory(rabbitConnectionFactory);
    replyContainer.setQueues(replyQueue());
    // amqpTemplate() is declared as AmqpTemplate, hence the cast to the listener interface.
    final MessageListener replyListener = (MessageListener) amqpTemplate();
    replyContainer.setMessageListener(replyListener);
    return replyContainer;
}

对于那些感到沮丧并且只想继续前进的人来说,这是几乎完整的解决方案。

package com.scdf.poc.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.amqp.dsl.Amqp;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.http.dsl.Http;
/**
 * HTTP request/reply over RabbitMQ: requests posted to "/sendreceive" are published with
 * routing key {@code rabbitSource}, and replies are received from the {@code rabbitSink}
 * queue through a listener container that delegates to the same {@code RabbitTemplate}.
 */
@Configuration
@EnableIntegration
public class FlowConfig {

   /** Routing key for outgoing requests; injected from the "rabbitSource" property. */
   @Value("${rabbitSource}")
   private String rabbitSource;

   /**
    * Reply address and reply queue name; injected from the "rabbitSink" property.
    * Note: has to be created manually in RabbitMQ, the SCDF flows won't auto-create it.
    */
   @Value("${rabbitSink}")
   private String rabbitSink;

   /**
    * Creates the template used by the outbound gateway, with replies addressed to
    * {@code rabbitSink}.
    *
    * @param connectionFactory RabbitMQ connection factory for the template
    * @return the configured template
    */
   @Bean
   public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
      final RabbitTemplate template = new RabbitTemplate(connectionFactory);
      template.setReplyAddress(rabbitSink);
      return template;
   }

   /**
    * Creates the container that listens on {@code rabbitSink} and passes each message to
    * the template, which is registered as the message listener.
    *
    * @param rabbitTemplate    template registered as the listener
    * @param connectionFactory RabbitMQ connection factory for the container
    * @return the configured listener container
    */
   @Bean
   public SimpleMessageListenerContainer simpleMessageListenerContainer(RabbitTemplate rabbitTemplate, ConnectionFactory connectionFactory) {
      final SimpleMessageListenerContainer replyContainer = new SimpleMessageListenerContainer();
      replyContainer.setMessageListener(rabbitTemplate);
      replyContainer.setQueueNames(rabbitSink);
      replyContainer.setConnectionFactory(connectionFactory);
      return replyContainer;
   }

   /**
    * Defines the flow: HTTP "/sendreceive" (String payload) -> AMQP outbound gateway
    * publishing with routing key {@code rabbitSource}.
    *
    * @param rabbitTemplate template used by the AMQP outbound gateway
    * @return the assembled integration flow
    */
   @Bean
   public IntegrationFlow httpFlow(RabbitTemplate rabbitTemplate) {
      return IntegrationFlows
         .from(Http.inboundGateway("/sendreceive").requestPayloadType(String.class))
         .handle(Amqp.outboundGateway(rabbitTemplate).routingKey(rabbitSource))
         .get();
   }
}

application.properties - 请注意,SCDF 使用流名称作为队列名称的前缀和后缀

rabbitSource=pocStream.rabbitSource.pocStream
rabbitSink=pocStream.rabbitSink.pocStream

pocStream 的 SCDF 流定义 - 这只是回显请求

rabbitSource: rabbit --queues=rabbitSource | bridge | rabbitSink: rabbit --routing-key=pocStream.rabbitSink.pocStream