如何使用 reactor rabbitmq 处理死信队列

How to work with Dead letter queues with reactor rabbitmq

在我们的应用程序中,我们正在从 spring-amqp 转移到 reactor-rabbitmq,以便与应用程序的反应性一起很好地工作。我们一直在阅读来自项目反应器的 official guide。但是我不太确定,一旦重试耗尽,如何将消息发送到死信队列。

在之前的实现中,我们抛出 Spring-AMPQ 提供的 AmqpRejectAndDontRequeueException,这会导致消息自动进入死信队列,而无需使用死信队列的专用发布者。如何在 reactor-rabbitmq 中做类似的事情?我是否需要编写一个专门的发布者,在重试用尽后从听众那里调用,或者有其他方法可以处理它。另外,是否有关于 DLQ 和停车队列的官方项目反应器文档。

以下是两个版本的一些代码示例:

AMQP 版本:

@AllArgsConstructor
public class SampleListener {
    private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
    private final MessageListenerContainerFactory messageListenerContainerFactory;
    private final Jackson2JsonMessageConverter converter;

    @PostConstruct
    public void subscribe() {
        var mlc = messageListenerContainerFactory
                .createMessageListenerContainer(SAMPLE_QUEUE);
        MessageListener messageListener = message -> {
            try {
                TraceableMessage traceableMessage = (TraceableMessage) converter.fromMessage(message);
                ObjectMapper mapper = new ObjectMapper();
                mapper.registerModule(new JavaTimeModule());
                MyModel myModel = mapper.convertValue(traceableMessage.getMessage(), MyModel.class);
                MDC.put(CORRELATION_ID, traceableMessage.getCorrelationId());
                logger.info("Received message for id : {}", myModel.getId());
                processMessage(myModel)
                        .subscriberContext(ctx -> {
                            Optional<String> correlationId = Optional.ofNullable(MDC.get(CORRELATION_ID));
                            return correlationId.map(id -> ctx.put(CORRELATION_ID, id))
                                    .orElseGet(() -> ctx.put(CORRELATION_ID, UUID.randomUUID().toString()));
                        }).block();
                MDC.clear();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                throw new AmqpRejectAndDontRequeueException(e.getMessage(), e);
            }
        };
        mlc.setupMessageListener(messageListener);
        mlc.start();
    }

processMessage 正在执行业务逻辑,以防万一失败我想将其移至 DLQ。在 AMQP 的情况下工作正常。

Reactor RabbitMQ 版本:

@AllArgsConstructor
public class SampleListener {
    private static final Logger logger = LoggerFactory.getLogger(SampleListener.class);
    private final MessageListenerContainerFactory messageListenerContainerFactory;
    private final Jackson2JsonMessageConverter converter;

    @PostConstruct
    public void subscribe() {
        receiver.consumeAutoAck(SAMPLE_QUEUE)
                .subscribe(delivery -> {
                            TraceableMessage traceableMessage = Serializer.to(delivery.getBody(), TraceableMessage.class);
                            Mono.just(traceableMessage)
                                    .map(this::extractMyModel)
                                    .doOnNext(myModel -> logger.info("Received message for id : {}", myModel.getId()))
                                    .flatMap(this::processMessage)
                                    .doFinally(signalType -> MDC.clear())
                                    .retryWhen(Retry
                                            .fixedDelay(1, Duration.ofMillis(10000))
                                            .onRetryExhaustedThrow() //Move to DLQ
                                            .doAfterRetry(retrySignal -> {
                                                if ((retrySignal.totalRetries() + 1) >= 1) {
                                                    logger.info("Exhausted retries");
                                                    //Move to DLQ
                                                }
                                            }))
                                    .subscriberContext(ctx -> ctx.put(CORRELATION_ID, traceableMessage.getCorrelationId()))
                                    .subscribe();
                        }
                );

    }

有评论的两个地方之一//Move to DLQ,我猜消息应该去哪里 DLQ。这就是我决定不能再处理这个的地方。如果有不同的发布者推送到 DLQ 或任何特定设置可以自动处理它。

请告诉我。

我得到了这个问题的答案。我正在使用异步侦听器并使用 consumeAutoAck。当我切换到 consumeManualAck 时,我得到一个 AcknowledgebleDelivery,我可以在其中执行 nack(false),这应该将其移至死信队列。