如何使用 reactor rabbitmq 处理死信队列
How to work with Dead letter queues with reactor rabbitmq
在我们的应用程序中,我们正在从 spring-amqp 转移到 reactor-rabbitmq,以便与应用程序的反应性一起很好地工作。我们一直在阅读来自项目反应器的 official guide。但是我不太确定,一旦重试耗尽,如何将消息发送到死信队列。
在之前的实现中,我们抛出 Spring-AMPQ 提供的 AmqpRejectAndDontRequeueException,这会导致消息自动进入死信队列,而无需使用死信队列的专用发布者。如何在 reactor-rabbitmq 中做类似的事情?我是否需要编写一个专门的发布者,在重试用尽后从听众那里调用,或者有其他方法可以处理它。另外,是否有关于 DLQ 和停车队列的官方项目反应器文档。
以下是两个版本的一些代码示例:
AMQP 版本:
@AllArgsConstructor
public class SampleListener {
private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
private final MessageListenerContainerFactory messageListenerContainerFactory;
private final Jackson2JsonMessageConverter converter;
@PostConstruct
public void subscribe() {
var mlc = messageListenerContainerFactory
.createMessageListenerContainer(SAMPLE_QUEUE);
MessageListener messageListener = message -> {
try {
TraceableMessage traceableMessage = (TraceableMessage) converter.fromMessage(message);
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
MyModel myModel = mapper.convertValue(traceableMessage.getMessage(), MyModel.class);
MDC.put(CORRELATION_ID, traceableMessage.getCorrelationId());
logger.info("Received message for id : {}", myModel.getId());
processMessage(myModel)
.subscriberContext(ctx -> {
Optional<String> correlationId = Optional.ofNullable(MDC.get(CORRELATION_ID));
return correlationId.map(id -> ctx.put(CORRELATION_ID, id))
.orElseGet(() -> ctx.put(CORRELATION_ID, UUID.randomUUID().toString()));
}).block();
MDC.clear();
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new AmqpRejectAndDontRequeueException(e.getMessage(), e);
}
};
mlc.setupMessageListener(messageListener);
mlc.start();
}
processMessage
正在执行业务逻辑,以防万一失败我想将其移至 DLQ。在 AMQP 的情况下工作正常。
Reactor RabbitMQ 版本:
@AllArgsConstructor
public class SampleListener {
private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
private final MessageListenerContainerFactory messageListenerContainerFactory;
private final Jackson2JsonMessageConverter converter;
@PostConstruct
public void subscribe() {
receiver.consumeAutoAck(SAMPLE_QUEUE)
.subscribe(delivery -> {
TraceableMessage traceableMessage = Serializer.to(delivery.getBody(), TraceableMessage.class);
Mono.just(traceableMessage)
.map(this::extractMyModel)
.doOnNext(myModel -> logger.info("Received message for id : {}", myModel.getId()))
.flatMap(this::processMessage)
.doFinally(signalType -> MDC.clear())
.retryWhen(Retry
.fixedDelay(1, Duration.ofMillis(10000))
.onRetryExhaustedThrow() //Move to DLQ
.doAfterRetry(retrySignal -> {
if ((retrySignal.totalRetries() + 1) >= 1) {
logger.info("Exhausted retries");
//Move to DLQ
}
}))
.subscriberContext(ctx -> ctx.put(CORRELATION_ID, traceableMessage.getCorrelationId()))
.subscribe();
}
);
}
有评论的两个地方之一//Move to DLQ
,我猜消息应该去哪里 DLQ。这就是我决定不能再处理这个的地方。如果有不同的发布者推送到 DLQ 或任何特定设置可以自动处理它。
请告诉我。
我得到了这个问题的答案。我正在使用异步侦听器并使用 consumeAutoAck
。当我切换到 consumeManualAck
时,我得到一个 AcknowledgebleDelivery
,我可以在其中执行 nack(false)
,这应该将其移至死信队列。
在我们的应用程序中,我们正在从 spring-amqp 转移到 reactor-rabbitmq,以便与应用程序的反应性一起很好地工作。我们一直在阅读来自项目反应器的 official guide。但是我不太确定,一旦重试耗尽,如何将消息发送到死信队列。
在之前的实现中,我们抛出 Spring-AMPQ 提供的 AmqpRejectAndDontRequeueException,这会导致消息自动进入死信队列,而无需使用死信队列的专用发布者。如何在 reactor-rabbitmq 中做类似的事情?我是否需要编写一个专门的发布者,在重试用尽后从听众那里调用,或者有其他方法可以处理它。另外,是否有关于 DLQ 和停车队列的官方项目反应器文档。
以下是两个版本的一些代码示例:
AMQP 版本:
@AllArgsConstructor
public class SampleListener {
private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
private final MessageListenerContainerFactory messageListenerContainerFactory;
private final Jackson2JsonMessageConverter converter;
@PostConstruct
public void subscribe() {
var mlc = messageListenerContainerFactory
.createMessageListenerContainer(SAMPLE_QUEUE);
MessageListener messageListener = message -> {
try {
TraceableMessage traceableMessage = (TraceableMessage) converter.fromMessage(message);
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
MyModel myModel = mapper.convertValue(traceableMessage.getMessage(), MyModel.class);
MDC.put(CORRELATION_ID, traceableMessage.getCorrelationId());
logger.info("Received message for id : {}", myModel.getId());
processMessage(myModel)
.subscriberContext(ctx -> {
Optional<String> correlationId = Optional.ofNullable(MDC.get(CORRELATION_ID));
return correlationId.map(id -> ctx.put(CORRELATION_ID, id))
.orElseGet(() -> ctx.put(CORRELATION_ID, UUID.randomUUID().toString()));
}).block();
MDC.clear();
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new AmqpRejectAndDontRequeueException(e.getMessage(), e);
}
};
mlc.setupMessageListener(messageListener);
mlc.start();
}
processMessage
正在执行业务逻辑,以防万一失败我想将其移至 DLQ。在 AMQP 的情况下工作正常。
Reactor RabbitMQ 版本:
@AllArgsConstructor
public class SampleListener {
private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
private final MessageListenerContainerFactory messageListenerContainerFactory;
private final Jackson2JsonMessageConverter converter;
@PostConstruct
public void subscribe() {
receiver.consumeAutoAck(SAMPLE_QUEUE)
.subscribe(delivery -> {
TraceableMessage traceableMessage = Serializer.to(delivery.getBody(), TraceableMessage.class);
Mono.just(traceableMessage)
.map(this::extractMyModel)
.doOnNext(myModel -> logger.info("Received message for id : {}", myModel.getId()))
.flatMap(this::processMessage)
.doFinally(signalType -> MDC.clear())
.retryWhen(Retry
.fixedDelay(1, Duration.ofMillis(10000))
.onRetryExhaustedThrow() //Move to DLQ
.doAfterRetry(retrySignal -> {
if ((retrySignal.totalRetries() + 1) >= 1) {
logger.info("Exhausted retries");
//Move to DLQ
}
}))
.subscriberContext(ctx -> ctx.put(CORRELATION_ID, traceableMessage.getCorrelationId()))
.subscribe();
}
);
}
有评论的两个地方之一//Move to DLQ
,我猜消息应该去哪里 DLQ。这就是我决定不能再处理这个的地方。如果有不同的发布者推送到 DLQ 或任何特定设置可以自动处理它。
请告诉我。
我得到了这个问题的答案。我正在使用异步侦听器并使用 consumeAutoAck
。当我切换到 consumeManualAck
时,我得到一个 AcknowledgebleDelivery
,我可以在其中执行 nack(false)
,这应该将其移至死信队列。