交易骆驼路由标记交换基于表达式不起作用

Transactional Camel route marking exchange as handled based on expression not working

我的骆驼路线正在以事务方式使用和生成 from/to JMS。我们的要求是如果多次处理失败则丢弃有毒消息。我知道更好的选择是将消息移动到死信队列,但出于本练习的目的,丢弃它是很好的选择。

下面是模拟问题的路由定义:

package com.my.comp.playground;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() {
        onException(Exception.class)
                .process(new Processor() {
                    private int failureCounter = 0;
                    @Override
                    public void process(Exchange exchange) {
                        exchange.getIn().setHeader("failureCounter", ++failureCounter);
                    }
                })
                .log("failureCounter = ${header.failureCounter}")
                //.handled(true);
                .handled(header("failureCounter").isGreaterThan(3));

        from("jms:test.queue")
                .routeId("test-route")
                .transacted()
                .process(exchange -> {
                    throw new RuntimeException("No good Pal!");
                })
                .to("mock:discard");
    }
}

所以我想做的是保留一个失败计数器,如果该计数器大于某个数字,则将异常标记为已处理并提交事务。

注意异常处理最后的两行代码:

 //.handled(true);
 .handled(header("failureCounter").isGreaterThan(3));

当我 运行 我的路线与 header("failureCounter").isGreaterThan(3) 处理条件时,消息永远一次又一次地回滚,我可以在日志中看到 failureCounter 正确地增加了:

...
[mer[test.queue]] test-route                               : failureCounter = 402
[mer[test.queue]] o.a.c.p.e.DefaultErrorHandler            : Failed delivery for (MessageId: ...
...
[mer[test.queue]] test-route                               : failureCounter = 403
...
[mer[test.queue]] test-route                               : failureCounter = 404
...

然而,当我 运行 具有 true 处理条件的路由时,事务在第一次失败后立即提交,如下所示:

[mer[test.queue]] test-route                               : failureCounter = 1
[mer[test.queue]] o.a.c.s.spi.TransactionErrorHandler      : Transaction commit (0x52b2f795) redelivered(true)

所以我的问题是:我是做错了什么,还是我对如何使用 handled 异常的理解不正确?如果是这样,正确的方法是什么?

我不知道这是设计使然还是错误。

当我调试你的案例时,我看到 handled() 中的谓词是根据 Camel Exchange 评估的。

但是,您的 failureCounter header 不在 Exchange 中。因此,表达式 header("failureCounter") 的计算结果为 null 并且您的谓词始终为 false.

在一个简短的测试中,我看到 headers 在异常之前设置 ,但 headers 在异常之后设置(即在内部设置错误处理程序)不存在于用于评估谓词的交换中。

Burki 的评论是很好的观察,帮助我确定了根本原因。

尽管 camel DSL 建议 .handled(header("failureCounter").isGreaterThan(3)) 检查将在 运行 故障计数器增加处理器和日志输出之后发生,但实际上 .handled 是第一件事在该 onException(...) 分支中进行评估。这似乎有点误导,我必须承认我对这种方法感到有点失望。不过一定是有原因的。

给定的 camel 将为每个消息传递创建一个新的 Exchange 毫不奇怪,这将永远回滚。

为了解决这个问题,我立即想到了两个解决方案。

解决方案一: 它基于消息代理将设置 JMSRedelivered 消息 header 的事实,因此可以在此基础上增加消息计数器。所以新的路由配置如下所示:

@Override
public void configure() {
    onException(Exception.class)
            .handled(header("failureCounter").isGreaterThan(3));

    from("jms:test.queue")
            .routeId("test-route")
            .transacted()
            .process(new Processor() {
                private int failureCounter = 0;
                @Override
                public void process(Exchange exchange) throws Exception {
                    if (exchange.getIn().getHeader("JMSRedelivered", Boolean.class)) {
                        exchange.getIn().setHeader("failureCounter", ++failureCounter);
                    }
                }
            })
            .log("failureCounter = ${header.failureCounter}")
            .process(exchange -> {
                throw new RuntimeException("No good Pal!");
            })
            .to("mock:discard");
}

应用此更改后,邮件将在超过 3 次连续失败后被丢弃,如下面的日志所示:

46849 --- [mer[test.queue]] test-route                               : failureCounter = 4
46849 --- [mer[test.queue]] o.a.c.s.spi.TransactionErrorHandler      : Transaction commit (0x31b273b2) redelivered(true) for (MessageId: ID:414d5120514d31202020202020202020c083da5f02bd1d22 on ExchangeId: ID-C02XN27DJG5H-1608604365426-0-4))

解决方案 2:这更简单,但在这种情况下它是特定于消息代理的 IBM MQ。

@Override
public void configure() {
    onException(Exception.class)
            .log("failureCounter = ${header.JMSXDeliveryCount}")
            .handled(header("JMSXDeliveryCount").isGreaterThan(3));

    from("jms:test.queue")
            .routeId("test-route")
            .transacted()
            .process(exchange -> {
                throw new RuntimeException("No good Pal!");
            })
            .to("mock:discard");
}

并且日志将再次显示消息在放弃前被传递了四次。