骆驼分裂后异常不会升级

After Camel split Exception won't be escalated

我们在 Camel 中定义了一个路由,并且必须找出处理器中是否抛出异常。当我们只有一个处理器时,Camel 会在 sendBody() 方法中重新抛出异常。如果前面有 split/aggregate,则不会抛出异常。所以下面例子的结果是

before throwing Exception

after sendBody

如果我省略从 .split 到 .completionSize(1) 的所有内容,则输出为

before throwing Exception

Exception thrown

任何想法,如果在拆分后发生异常,如何找出?

private static final String DIRECT_START = "direct:start";

public static void main(String[] args) throws Exception {
    CamelContext context = new DefaultCamelContext();

    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {

            from(DIRECT_START)
            .split(body())
                .aggregate(constant(true), new AggregationStrategy() {
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        return oldExchange == null ? newExchange : oldExchange;
                    }
                })
                .completionSize(1)
            .process(new Processor() {
                public void process(Exchange exchange) throws Exception {
                    System.out.println("before throwing Exception");
                    exchange.setException(new Exception());
                    throw new Exception("my Exception");
                }
            });
        }});        

    context.start();

    ProducerTemplate producer = context.createProducerTemplate();
    try {
        producer.sendBody(DIRECT_START, Integer.valueOf(42));
        System.out.println("after sendBody");
    } catch (Exception e) {
        System.out.println("Exception thrown");
    }

    context.stop();

}

为了事后检查异常,我们找到了解决办法。我们向 onException() 注册了一个 ErrorProcessor,它将状态设置到上下文属性中。

但这不会中断 producer.sendBody(..)。我们有超长 运行 处理器,我们必须中断它们。

所以问题是,我们可以配置 Camel 以在 sendBody 中抛出异常,还是可以在 Exceptionhandler 中执行此操作?

我强烈推荐 Camel in Action(第 8.3.5 节)中关于 Splitter EIP 和异常处理的好章节。该部分解释说:

When using a custom AggregationStrategy with the Splitter, it's important to know that you're responsible for handling exceptions. If you don't propagate the exception back, the Splitter will assume you have handled the exception, and ignore it.

您在未指定聚合器的情况下使用了 split() 方法。在 Camel documentation 中,他们指定

The splitter will by default return the original input message

这意味着离开 split() 方法的交换没有异常,因此没有异常传播回您的调用代码。您从处理器抛出的异常在技术上位于拆分器内部。即使您使用了聚合器,它也没有与 split 调用相关联,并且您没有明确以 end() 结束 split。因此,当您的处理器抛出异常时,拆分器会忽略它,因为您没有提供聚合器来处理和传播异常。

我们可以通过将您的聚合策略作为参数传递给 split 调用来进行测试,如下所示:

.split(body(), new AggregationStrategy() {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        System.out.println("Aggregating");
        return oldExchange == null ? newExchange : oldExchange;
    }
})
    .log("test") // inside the split/aggregator EIP
.end() // outside the split/aggregator EIP
.process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        System.out.println("before throwing Exception");
        throw new Exception("my Exception");
    }
});

你会得到输出:

test
Aggregating
before throwing Exception
Exception thrown

如果你希望处理器在 split/aggregator EIP 中,像这样:

.split(body(), new AggregationStrategy() {
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        System.out.println("Aggregating");
        return oldExchange == null ? newExchange : oldExchange;
    }
})
    .process(new Processor() { // inside the split/aggregator EIP
        public void process(Exchange exchange) throws Exception {
            System.out.println("before throwing Exception");
            throw new Exception("my Exception");
        }
    })
.end(); // outside the split/aggregator EIP

您将得到输出:

before throwing Exception
Aggregating
Exception thrown

请注意,在 split/aggregator EIP 中,聚合器在抛出异常后如何变为 运行?这很重要,因为没有聚合器传递异常,拆分器将忽略它。为了使其工作,您需要在聚合器内正确传播您的异常。例如,在您的代码中,如果 newExchange 包含异常,它将被忽略,因为您没有传播它。您需要更改聚合器以添加:

if (newExchange.getException() != null) {
    oldExchange.setException(newExchange.getException());
}

注意:如果您在拆分的 EIP 中有一个 onException() 调用,并且您将异常设置为正在处理,那么当您调用 getException() 时它将不再 return。所以如果你想处理你的异常,但仍然通过聚合器传播它们,你可以使用 exchange.getProperty(Exchange.EXCEPTION_CAUGHT);

你也可以使用.stopOnException(),像这样:

.split(body()).stopOnException()
    .process(new Processor() { // inside the split/aggregator EIP
        public void process(Exchange exchange) throws Exception {
            System.out.println("before throwing Exception");
            throw new Exception("my Exception");
        }
    });

这会导致拆分在出现异常时停止并传播它。但是,当您在 stopOnException() 之后放置聚合器时,它不再有效。我不完全确定为什么。我猜这是因为聚合器改变了 exchange 对象。

另请注意,您无需在处理器中将例外设置为交换。当处理器抛出异常时,Camel 会为你做这件事。因此处理器中的行 exchange.setException(new Exception()); 不是必需的。

tl;dr 所以是的,您可以从拆分内部将异常传播到调用方法。您只需要确保它是通过与拆分关联的聚合器或设置 stopOnException() 完成的。这取决于你想用 splitting/aggregating/processing 实现什么,最好的方法是什么。