RabbitMQ 无限循环问题
RabbitMQ infinite loop issue
我在使用 rabbitMQ 将消息从服务 A 发送到服务 B 时遇到问题,服务 B 也向服务 C 发送通知,问题是,我必须将 @RabbitListener 和 Rabbittemplate 放在相同的方法中,例如所以:
@Autowired private RabbitTemplate template;
@RabbitListener(queues=RabbitConfig.QUEUETD)
public ResponseEntity<String> AddSas_Campaign(SasCampaign sasCampaign){
if
//...
template.convertAndSend(RabbitConfig.EXCHANGE,RabbitConfig.ROUTING_KEY,sasCampaign);
return new ResponseEntity<String>( "New line inserted ", HttpStatus.OK);}
//...
}
else return new ResponseEntity<String>("Campaign Code exists",HttpStatus.OK);
}
它正在不停地创建 (+2000/min) 消息和异常的无限循环。
2021-06-29 14:34:16,560 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.amqp.rabbit.listener.adapter.ReplyFailureException: Failed to send reply with payload 'InvocationResult [returnValue=<200 OK OK,Test SasCamapign inserted ,[]>, returnType=org.springframework.http.ResponseEntity<java.lang.String>, bean=tn.itserv.services.Sas_CampaignService@4232ecc, method=public org.springframework.http.ResponseEntity tn.itserv.services.Sas_CampaignService.AddSas_Campaign(tn.itserv.entities.SasCampaign)]'
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.doHandleResult(AbstractAdaptableMessageListener.java:476)
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:400)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:152)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:135)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
... 10 common frames omitted
Caused by: org.springframework.amqp.AmqpException: Cannot determine ReplyTo message property value: Request message does not contain reply-to property, and no default response Exchange was set.
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.getReplyToAddress(AbstractAdaptableMessageListener.java:576)
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.doHandleResult(AbstractAdaptableMessageListener.java:472)
... 14 common frames omitted
老实说,我还没有 100% 理解它是如何工作的,我应该为每个交换创建不同的队列吗?因为它部分工作(从服务 A 到 B 或从 B 到 C)但是当我同时使用它们时它会产生这些异常。
我在生产中遇到过类似的问题,每年都会出现几次。 'requeue on failure' 的概念似乎是个坏主意,但它是由以前的开发人员实现的。
原因是消息在异常时被放回队列中。要修复它,您不会从 RabbitListener 中抛出异常。尝试使用 try catch 和 return 带有服务器错误的响应实体。
@Autowired private RabbitTemplate template;
@RabbitListener(queues=RabbitConfig.QUEUETD)
public ResponseEntity<String> AddSas_Campaign(SasCampaign sasCampaign){
try {
if
//...
template.convertAndSend(RabbitConfig.EXCHANGE,RabbitConfig.ROUTING_KEY,sasCampaign);
return new ResponseEntity<String>( "New line inserted ", HttpStatus.OK);}
//...
}
else return new ResponseEntity<String>("Campaign Code exists",HttpStatus.OK);
} catch(Exception e) {
return new ResponseEntity<String>("Error on Message Processing", HttpStatus.INTERNAL_SERVER_ERROR);
}
}
所以我意识到 @RabbitListener 函数应该没有 return 值,这意味着 void
而不是我的 ResponseEntity
所以我的想法是创建一个调用我的新方法方法并且没有 return 值:
@RabbitListener(queues=RabbitConfig.QUEUETD)
public void receiveFromService(SasCampaign sasCampaign){
AddSas_Campaign(sasCampaign);
}
我在使用 rabbitMQ 将消息从服务 A 发送到服务 B 时遇到问题,服务 B 也向服务 C 发送通知,问题是,我必须将 @RabbitListener 和 Rabbittemplate 放在相同的方法中,例如所以:
@Autowired private RabbitTemplate template;
@RabbitListener(queues=RabbitConfig.QUEUETD)
public ResponseEntity<String> AddSas_Campaign(SasCampaign sasCampaign){
if
//...
template.convertAndSend(RabbitConfig.EXCHANGE,RabbitConfig.ROUTING_KEY,sasCampaign);
return new ResponseEntity<String>( "New line inserted ", HttpStatus.OK);}
//...
}
else return new ResponseEntity<String>("Campaign Code exists",HttpStatus.OK);
}
它正在不停地创建 (+2000/min) 消息和异常的无限循环。
2021-06-29 14:34:16,560 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.amqp.rabbit.listener.adapter.ReplyFailureException: Failed to send reply with payload 'InvocationResult [returnValue=<200 OK OK,Test SasCamapign inserted ,[]>, returnType=org.springframework.http.ResponseEntity<java.lang.String>, bean=tn.itserv.services.Sas_CampaignService@4232ecc, method=public org.springframework.http.ResponseEntity tn.itserv.services.Sas_CampaignService.AddSas_Campaign(tn.itserv.entities.SasCampaign)]'
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.doHandleResult(AbstractAdaptableMessageListener.java:476)
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:400)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:152)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:135)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
... 10 common frames omitted
Caused by: org.springframework.amqp.AmqpException: Cannot determine ReplyTo message property value: Request message does not contain reply-to property, and no default response Exchange was set.
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.getReplyToAddress(AbstractAdaptableMessageListener.java:576)
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.doHandleResult(AbstractAdaptableMessageListener.java:472)
... 14 common frames omitted
老实说,我还没有 100% 理解它是如何工作的,我应该为每个交换创建不同的队列吗?因为它部分工作(从服务 A 到 B 或从 B 到 C)但是当我同时使用它们时它会产生这些异常。
我在生产中遇到过类似的问题,每年都会出现几次。 'requeue on failure' 的概念似乎是个坏主意,但它是由以前的开发人员实现的。
原因是消息在异常时被放回队列中。要修复它,您不会从 RabbitListener 中抛出异常。尝试使用 try catch 和 return 带有服务器错误的响应实体。
@Autowired private RabbitTemplate template;
@RabbitListener(queues=RabbitConfig.QUEUETD)
public ResponseEntity<String> AddSas_Campaign(SasCampaign sasCampaign){
try {
if
//...
template.convertAndSend(RabbitConfig.EXCHANGE,RabbitConfig.ROUTING_KEY,sasCampaign);
return new ResponseEntity<String>( "New line inserted ", HttpStatus.OK);}
//...
}
else return new ResponseEntity<String>("Campaign Code exists",HttpStatus.OK);
} catch(Exception e) {
return new ResponseEntity<String>("Error on Message Processing", HttpStatus.INTERNAL_SERVER_ERROR);
}
}
所以我意识到 @RabbitListener 函数应该没有 return 值,这意味着 void
而不是我的 ResponseEntity
所以我的想法是创建一个调用我的新方法方法并且没有 return 值:
@RabbitListener(queues=RabbitConfig.QUEUETD)
public void receiveFromService(SasCampaign sasCampaign){
AddSas_Campaign(sasCampaign);
}