无法在 apache camel 中测试 deadLetterChannel

Cannot test deadLetterChannel in apache camel

我正在尝试在 apache camel 中测试 deadLetterChanel

这是生产代码片段(我的 RouterBuilder 的一部分):

from(dateQueue())
        .routeId(ROUTE_DATE)
        .to(log(myConsumer))
        .transform().body(String.class, it -> myConsumer.consume(LocalDate.parse(it)))
        .split(body())
        .to(log(myConsumer))
        .marshal().json()
        .to(processorExchange());

errorHandler(deadLetterChannel(dlqDirect()));

// @formatter:off
from(dlqDirect())
        .to("log:dlq")
        .choice()
            .when(it -> ROUTE_DATE.equals(it.getFromRouteId())).to(dateDLExchange())
            // other when expressions here 
        .end();
// @formatter:on

这是测试代码:

@Test
void testErrorCaseOfDateRoute_ShouldGoToDateDlExchange() throws Exception {
    // given:
    MockEndpoint dateDLExchangeMock = camelContext.getEndpoint("mock:test", MockEndpoint.class);
    AdviceWith.adviceWith(camelContext, ROUTE_DATE,
            in -> in.interceptSendToEndpoint(dateDLExchange()).to(dateDLExchangeMock)
    );
    dateDLExchangeMock.expectedMessageCount(1);
    dateDLExchangeMock.expectedBodiesReceived("invalid date");

    // when:
    producerTemplate.sendBody(dateExchange(), "invalid date");

    // then:
    dateDLExchangeMock.assertIsSatisfied();
}

断言失败!

但是,如果我将模拟更改为拦截 dlqDirect()(直接端点)而不是 dateDLExchange()rabbitmq 端点),它工作正常,但我想包括 choice() 在我的测试中也起作用。

我正在 TestContainer 中使用 RabbitMQ 进行小型集成测试,所以我真的想在 rabbitmq 实例中监听队列并在那里声明消息,而不是使用 Camel 进行测试! (骆驼可以帮忙吗?)

It worth noting that when I test this case manullay, I got my invalid message into the rabbitmq date_dl_queue which is bound to date_dl_exchange. ( I mean, it works)

我通过使用路由范围 errorHandler 解决了它,而不是对所有路由使用通用错误处理程序,然后使用 when 表达式。

因此,代码更改为如下所示:

from(dateQueue())
        .routeId(ROUTE_DATE)
        .errorHandler(buildErrorHandler(dateDLExchange()))          // <<< Notice this
        .to(log(myConsumer))
        .transform().body(String.class, it -> myConsumer.consume(LocalDate.parse(it)))
        .split(body())
        .to(log(myConsumer))
        .marshal().json()
        .to(processorExchange());


private DefaultErrorHandlerBuilder buildErrorHandler(String deadLetterUri) {
    return deadLetterChannel(deadLetterUri)
            .logExhausted(true)
            .logHandled(true)
            .....
            .;
}