Rabbitmq 死信 spring 集成 xml
Rabbitmq dead letter spring integration xml
您好,我正在尝试用 Spring 的 XML 配置实现死信交换机(dead letter exchange)。场景是:AAA 交换机绑定了 BBB 队列;如果 BBB 队列中的消息处理失败(例如监听器 listener 抛出异常),我希望把这条消息路由到死信交换机所绑定的队列中保存起来。代码如下:
创建示例项目
main.java
package com.spring.rabbit.first.deadletter;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Entry point of the dead-letter sample application.
 */
public class Main {

    /**
     * Loads {@code /applicationContext.xml} from the classpath and leaves the
     * resulting application context running.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("/applicationContext.xml");
        // The context is intentionally not closed here so its beans keep running;
        // the shutdown hook closes it (and releases its resources) when the JVM exits.
        context.registerShutdownHook();
    }
}
xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Spring configuration -->
    <context:component-scan base-package="com.spring.rabbit.first" />
    <context:mbean-export default-domain="com.spring.rabbit.first.deadletter" />

    <!-- RabbitMQ common configuration -->
    <rabbit:connection-factory id="connectionFactory"
        username="guest" password="guest" port="5672" virtual-host="/" host="localhost" />
    <!-- <rabbit:connection-factory id="connectionFactory"/> -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    <!-- The admin declares the exchanges, queues and bindings defined in this file. -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Queues -->
    <rabbit:queue id="springQueue" name="spring.queue"
        auto-delete="true" durable="false" />

    <!-- Listener on BBBqueue; every delivery goes through the retry advice below. -->
    <rabbit:listener-container
        connection-factory="connectionFactory" advice-chain="retryAdvice">
        <rabbit:listener queues="BBBqueue" ref="messageListener" />
    </rabbit:listener-container>
    <bean id="messageListener" class="com.spring.rabbit.first.deadletter.MessageHandler" />

    <!-- Stateless retry: once the attempts are used up the recoverer rejects the
         message without requeueing it, which lets the broker dead-letter it. -->
    <bean id="retryAdvice"
        class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
        <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer" />
        <property name="retryOperations" ref="retryTemplate" />
    </bean>
    <bean id="rejectAndDontRequeueRecoverer"
        class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />

    <!-- At most 3 attempts; back-off starts at 2000 ms, is multiplied by 10 after
         each failure and is capped at 30000 ms. -->
    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="2000" />
                <property name="multiplier" value="10.0" />
                <property name="maxInterval" value="30000" />
            </bean>
        </property>
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="3" />
            </bean>
        </property>
    </bean>

    <!-- Main exchange. NOTE(review): an empty topic pattern only matches messages
         published with an empty routing key; confirm that this is intended. -->
    <rabbit:topic-exchange name="AAAqueue">
        <rabbit:bindings>
            <rabbit:binding queue="BBBqueue" pattern="" />
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- Work queue: rejected or expired (10000 ms TTL) messages are forwarded to
         the XXX.dead.letter exchange. -->
    <rabbit:queue name="BBBqueue">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="XXX.dead.letter"></entry>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- dead letter -->
    <!-- BBBqueue sets no x-dead-letter-routing-key, so a dead-lettered message keeps
         the routing key it was originally published with. The previous empty pattern
         matched only an empty routing key, so such messages were unroutable and were
         dropped by the broker. "#" matches every routing key. -->
    <rabbit:topic-exchange name="XXX.dead.letter">
        <rabbit:bindings>
            <rabbit:binding queue="XXX.dead.letter.queue" pattern="#"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <rabbit:queue name="XXX.dead.letter.queue"></rabbit:queue>
</beans>
消息处理程序
package com.spring.rabbit.first.deadletter;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
 * Demo listener that prints every delivery and then always fails, so that the
 * failure handling around the listener can be exercised.
 */
public class MessageHandler implements MessageListener {

    /**
     * Prints the message and its body as text, then fails unconditionally.
     *
     * @param message the delivered message
     * @throws NullPointerException always, to simulate a failing listener
     */
    @Override
    public void onMessage(Message message) {
        System.out.println("Received message: " + message);
        // Decode with an explicit charset instead of the platform default so the
        // output does not depend on the JVM's locale settings.
        // NOTE(review): assumes the payload is UTF-8 text; confirm against the producer.
        System.out.println("Text: "
                + new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        // Replaces the former "message = null; if (message == null)" construct,
        // which always took the throwing branch.
        throw new NullPointerException("Simulated listener failure");
    }
}
消息发送者
package com.spring.rabbit.first.deadletter;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.stereotype.Service;
/**
 * Spring service, also exported as a JMX managed resource, that publishes text
 * messages through the injected {@link AmqpTemplate}.
 */
@Service
@ManagedResource
public class MessageSender {

    @Autowired
    private AmqpTemplate template;

    /**
     * Converts {@code text} to a message and publishes it.
     *
     * @param exchange name of the exchange to publish to
     * @param key routing key for the message
     * @param text payload to send
     */
    @ManagedOperation
    public void send(String exchange, String key, String text) {
        this.template.convertAndSend(exchange, key, text);
    }

    /**
     * Publishes {@code text} to the {@code amq.fanout} exchange with the routing
     * key {@code NDPAR.SPRING.JAVA}.
     *
     * @param text payload to send
     */
    @ManagedOperation
    public void send(String text) {
        final String defaultExchange = "amq.fanout";
        final String defaultRoutingKey = "NDPAR.SPRING.JAVA";
        send(defaultExchange, defaultRoutingKey, text);
    }
}
输出:
23:57:33.753 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
23:57:33.777 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=0
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:06.952 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 2000
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=1
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=1
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:12.888 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 20000
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=2
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=2
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=3
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry failed last attempt: count=3
23:58:42.393 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] WARN org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer -
Retries exhausted for message (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue,
receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:187)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
但我仍然没有在死信队列中看到任何消息。我遗漏了什么?
请对此提供帮助
我不确定你的 XXX-channel 和适配器是用来做什么的,但你需要在重试 advice 的工厂 bean 上(通过 messageRecoverer 属性)配置一个 RejectAndDontRequeueRecoverer。
默认的 recoverer 只会记录“重试次数已耗尽”,然后丢弃消息。
编辑
下面是一个自定义的 MessageRecoverer,它会把来自队列 A 的失败消息重新发布到名为 A.dlq 的队列;所需的队列和绑定会按需自动声明。
/*
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
/**
 * {@link MessageRecoverer} that republishes a failed message to a dead-letter
 * queue named after the queue it was consumed from ({@code <consumerQueue>.dlq}).
 *
 * <p>The direct exchange {@code DLX} is declared the first time {@link #recover}
 * runs; each dead-letter queue and its binding (routing key = queue name) are
 * declared when the queue is found to be missing. Details of the failure are
 * added to the message headers before it is republished.
 *
 * <p>NOTE(review): the "exchange declared" flag is a plain field with no
 * synchronization.
 */
public class AutoConfiguringRepublishMessageRecoverer implements MessageRecoverer {

    /** Header holding the full stack trace of the failure. */
    public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";

    /** Header holding the message of the failure's cause (or of the failure itself). */
    public static final String X_EXCEPTION_MESSAGE = "x-exception-message";

    /** Header holding the exchange the message was originally received from. */
    public static final String X_ORIGINAL_EXCHANGE = "x-original-exchange";

    /** Header holding the routing key the message was originally received with. */
    public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";

    private final Log logger = LogFactory.getLog(getClass());

    private final RabbitTemplate errorTemplate;

    private final RabbitAdmin admin;

    private final String deadLetterExchangeName = "DLX";

    private final DirectExchange deadletterExchange = new DirectExchange(this.deadLetterExchangeName);

    private boolean initialized;

    /**
     * Creates a recoverer that republishes through {@code errorTemplate} and
     * declares broker objects through an admin built on the same connection factory.
     *
     * @param errorTemplate template used to publish failed messages
     */
    public AutoConfiguringRepublishMessageRecoverer(RabbitTemplate errorTemplate) {
        this.errorTemplate = errorTemplate;
        this.admin = new RabbitAdmin(errorTemplate.getConnectionFactory());
    }

    /**
     * Adds failure details to the message headers and republishes the message to
     * the {@code DLX} exchange, routed to {@code <consumerQueue>.dlq}.
     *
     * @param message the message whose processing failed
     * @param cause the failure reported for the message
     */
    @Override
    public void recover(Message message, Throwable cause) {
        declareExchangeOnce();
        addFailureHeaders(message, cause);

        final String targetQueue = message.getMessageProperties().getConsumerQueue() + ".dlq";
        final boolean queueMissing = this.admin.getQueueProperties(targetQueue) == null;
        if (queueMissing) {
            declareAndBind(targetQueue);
        }
        this.errorTemplate.send(this.deadLetterExchangeName, targetQueue, message);

        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Republishing failed message to " + targetQueue);
        }
    }

    /** Declares the dead-letter exchange unless a previous call already did. */
    private void declareExchangeOnce() {
        if (this.initialized) {
            return;
        }
        this.admin.declareExchange(this.deadletterExchange);
        this.initialized = true;
    }

    /** Writes the stack trace, failure message and original routing data into the headers. */
    private void addFailureHeaders(Message message, Throwable cause) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        headers.put(X_EXCEPTION_STACKTRACE, renderStackTrace(cause));

        // Prefer the message of the nested cause when there is one.
        String description;
        if (cause.getCause() == null) {
            description = cause.getMessage();
        }
        else {
            description = cause.getCause().getMessage();
        }
        headers.put(X_EXCEPTION_MESSAGE, description);

        headers.put(X_ORIGINAL_EXCHANGE, message.getMessageProperties().getReceivedExchange());
        headers.put(X_ORIGINAL_ROUTING_KEY, message.getMessageProperties().getReceivedRoutingKey());
    }

    /** Declares the named queue and binds it to the dead-letter exchange under its own name. */
    private void declareAndBind(String queueName) {
        Queue deadLetterQueue = new Queue(queueName);
        this.admin.declareQueue(deadLetterQueue);
        this.admin.declareBinding(
                BindingBuilder.bind(deadLetterQueue).to(this.deadletterExchange).with(queueName));
    }

    /** Returns the text that {@code printStackTrace} produces for the given throwable. */
    private String renderStackTrace(Throwable cause) {
        StringWriter buffer = new StringWriter();
        cause.printStackTrace(new PrintWriter(buffer, true));
        return buffer.toString();
    }
}
您好,我正在尝试用 Spring 的 XML 配置实现死信交换机(dead letter exchange)。场景是:AAA 交换机绑定了 BBB 队列;如果 BBB 队列中的消息处理失败(例如监听器 listener 抛出异常),我希望把这条消息路由到死信交换机所绑定的队列中保存起来。代码如下:
创建示例项目
main.java
package com.spring.rabbit.first.deadletter;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Entry point of the dead-letter sample application.
 */
public class Main {

    /**
     * Loads {@code /applicationContext.xml} from the classpath and leaves the
     * resulting application context running.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("/applicationContext.xml");
        // The context is intentionally not closed here so its beans keep running;
        // the shutdown hook closes it (and releases its resources) when the JVM exits.
        context.registerShutdownHook();
    }
}
xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Spring configuration -->
    <context:component-scan base-package="com.spring.rabbit.first" />
    <context:mbean-export default-domain="com.spring.rabbit.first.deadletter" />

    <!-- RabbitMQ common configuration -->
    <rabbit:connection-factory id="connectionFactory"
        username="guest" password="guest" port="5672" virtual-host="/" host="localhost" />
    <!-- <rabbit:connection-factory id="connectionFactory"/> -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
    <!-- The admin declares the exchanges, queues and bindings defined in this file. -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!-- Queues -->
    <rabbit:queue id="springQueue" name="spring.queue"
        auto-delete="true" durable="false" />

    <!-- Listener on BBBqueue; every delivery goes through the retry advice below. -->
    <rabbit:listener-container
        connection-factory="connectionFactory" advice-chain="retryAdvice">
        <rabbit:listener queues="BBBqueue" ref="messageListener" />
    </rabbit:listener-container>
    <bean id="messageListener" class="com.spring.rabbit.first.deadletter.MessageHandler" />

    <!-- Stateless retry: once the attempts are used up the recoverer rejects the
         message without requeueing it, which lets the broker dead-letter it. -->
    <bean id="retryAdvice"
        class="org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterceptorFactoryBean">
        <property name="messageRecoverer" ref="rejectAndDontRequeueRecoverer" />
        <property name="retryOperations" ref="retryTemplate" />
    </bean>
    <bean id="rejectAndDontRequeueRecoverer"
        class="org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer" />

    <!-- At most 3 attempts; back-off starts at 2000 ms, is multiplied by 10 after
         each failure and is capped at 30000 ms. -->
    <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
        <property name="backOffPolicy">
            <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                <property name="initialInterval" value="2000" />
                <property name="multiplier" value="10.0" />
                <property name="maxInterval" value="30000" />
            </bean>
        </property>
        <property name="retryPolicy">
            <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                <property name="maxAttempts" value="3" />
            </bean>
        </property>
    </bean>

    <!-- Main exchange. NOTE(review): an empty topic pattern only matches messages
         published with an empty routing key; confirm that this is intended. -->
    <rabbit:topic-exchange name="AAAqueue">
        <rabbit:bindings>
            <rabbit:binding queue="BBBqueue" pattern="" />
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- Work queue: rejected or expired (10000 ms TTL) messages are forwarded to
         the XXX.dead.letter exchange. -->
    <rabbit:queue name="BBBqueue">
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="XXX.dead.letter"></entry>
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- dead letter -->
    <!-- BBBqueue sets no x-dead-letter-routing-key, so a dead-lettered message keeps
         the routing key it was originally published with. The previous empty pattern
         matched only an empty routing key, so such messages were unroutable and were
         dropped by the broker. "#" matches every routing key. -->
    <rabbit:topic-exchange name="XXX.dead.letter">
        <rabbit:bindings>
            <rabbit:binding queue="XXX.dead.letter.queue" pattern="#"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <rabbit:queue name="XXX.dead.letter.queue"></rabbit:queue>
</beans>
消息处理程序
package com.spring.rabbit.first.deadletter;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
 * Demo listener that prints every delivery and then always fails, so that the
 * failure handling around the listener can be exercised.
 */
public class MessageHandler implements MessageListener {

    /**
     * Prints the message and its body as text, then fails unconditionally.
     *
     * @param message the delivered message
     * @throws NullPointerException always, to simulate a failing listener
     */
    @Override
    public void onMessage(Message message) {
        System.out.println("Received message: " + message);
        // Decode with an explicit charset instead of the platform default so the
        // output does not depend on the JVM's locale settings.
        // NOTE(review): assumes the payload is UTF-8 text; confirm against the producer.
        System.out.println("Text: "
                + new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        // Replaces the former "message = null; if (message == null)" construct,
        // which always took the throwing branch.
        throw new NullPointerException("Simulated listener failure");
    }
}
消息发送者
package com.spring.rabbit.first.deadletter;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.stereotype.Service;
/**
 * Spring service, also exported as a JMX managed resource, that publishes text
 * messages through the injected {@link AmqpTemplate}.
 */
@Service
@ManagedResource
public class MessageSender {

    @Autowired
    private AmqpTemplate template;

    /**
     * Converts {@code text} to a message and publishes it.
     *
     * @param exchange name of the exchange to publish to
     * @param key routing key for the message
     * @param text payload to send
     */
    @ManagedOperation
    public void send(String exchange, String key, String text) {
        this.template.convertAndSend(exchange, key, text);
    }

    /**
     * Publishes {@code text} to the {@code amq.fanout} exchange with the routing
     * key {@code NDPAR.SPRING.JAVA}.
     *
     * @param text payload to send
     */
    @ManagedOperation
    public void send(String text) {
        final String defaultExchange = "amq.fanout";
        final String defaultRoutingKey = "NDPAR.SPRING.JAVA";
        send(defaultExchange, defaultRoutingKey, text);
    }
}
输出:
23:57:33.753 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.amqp.rabbit.listener.BlockingQueueConsumer - Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
23:57:33.777 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=0
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:06.952 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 2000
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=1
23:58:08.953 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=1
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:12.888 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.backoff.ExponentialBackOffPolicy - Sleeping for 20000
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=2
23:58:39.016 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry: count=2
Received message: (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
Text: send
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Checking for rethrow: count=3
23:58:42.391 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] DEBUG org.springframework.retry.support.RetryTemplate - Retry failed last attempt: count=3
23:58:42.393 [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#0-1] WARN org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer -
Retries exhausted for message (Body:'[B@1c18b12a(byte[4])' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=null, contentEncoding=null, contentLength=0, deliveryMode=null, receivedDeliveryMode=NON_PERSISTENT, expiration=null, priority=null, redelivered=false, receivedExchange=, receivedRoutingKey=BBBqueue,
receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-yPBzkqcKH2zX2IjWqT20wg, consumerQueue=BBBqueue])
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:870)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:780)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:187)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
但我仍然没有在死信队列中看到任何消息。我遗漏了什么?请对此提供帮助。
我不确定你的 XXX-channel 和适配器是用来做什么的,但你需要在重试 advice 的工厂 bean 上(通过 messageRecoverer 属性)配置一个 RejectAndDontRequeueRecoverer。
默认的 recoverer 只会记录“重试次数已耗尽”,然后丢弃消息。
编辑
下面是一个自定义的 MessageRecoverer,它会把来自队列 A 的失败消息重新发布到名为 A.dlq 的队列;所需的队列和绑定会按需自动声明。
/*
* Copyright 2014-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
/**
 * {@link MessageRecoverer} that republishes a failed message to a dead-letter
 * queue named after the queue it was consumed from ({@code <consumerQueue>.dlq}).
 *
 * <p>The direct exchange {@code DLX} is declared the first time {@link #recover}
 * runs; each dead-letter queue and its binding (routing key = queue name) are
 * declared when the queue is found to be missing. Details of the failure are
 * added to the message headers before it is republished.
 *
 * <p>NOTE(review): the "exchange declared" flag is a plain field with no
 * synchronization.
 */
public class AutoConfiguringRepublishMessageRecoverer implements MessageRecoverer {

    /** Header holding the full stack trace of the failure. */
    public static final String X_EXCEPTION_STACKTRACE = "x-exception-stacktrace";

    /** Header holding the message of the failure's cause (or of the failure itself). */
    public static final String X_EXCEPTION_MESSAGE = "x-exception-message";

    /** Header holding the exchange the message was originally received from. */
    public static final String X_ORIGINAL_EXCHANGE = "x-original-exchange";

    /** Header holding the routing key the message was originally received with. */
    public static final String X_ORIGINAL_ROUTING_KEY = "x-original-routingKey";

    private final Log logger = LogFactory.getLog(getClass());

    private final RabbitTemplate errorTemplate;

    private final RabbitAdmin admin;

    private final String deadLetterExchangeName = "DLX";

    private final DirectExchange deadletterExchange = new DirectExchange(this.deadLetterExchangeName);

    private boolean initialized;

    /**
     * Creates a recoverer that republishes through {@code errorTemplate} and
     * declares broker objects through an admin built on the same connection factory.
     *
     * @param errorTemplate template used to publish failed messages
     */
    public AutoConfiguringRepublishMessageRecoverer(RabbitTemplate errorTemplate) {
        this.errorTemplate = errorTemplate;
        this.admin = new RabbitAdmin(errorTemplate.getConnectionFactory());
    }

    /**
     * Adds failure details to the message headers and republishes the message to
     * the {@code DLX} exchange, routed to {@code <consumerQueue>.dlq}.
     *
     * @param message the message whose processing failed
     * @param cause the failure reported for the message
     */
    @Override
    public void recover(Message message, Throwable cause) {
        declareExchangeOnce();
        addFailureHeaders(message, cause);

        final String targetQueue = message.getMessageProperties().getConsumerQueue() + ".dlq";
        final boolean queueMissing = this.admin.getQueueProperties(targetQueue) == null;
        if (queueMissing) {
            declareAndBind(targetQueue);
        }
        this.errorTemplate.send(this.deadLetterExchangeName, targetQueue, message);

        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Republishing failed message to " + targetQueue);
        }
    }

    /** Declares the dead-letter exchange unless a previous call already did. */
    private void declareExchangeOnce() {
        if (this.initialized) {
            return;
        }
        this.admin.declareExchange(this.deadletterExchange);
        this.initialized = true;
    }

    /** Writes the stack trace, failure message and original routing data into the headers. */
    private void addFailureHeaders(Message message, Throwable cause) {
        Map<String, Object> headers = message.getMessageProperties().getHeaders();
        headers.put(X_EXCEPTION_STACKTRACE, renderStackTrace(cause));

        // Prefer the message of the nested cause when there is one.
        String description;
        if (cause.getCause() == null) {
            description = cause.getMessage();
        }
        else {
            description = cause.getCause().getMessage();
        }
        headers.put(X_EXCEPTION_MESSAGE, description);

        headers.put(X_ORIGINAL_EXCHANGE, message.getMessageProperties().getReceivedExchange());
        headers.put(X_ORIGINAL_ROUTING_KEY, message.getMessageProperties().getReceivedRoutingKey());
    }

    /** Declares the named queue and binds it to the dead-letter exchange under its own name. */
    private void declareAndBind(String queueName) {
        Queue deadLetterQueue = new Queue(queueName);
        this.admin.declareQueue(deadLetterQueue);
        this.admin.declareBinding(
                BindingBuilder.bind(deadLetterQueue).to(this.deadletterExchange).with(queueName));
    }

    /** Returns the text that {@code printStackTrace} produces for the given throwable. */
    private String renderStackTrace(Throwable cause) {
        StringWriter buffer = new StringWriter();
        cause.printStackTrace(new PrintWriter(buffer, true));
        return buffer.toString();
    }
}