交易骆驼路由标记交换基于表达式不起作用
Transactional Camel route marking exchange as handled based on expression not working
我的骆驼路线正在以事务方式使用和生成 from/to JMS。我们的要求是如果多次处理失败则丢弃有毒消息。我知道更好的选择是将消息移动到死信队列,但出于本练习的目的,丢弃它是很好的选择。
下面是模拟问题的路由定义:
package com.my.comp.playground;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() {
onException(Exception.class)
.process(new Processor() {
private int failureCounter = 0;
@Override
public void process(Exchange exchange) {
exchange.getIn().setHeader("failureCounter", ++failureCounter);
}
})
.log("failureCounter = ${header.failureCounter}")
//.handled(true);
.handled(header("failureCounter").isGreaterThan(3));
from("jms:test.queue")
.routeId("test-route")
.transacted()
.process(exchange -> {
throw new RuntimeException("No good Pal!");
})
.to("mock:discard");
}
}
所以我想做的是保留一个失败计数器,如果该计数器大于某个数字,则将异常标记为已处理并提交事务。
注意异常处理最后的两行代码:
//.handled(true);
.handled(header("failureCounter").isGreaterThan(3));
当我 运行 我的路线与 header("failureCounter").isGreaterThan(3)
处理条件时,消息永远一次又一次地回滚,我可以在日志中看到 failureCounter 正确地增加了:
...
[mer[test.queue]] test-route : failureCounter = 402
[mer[test.queue]] o.a.c.p.e.DefaultErrorHandler : Failed delivery for (MessageId: ...
...
[mer[test.queue]] test-route : failureCounter = 403
...
[mer[test.queue]] test-route : failureCounter = 404
...
然而,当我 运行 具有 true
处理条件的路由时,事务在第一次失败后立即提交,如下所示:
[mer[test.queue]] test-route : failureCounter = 1
[mer[test.queue]] o.a.c.s.spi.TransactionErrorHandler : Transaction commit (0x52b2f795) redelivered(true)
所以我的问题是:我是做错了什么,还是我对如何使用 handled
异常的理解不正确?如果是这样,正确的方法是什么?
我不知道这是设计使然还是错误。
当我调试你的案例时,我看到 handled()
中的谓词是根据 Camel Exchange 评估的。
但是,您的 failureCounter
header 不在 Exchange 中。因此,表达式 header("failureCounter")
的计算结果为 null
并且您的谓词始终为 false
.
在一个简短的测试中,我看到 headers 在异常之前设置 ,但 headers 在异常之后设置(即在内部设置错误处理程序)不存在于用于评估谓词的交换中。
Burki 的评论是很好的观察,帮助我确定了根本原因。
尽管 camel DSL 建议 .handled(header("failureCounter").isGreaterThan(3))
检查将在 运行 故障计数器增加处理器和日志输出之后发生,但实际上 .handled
是第一件事在该 onException(...)
分支中进行评估。这似乎有点误导,我必须承认我对这种方法感到有点失望。不过一定是有原因的。
给定的 camel 将为每个消息传递创建一个新的 Exchange
毫不奇怪,这将永远回滚。
为了解决这个问题,我立即想到了两个解决方案。
解决方案一:
它基于消息代理将设置 JMSRedelivered
消息 header 的事实,因此可以在此基础上增加消息计数器。所以新的路由配置如下所示:
@Override
public void configure() {
onException(Exception.class)
.handled(header("failureCounter").isGreaterThan(3));
from("jms:test.queue")
.routeId("test-route")
.transacted()
.process(new Processor() {
private int failureCounter = 0;
@Override
public void process(Exchange exchange) throws Exception {
if (exchange.getIn().getHeader("JMSRedelivered", Boolean.class)) {
exchange.getIn().setHeader("failureCounter", ++failureCounter);
}
}
})
.log("failureCounter = ${header.failureCounter}")
.process(exchange -> {
throw new RuntimeException("No good Pal!");
})
.to("mock:discard");
}
应用此更改后,邮件将在超过 3 次连续失败后被丢弃,如下面的日志所示:
46849 --- [mer[test.queue]] test-route : failureCounter = 4
46849 --- [mer[test.queue]] o.a.c.s.spi.TransactionErrorHandler : Transaction commit (0x31b273b2) redelivered(true) for (MessageId: ID:414d5120514d31202020202020202020c083da5f02bd1d22 on ExchangeId: ID-C02XN27DJG5H-1608604365426-0-4))
解决方案 2:这更简单,但在这种情况下它是特定于消息代理的 IBM MQ。
@Override
public void configure() {
onException(Exception.class)
.log("failureCounter = ${header.JMSXDeliveryCount}")
.handled(header("JMSXDeliveryCount").isGreaterThan(3));
from("jms:test.queue")
.routeId("test-route")
.transacted()
.process(exchange -> {
throw new RuntimeException("No good Pal!");
})
.to("mock:discard");
}
并且日志将再次显示消息在放弃前被传递了四次。
我的骆驼路线正在以事务方式使用和生成 from/to JMS。我们的要求是如果多次处理失败则丢弃有毒消息。我知道更好的选择是将消息移动到死信队列,但出于本练习的目的,丢弃它是很好的选择。
下面是模拟问题的路由定义:
package com.my.comp.playground;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() {
onException(Exception.class)
.process(new Processor() {
private int failureCounter = 0;
@Override
public void process(Exchange exchange) {
exchange.getIn().setHeader("failureCounter", ++failureCounter);
}
})
.log("failureCounter = ${header.failureCounter}")
//.handled(true);
.handled(header("failureCounter").isGreaterThan(3));
from("jms:test.queue")
.routeId("test-route")
.transacted()
.process(exchange -> {
throw new RuntimeException("No good Pal!");
})
.to("mock:discard");
}
}
所以我想做的是保留一个失败计数器,如果该计数器大于某个数字,则将异常标记为已处理并提交事务。
注意异常处理最后的两行代码:
//.handled(true);
.handled(header("failureCounter").isGreaterThan(3));
当我 运行 我的路线与 header("failureCounter").isGreaterThan(3)
处理条件时,消息永远一次又一次地回滚,我可以在日志中看到 failureCounter 正确地增加了:
...
[mer[test.queue]] test-route : failureCounter = 402
[mer[test.queue]] o.a.c.p.e.DefaultErrorHandler : Failed delivery for (MessageId: ...
...
[mer[test.queue]] test-route : failureCounter = 403
...
[mer[test.queue]] test-route : failureCounter = 404
...
然而,当我 运行 具有 true
处理条件的路由时,事务在第一次失败后立即提交,如下所示:
[mer[test.queue]] test-route : failureCounter = 1
[mer[test.queue]] o.a.c.s.spi.TransactionErrorHandler : Transaction commit (0x52b2f795) redelivered(true)
所以我的问题是:我是做错了什么,还是我对如何使用 handled
异常的理解不正确?如果是这样,正确的方法是什么?
我不知道这是设计使然还是错误。
当我调试你的案例时,我看到 handled()
中的谓词是根据 Camel Exchange 评估的。
但是,您的 failureCounter
header 不在 Exchange 中。因此,表达式 header("failureCounter")
的计算结果为 null
并且您的谓词始终为 false
.
在一个简短的测试中,我看到 headers 在异常之前设置 ,但 headers 在异常之后设置(即在内部设置错误处理程序)不存在于用于评估谓词的交换中。
Burki 的评论是很好的观察,帮助我确定了根本原因。
尽管 camel DSL 建议 .handled(header("failureCounter").isGreaterThan(3))
检查将在 运行 故障计数器增加处理器和日志输出之后发生,但实际上 .handled
是第一件事在该 onException(...)
分支中进行评估。这似乎有点误导,我必须承认我对这种方法感到有点失望。不过一定是有原因的。
给定的 camel 将为每个消息传递创建一个新的 Exchange
毫不奇怪,这将永远回滚。
为了解决这个问题,我立即想到了两个解决方案。
解决方案一:
它基于消息代理将设置 JMSRedelivered
消息 header 的事实,因此可以在此基础上增加消息计数器。所以新的路由配置如下所示:
@Override
public void configure() {
onException(Exception.class)
.handled(header("failureCounter").isGreaterThan(3));
from("jms:test.queue")
.routeId("test-route")
.transacted()
.process(new Processor() {
private int failureCounter = 0;
@Override
public void process(Exchange exchange) throws Exception {
if (exchange.getIn().getHeader("JMSRedelivered", Boolean.class)) {
exchange.getIn().setHeader("failureCounter", ++failureCounter);
}
}
})
.log("failureCounter = ${header.failureCounter}")
.process(exchange -> {
throw new RuntimeException("No good Pal!");
})
.to("mock:discard");
}
应用此更改后,邮件将在超过 3 次连续失败后被丢弃,如下面的日志所示:
46849 --- [mer[test.queue]] test-route : failureCounter = 4
46849 --- [mer[test.queue]] o.a.c.s.spi.TransactionErrorHandler : Transaction commit (0x31b273b2) redelivered(true) for (MessageId: ID:414d5120514d31202020202020202020c083da5f02bd1d22 on ExchangeId: ID-C02XN27DJG5H-1608604365426-0-4))
解决方案 2:这更简单,但在这种情况下它是特定于消息代理的 IBM MQ。
@Override
public void configure() {
onException(Exception.class)
.log("failureCounter = ${header.JMSXDeliveryCount}")
.handled(header("JMSXDeliveryCount").isGreaterThan(3));
from("jms:test.queue")
.routeId("test-route")
.transacted()
.process(exchange -> {
throw new RuntimeException("No good Pal!");
})
.to("mock:discard");
}
并且日志将再次显示消息在放弃前被传递了四次。