拆分 EIP 中的 Camel 异常路由

Camel exception routing in a split EIP

我正在将一个 CSV 文件输入与一个更大的系统集成,我想将其设置为 CSV 的个别行(被 split 分割)无法正确解析被发送到医院队列,所有其他工作行都被转换为一个对象并聚合到一个列表中。但是,我无法将 split 中抛出异常的消息路由到其他地方,并且不会在拆分结束时出现在聚合器中。为了简化它,我编写了一个在拆分中抛出异常的单元测试,并且我试图让它工作。我将字符串 1\n2\n3\n 拆分为三个消息,对其中一个抛出异常,并在末尾连接剩余的字符串。

public void configure() throws Exception {

    onException(Exception.class)
        .handled(true)
        .to("log:dead?level=ERROR");

    from("direct:test")
        .split(body().tokenize("\n"), new MyAggregationStrategy())
            .process(new ThrowMyException())
        .end()
        .to("mock:out");
}

@Test
public void test() throws InterruptedException {

    MockEndpoint out = getMockEndpoint("mock:out");
    out.expectedMessageCount(1);

    template.sendBody("direct:test", "1\n2\n3\n");

    assertMockEndpointsSatisfied();
}

ThrowMyException 控制器:只是抛出一个异常

@Override
public void process(Exchange exchange) throws Exception {
    System.out.println("[" + exchange.getIn().getBody() + "]");
    if (exchange.getIn().getBody(String.class).trim().contentEquals("2")) {
        System.out.println("Throwing exception");
        throw new Exception();
    }
}

MyAggregationStrategy:只是连接字符串

@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    if (oldExchange == null) {
        return newExchange;
    }
    oldExchange.getIn().setBody(
            oldExchange.getIn().getBody(String.class) + 
            newExchange.getIn().getBody(String.class));
    System.out.println(oldExchange.getIn().getBody());
    return oldExchange;
}

我期望的是 onException 处理程序,正如我所指出的那样,它是处理异常 (handled(true)) 的,它将在获取到消息“2”,我将得到聚合器打印出的结果 13。我确实得到了 onException 处理程序打印出的行:

ERROR dead - Exchange[ExchangePattern: InOnly, BodyType: String, Body: 2]

但是聚合器然后 returns '123' 而不是 '13'。

我也尝试过使用 deadLetterChannel

errorHandler(deadLetterChannel("log:dead?level=ERROR")

我也试过在 onException 处理程序中指定 continued(false),但没有成功。

我意识到我可以通过查看 exchange.getProperty(Exchange.EXCEPTION_CAUGHT) 在我的聚合器中查找异常,然后不聚合它。所以我确实有一个解决方案。但是,如果有一种方法可以在我的路线中使用 onExceptionerrorHandler,我会更喜欢那样。

Camel Documentation 对于 stopOnException 拆分器选项说

Whether or not to stop continue processing immediately when an exception occurred. If disable, then Camel continue splitting and process the sub-messages regardless if one of them failed. You can deal with exceptions in the AggregationStrategy class where you have full control how to handle that.

这和Camel In Action一起,建议两个splitter exception handling options选项来处理Aggregator中的异常,或者通过使用stopOnException(),这将阻止整个splitter迭代器继续.

您可以使用 exchange.getException() 方法或 exchange.isFailed() 来测试聚合器中的异常和错误。如果您在 onException 方法上使用了 handled(true),则必须使用 exchange.getProperty(Exchange.EXCEPTION_CAUGHT)