Spring Boot + RabbitMQ - DLQ not working

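I have set up a DLQ and a DLX, but failed messages are not redirected to the DLQ. I send messages to MESSAGES.EXCHANGE both from the Java application and from the RabbitMQ server, and in both cases the listener receives the message. After the listener throws an exception, the message should be redirected to DLX.MESSAGES.EXCHANGE, but that is not happening.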

Below are the Java code and screenshots of the RabbitMQ server. Everything looks correct to me; I cannot find a problem in either the code or the RabbitMQ server.

Queue configuration code:

public class DLQAmqpConfiguration {
    public static final String DLX_MESSAGES_EXCHANGE = "DLX.MESSAGES.EXCHANGE";
    public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";

    public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";
    public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";
    public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";

    @Bean
    Queue messagesQueue() {
        return QueueBuilder.durable(MESSAGES_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_MESSAGES_EXCHANGE)
                .build();
    }

    @Bean
    DirectExchange messagesExchange() {
        return new DirectExchange(MESSAGES_EXCHANGE);
    }

    @Bean
    Binding bindingMessages() {
        return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
    }

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange(DLX_MESSAGES_EXCHANGE);
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_MESSAGES_QUEUE).build();
    }

    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }
}

Producer:

    this.template.convertAndSend(DLQAmqpConfiguration.MESSAGES_EXCHANGE,
                DLQAmqpConfiguration.ROUTING_KEY_MESSAGES_QUEUE, message);

Consumer:

    @RabbitListener(queues = DLQAmqpConfiguration.MESSAGES_QUEUE)
    public void receiveMessage(Message message) throws BusinessException {
        System.out.println("Received failed message, re-queueing: " + message.toString());
        System.out.println("Received failed message, re-queueing: " + message.getMessageProperties().getReceivedRoutingKey());
        throw new BusinessException();
    }

    // this listener is never invoked
    @RabbitListener(queues = DLQAmqpConfiguration.DLQ_MESSAGES_QUEUE)
    public void processFailedMessages(Message message) {
        System.out.println("Received failed message: " + message.toString());
    }

Exchanges (screenshot):

Queues (screenshot):

Logs:

Received failed message, re-queueing: (Body:'[B@55c36bc9(byte[26])' MessageProperties [headers={}, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=true, receivedExchange=MESSAGES.EXCHANGE, receivedRoutingKey=ROUTING_KEY_MESSAGES_QUEUE, deliveryTag=5444, consumerTag=amq.ctag-KrxkDPlc_uoqHOx_bbnvnA, consumerQueue=MESSAGES.QUEUE])
Received failed message, re-queueing: ROUTING_KEY_MESSAGES_QUEUE
2020-08-27 21:36:33.460  WARN 13192 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.rabbitmq.RabbitmqApplication.receiveMessage(org.springframework.amqp.core.Message) throws com.example.rabbitmq.errorhandler.BusinessException' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:228) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:970) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:916) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1291) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: com.example.rabbitmq.errorhandler.BusinessException: null

You must either set spring.rabbitmq.listener.simple.default-requeue-rejected=false (or ...direct..., if you are using the direct container instead of the simple container) or throw an AmqpRejectAndDontRequeueException.

Otherwise, the failed message is requeued and redelivered.
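
Throwing AmqpRejectAndDontRequeueException from the listener could look roughly like the sketch below. It assumes the queue name from the question; the class name `RejectingListener` and the `process` method are hypothetical placeholders for the real business logic, which here simply fails every time. The container rejects the delivery without requeueing when it finds this exception, so the broker can route the message to the queue's x-dead-letter-exchange.

```java
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// Hypothetical listener bean; only the exception type and annotations come from Spring AMQP.
@Component
public class RejectingListener {

    @RabbitListener(queues = "MESSAGES.QUEUE")
    public void receiveMessage(Message message) {
        try {
            process(message);
        } catch (Exception e) {
            // Signals "reject, do not requeue", so the message is dead-lettered
            // instead of being redelivered to this listener.
            throw new AmqpRejectAndDontRequeueException("processing failed", e);
        }
    }

    // Placeholder for business logic; it always fails, like the listener in the question.
    private void process(Message message) throws Exception {
        throw new Exception("simulated business failure");
    }
}
```

This variant is useful when only some failures should go to the DLQ: other exceptions still follow the container's default-requeue-rejected setting.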

@SpringBootApplication
public class So63620066Application {

    public static void main(String[] args) {
        SpringApplication.run(So63620066Application.class, args);
    }

    public static final String DLX_MESSAGES_EXCHANGE = "DLX.MESSAGES.EXCHANGE";

    public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";

    public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";

    public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";

    public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";

    @Bean
    Queue messagesQueue() {
        return QueueBuilder.durable(MESSAGES_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_MESSAGES_EXCHANGE)
                .build();
    }

    @Bean
    DirectExchange messagesExchange() {
        return new DirectExchange(MESSAGES_EXCHANGE);
    }

    @Bean
    Binding bindingMessages() {
        return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
    }

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange(DLX_MESSAGES_EXCHANGE);
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_MESSAGES_QUEUE).build();
    }

    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }

    @RabbitListener(queues = MESSAGES_QUEUE)
    public void receiveMessage(Message message) {
        System.out.println("Received failed message, re-queueing: " + message.toString());
        System.out.println(
                "Received failed message, re-queueing: " + message.getMessageProperties().getReceivedRoutingKey());
        throw new RuntimeException("fail");
    }

    @RabbitListener(queues = DLQ_MESSAGES_QUEUE)
    public void processFailedMessages(Message message) {
        System.out.println("Received failed message: " + message.toString());
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.convertAndSend(MESSAGES_EXCHANGE,
                    ROUTING_KEY_MESSAGES_QUEUE, "foo");
        };
    }

}

application.properties:

spring.rabbitmq.listener.simple.default-requeue-rejected=false

Output:

Received failed message, re-queueing: ROUTING_KEY_MESSAGES_QUEUE
2020-08-27 12:49:41.056  WARN 11489 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.demo.So63620066Application.receiveMessage(org.springframework.amqp.core.Message)' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:228) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:970) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:916) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1291) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_212]
Caused by: java.lang.RuntimeException: fail
    at com.example.demo.So63620066Application.receiveMessage(So63620066Application.java:71) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_212]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_212]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_212]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_212]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:53) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:220) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    ... 13 common frames omitted

Received failed message: (Body:'foo' MessageProperties [headers={x-first-death-exchange=MESSAGES.EXCHANGE, x-death=[{reason=rejected, count=1, exchange=MESSAGES.EXCHANGE, time=Thu Aug 27 12:49:41 EDT 2020, routing-keys=[ROUTING_KEY_MESSAGES_QUEUE], queue=MESSAGES.QUEUE}], x-first-death-reason=rejected, x-first-death-queue=MESSAGES.QUEUE}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=DLX.MESSAGES.EXCHANGE, receivedRoutingKey=ROUTING_KEY_MESSAGES_QUEUE, deliveryTag=3, consumerTag=amq.ctag--VIXT0V3hhBrlfTFqI5uxg, consumerQueue=DLQ.MESSAGES.QUEUE])
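
The difference between the two logs in this thread (redelivered=true on MESSAGES.QUEUE in the question, a single failure followed by the DLQ listener in the answer) can be illustrated with a toy in-memory model. This is not RabbitMQ or Spring AMQP code: `RequeueModel`, `run`, and `maxDeliveries` are names invented for the sketch, and `maxDeliveries` exists only so the requeue loop terminates.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of reject handling; it does not talk to a broker. */
public class RequeueModel {

    /** How often the failing listener saw the message, and what reached the DLQ. */
    public static final class Result {
        public final int deliveries;
        public final List<String> deadLettered;

        Result(int deliveries, List<String> deadLettered) {
            this.deliveries = deliveries;
            this.deadLettered = deadLettered;
        }
    }

    /**
     * Delivers one message to a listener that always fails.
     * requeueRejected plays the role of default-requeue-rejected.
     */
    public static Result run(String message, boolean requeueRejected, int maxDeliveries) {
        Deque<String> mainQueue = new ArrayDeque<>();
        List<String> deadLetterQueue = new ArrayList<>();
        mainQueue.add(message);

        int deliveries = 0;
        while (!mainQueue.isEmpty() && deliveries < maxDeliveries) {
            String delivered = mainQueue.poll();
            deliveries++;
            // The listener throws here, so the delivery is rejected.
            if (requeueRejected) {
                // Back onto the same queue: the listener will see it again.
                mainQueue.add(delivered);
            } else {
                // Stands in for routing through the dead-letter exchange.
                deadLetterQueue.add(delivered);
            }
        }
        return new Result(deliveries, deadLetterQueue);
    }

    public static void main(String[] args) {
        Result requeued = run("foo", true, 5);
        Result rejected = run("foo", false, 5);
        System.out.println("requeue=true:  deliveries=" + requeued.deliveries
                + ", dead-lettered=" + requeued.deadLettered);
        System.out.println("requeue=false: deliveries=" + rejected.deliveries
                + ", dead-lettered=" + rejected.deadLettered);
    }
}
```

With requeueing enabled the model keeps handing the same message back to the failing listener until the cap is reached and nothing reaches the dead-letter list; with requeueing disabled the message is delivered once and then dead-lettered.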