Spring RabbitTemplate setRetryTemplate 和 setRecoveryCallback 已忽略

Spring RabbitTemplate setRetryTemplate & setRecoveryCallback ignored

问题: 经过固定次数的重试后,我想将消息发送到错误队列并从其原始队列中使用它。我想找到一个通用的解决方案,因为我处理很多不同的消息,并且根据它们引发的异常,我想采取不同的行动。如何设置 RecoveryCallback 以便 Spring 在 maxRetries 之后触发它?

我目前在做什么
我尝试设置一个 RetryTemplate 和一个 RecoveryCallback。 当我 运行 应用程序并将消息发布到 test 队列时,我希望 EListener#receive 中的处理失败 3 次,然后触发我配置的 RecoveryCallback 然后路由消息基于特定错误队列的上下文。

实际发生了什么 实际发生的是 Spring Boot 使用自己的 RabbitTemplate 初始化 RabbitAdmin 对象,因此不使用我配置的 RabbitTemplate bean。

我的目录结构如下:

rabbit
  |___ EListener.java
  |___ Rabbit.java
test
  |___ Test.java

我在Rabbit.java

中有以下代码
@Configuration
public class Rabbit {

    @Autowired
    ConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setRetryTemplate(createRetryTemplate());
        rabbitTemplate.setRecoveryCallback(createRecoveryCallback());
        return rabbitTemplate;
    }

    createRecoveryCallback() // Omitted for brevity
    createRetryTemplate() // Omitted for brevity
}

EListener.java 文件包含:

@Component
public class EListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "test", durable = "true"), exchange = @Exchange(value = "test", type = ExchangeTypes.TOPIC, durable = "true", autoDelete = "true"), key = "test"))
    public void receive(Message m) throws Exception {
        System.out.println(m);
        throw new Exception();
    }
}

Test.java 包含:

@SpringBootApplication
@ComponentScan("rabbit")
public class Test {

    public static void main(String[] args) {
        new SpringApplicationBuilder(Test.class).application().run(args);
    }
}

RabbitTemplate 添加 RetryTemplate 是为了重试 发布 消息。

要在消费者端添加重试,您必须向侦听器添加重试拦截器container's advice chain

由于您使用的是 @RabbitListener,建议链在 listener container factory @Bean 上,这意味着您必须自己声明一个,而不是依赖引导创建的默认建议。

无状态重试执行重试in-memory;有状态重试重新排队消息;它需要一个 messageId 属性(或其他一些机制来唯一标识消息)。