RabbitMQ 无限循环问题

RabbitMQ infinite loop issue

我在使用 rabbitMQ 将消息从服​​务 A 发送到服务 B 时遇到问题,服务 B 也向服务 C 发送通知,问题是,我必须将 @RabbitListener 和 Rabbittemplate 放在相同的方法中,例如所以:

@Autowired private RabbitTemplate template;

@RabbitListener(queues=RabbitConfig.QUEUETD)
public ResponseEntity<String> AddSas_Campaign(SasCampaign sasCampaign){
        if
     //...
     template.convertAndSend(RabbitConfig.EXCHANGE,RabbitConfig.ROUTING_KEY,sasCampaign);

     return new ResponseEntity<String>( "New line inserted ", HttpStatus.OK);}
     //...
     }
     else return new ResponseEntity<String>("Campaign Code exists",HttpStatus.OK);
     
}

它正在不停地创建 (+2000/min) 消息和异常的无限循环。

2021-06-29 14:34:16,560 WARN  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.amqp.rabbit.listener.adapter.ReplyFailureException: Failed to send reply with payload 'InvocationResult [returnValue=<200 OK OK,Test SasCamapign inserted ,[]>, returnType=org.springframework.http.ResponseEntity<java.lang.String>, bean=tn.itserv.services.Sas_CampaignService@4232ecc, method=public org.springframework.http.ResponseEntity tn.itserv.services.Sas_CampaignService.AddSas_Campaign(tn.itserv.entities.SasCampaign)]'
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.doHandleResult(AbstractAdaptableMessageListener.java:476)
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:400)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:152)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:135)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
    ... 10 common frames omitted
Caused by: org.springframework.amqp.AmqpException: Cannot determine ReplyTo message property value: Request message does not contain reply-to property, and no default response Exchange was set.
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.getReplyToAddress(AbstractAdaptableMessageListener.java:576)
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.doHandleResult(AbstractAdaptableMessageListener.java:472)
    ... 14 common frames omitted

老实说,我还没有 100% 理解它是如何工作的,我应该为每个交换创建不同的队列吗?因为它部分工作(从服务 A 到 B 或从 B 到 C)但是当我同时使用它们时它会产生这些异常。

我在生产中遇到过类似的问题,每年都会出现几次。 'requeue on failure' 的概念似乎是个坏主意,但它是由以前的开发人员实现的。

原因是消息在异常时被放回队列中。要修复它,您不会从 RabbitListener 中抛出异常。尝试使用 try catch 和 return 带有服务器错误的响应实体。

@Autowired private RabbitTemplate template;

@RabbitListener(queues=RabbitConfig.QUEUETD)
public ResponseEntity<String> AddSas_Campaign(SasCampaign sasCampaign){
     try {
        if
        //...
        template.convertAndSend(RabbitConfig.EXCHANGE,RabbitConfig.ROUTING_KEY,sasCampaign);

        return new ResponseEntity<String>( "New line inserted ", HttpStatus.OK);}
       //...
       }
       else return new ResponseEntity<String>("Campaign Code exists",HttpStatus.OK);
     
   } catch(Exception e) {
      return new ResponseEntity<String>("Error on Message Processing", HttpStatus.INTERNAL_SERVER_ERROR);
   }
}

所以我意识到 @RabbitListener 函数应该没有 return 值,这意味着 void 而不是我的 ResponseEntity 所以我的想法是创建一个调用我的新方法方法并且没有 return 值:

@RabbitListener(queues=RabbitConfig.QUEUETD)
public void receiveFromService(SasCampaign sasCampaign){
    AddSas_Campaign(sasCampaign);
}