SpringAMQP - Retry/Resend 消息 dlx
SpringAMQP - Retry/Resend messages dlx
我正在尝试使用 DLX 的重试机制。
所以,基本上我想发送一条消息 3 次,而不是停止并让这条消息在 dlx 队列上停止;
我做了什么:
创建绑定到 WorkExchange 的 WorkQueue
创建了绑定到 RetryExchange
的 RetryQueue
WorkQueue -> 将 x-dead-letter-exchange 设置为 RetryExchange
RetryQueue -> 将 x-dead-letter-exchange 设置为 WorkExchange 并将 x-message-ttl 设置为 300000 毫秒(5 分钟)
所以,现在当我向 WorkQueue 发送任何消息时它失败了..这条消息进入 RetryQueue 5 分钟然后返回 WorkQueue..但它可能会一直失败,我想在 3 次尝试后停止它...
有可能吗?是否可以设置 RetryQueue 尝试 3 次后停止?
谢谢。
没有办法单独在代理中执行此操作。
您可以向侦听器添加代码 - 检查 x-death
header 以确定重试消息的次数,然后 discard/log 它(and/or 发送它当你想放弃时,你的听众中的第三个 queue)。
编辑
@SpringBootApplication
public class So59741067Application {
public static void main(String[] args) {
SpringApplication.run(So59741067Application.class, args);
}
@Bean
public Queue main() {
return QueueBuilder.durable("mainQueue")
.deadLetterExchange("")
.deadLetterRoutingKey("dlQueue")
.build();
}
@Bean
public Queue dlq() {
return QueueBuilder.durable("dlQueue")
.deadLetterExchange("")
.deadLetterRoutingKey("mainQueue")
.ttl(5_000)
.build();
}
@RabbitListener(queues = "mainQueue")
public void listen(String in,
@Header(name = "x-death", required = false) List<Map<String, ?>> xDeath) {
System.out.println(in + xDeath);
if (xDeath != null && (long) xDeath.get(0).get("count") > 2L) {
System.out.println("Given up on this one");
}
else {
throw new AmqpRejectAndDontRequeueException("test");
}
}
}
我正在尝试使用 DLX 的重试机制。 所以,基本上我想发送一条消息 3 次,而不是停止并让这条消息在 dlx 队列上停止;
我做了什么: 创建绑定到 WorkExchange 的 WorkQueue 创建了绑定到 RetryExchange
的 RetryQueueWorkQueue -> 将 x-dead-letter-exchange 设置为 RetryExchange
RetryQueue -> 将 x-dead-letter-exchange 设置为 WorkExchange 并将 x-message-ttl 设置为 300000 毫秒(5 分钟)
所以,现在当我向 WorkQueue 发送任何消息时它失败了..这条消息进入 RetryQueue 5 分钟然后返回 WorkQueue..但它可能会一直失败,我想在 3 次尝试后停止它...
有可能吗?是否可以设置 RetryQueue 尝试 3 次后停止?
谢谢。
没有办法单独在代理中执行此操作。
您可以向侦听器添加代码 - 检查 x-death
header 以确定重试消息的次数,然后 discard/log 它(and/or 发送它当你想放弃时,你的听众中的第三个 queue)。
编辑
@SpringBootApplication
public class So59741067Application {
public static void main(String[] args) {
SpringApplication.run(So59741067Application.class, args);
}
@Bean
public Queue main() {
return QueueBuilder.durable("mainQueue")
.deadLetterExchange("")
.deadLetterRoutingKey("dlQueue")
.build();
}
@Bean
public Queue dlq() {
return QueueBuilder.durable("dlQueue")
.deadLetterExchange("")
.deadLetterRoutingKey("mainQueue")
.ttl(5_000)
.build();
}
@RabbitListener(queues = "mainQueue")
public void listen(String in,
@Header(name = "x-death", required = false) List<Map<String, ?>> xDeath) {
System.out.println(in + xDeath);
if (xDeath != null && (long) xDeath.get(0).get("count") > 2L) {
System.out.println("Given up on this one");
}
else {
throw new AmqpRejectAndDontRequeueException("test");
}
}
}