使用基于时间的 PollingConsumer 到直接端点
Using a timing based PollingConsumer to a direct endpoint
在功能上,我希望在从 JMS (WMQ) 端点消费之前检查 URL 是否处于活动状态。
如果无法到达 URL 或服务器错误,那么我不想从队列中取货。所以我想通过轮询消费者继续尝试(无限制重试) URL 。因此,只要它可用,我就可以从 JMS 取货。
我有一个设置了直接端点的 RouteBuilder,它被配置为 运行 一个将 ping 服务的处理器。
所以:
public class PingRoute extends RouteBuilder {
@Override
public void configureCamel() {
from("direct:pingRoute").routeId(PingRoute.class.getSimpleName())
.process(new PingProcessor(url))
.to("log://PingRoute?showAll=true");
}
}
在另一条路线中,我正在设置我的计时器:
@Override
public void configureCamel() {
from(timerEndpoint).beanRef(PollingConsumerBean.class.getSimpleName(), "checkPingRoute");
...
}
并且 PollingConsumerBean
我正在尝试通过消费者接收正文:
public void checkPingRoute(){
// loop to check the consumer. Check we can carry on with the pick up from the JMS queue.
while(true){
Boolean pingAvailable = consumer.receiveBody("direct:pingRoute", Boolean.class);
...
}
我将路由添加到上下文中并使用生产者发送:
context.addRoutes(new PingRoute());
context.start();
producer.sendBody(TimerPollingRoute.TIMER_POLLING_ROUTE_ENDPOINT, "a body");
我得到以下 IllegalArgumentException
:
Cannot add a 2nd consumer to the same endpoint. Endpoint Endpoint[direct://pingRoute] only allows one consumer.
有没有办法将直接路由设置为轮询消费者?
不幸的是,业务逻辑不是很清楚。据我了解-您需要等待服务的响应。恕我直言,您必须使用 Content Enricher EIP http://camel.apache.org/content-enricher.html 。 pollEnrich
是您在定时器路由中所需要的。
.pollEnrich("direct:waitForResponce", -1)
或
.pollEnrich("seda:waitForResponce", -1)
public class PingRoute extends RouteBuilder {
@Override
public void configureCamel() {
from("direct:pingRoute").routeId(PingRoute.class.getSimpleName())
.process(new PingProcessor(url))
.choice().when(body())
.to("log://PingRoute?showAll=true")
.to("direct:waitForResponce")
.otherwise()
.to("direct:pingRoute")
.end();
}
};
定时器:
@Override
public void configureCamel() {
from(timerEndpoint)
.inOnly("direct:pingRoute")
.pollEnrich("direct:waitForResponce", -1)
...
}
我很难弄清楚你到底想做什么,但在我看来你想在一个时间间隔内使用来自端点的数据。为此,最好的模式是轮询消费者:http://camel.apache.org/polling-consumer.html
您当前收到的错误是因为您有两个消费者都试图从 "direct://pingRoute" 中读取如果这是有意的,您可以将直接更改为 seda://pingRoute 以便它在内存队列中您的数据将在。
根据他们的用例,他们有几个问题需要解决:
- 当且仅当对 URL 的 ping 为 正 .
时,才使用来自 JMS 队列的消息
- 如果 URL 没有响应,JMS 消息不应从队列中消失并且必须进行 重试,直到 URL 再次响应,在这种情况下,消息将最终被消费。
- OP 没有指定重试次数是有限还是无限。
基于这个问题场景,我建议重新设计他们的解决方案,利用 ActiveMQ retries, broker-side redelivery and JMS transactions in Camel 来:
如果 URL ping 失败(通过事务回滚),- Return 消息到队列。
- 确保消息不丢失(通过使用 JMS 持久性和 broker-side 重新传递,AMQ 将持久地安排重试周期)。
- 能够为每条消息指定复杂的重试周期,例如具有指数退避、最大重试次数等
- 如果重试周期已用完但没有肯定结果,可选择将消息发送到 Dead Letter Queue,以便可以计划其他一些(可能是手动的)操作。
现在,implementation-wise:
from("activemq:queue:abc?transacted=true") // (1)
.to("http4://host.endpoint.com/foo?method=GET") // (2) (3)
.process(new HandleSuccess()); // (4)
评论:
- 注意
transacted
标志。
- 如果 HTTP 调用失败,HTTP4 端点将引发异常。
- 由于没有配置异常处理程序,Camel 会将异常传播到消费者端点 (
activemq
),这将回滚 事务。
- 如果调用成功,流程将继续,交换主体现在将包含 HTTP 服务器返回的有效负载,您可以按照您希望的方式处理它。这里我使用的是处理器。
接下来,重要的是在 ActiveMQ 中配置重新传送策略,并启用 broker-side 重新传送。您在 activemq.xml
配置文件中执行此操作:
<plugins>
<redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
<redeliveryPolicyMap>
<redeliveryPolicyMap>
<redeliveryPolicyEntries>
<redeliveryPolicy queue="my.queue"
initialRedeliveryDelay="30000"
maximumRedeliveries="17"
maximumRedeliveryDelay="259200000"
redeliveryDelay="30000"
useExponentialBackOff="true"
backOffMultiplier="2" />
</redeliveryPolicyEntries>
</redeliveryPolicyMap>
</redeliveryPolicyMap>
</redeliveryPlugin>
</plugins>
并确保在 top-level <broker />
元素中启用了调度程序支持:
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="mybroker"
schedulerSupport="true">
...
</broker>
希望对您有所帮助。
编辑 1: OP 使用 IBM WebSphere MQ 作为代理,我错过了。您可以使用 JMS QueueBrowser 来查看消息并在实际使用消息之前尝试其相应的 URLs,但不可能有选择地使用单个消息 – 这不是 MOM(messaging-oriented 中间件)是关于。
所以我坚持认为您应该探索 JMS 事务,而不是让代理重新传送消息,您可以在 TX 主体本身内开始对 URL 的 ping 周期。关于骆驼,您可以按如下方式实现它:
from("jms:queue:myqueue?transacted=true")
.bean(new UrlPinger());
UrlPinger.java:
public class UrlPinger {
@EndpointInject
private ProducerTemplate template;
private Pattern pattern = Pattern.compile("^(http(?:s)?)\:");
@Handler
public void pingUrl(@Body String url, CamelContext context) throws InterruptedException {
// Replace http(s): with http(s)4: to use the Camel HTTP4 endpoint.
Matcher m = pattern.matcher(url);
if (m.matches()) {
url = m.replaceFirst(m.group(1) + "4:");
}
// Try forever until the status code is 200.
while (getStatusCode(url, context) != 200) {
Thread.sleep(5000);
}
}
private int getStatusCode(String url, CamelContext context) {
Exchange response = template.request(url + "?method=GET&throwExceptionOnFailure=false", new Processor() {
@Override public void process(Exchange exchange) throws Exception {
// No body since this is a GET request.
exchange.getIn().getBody(null);
}
});
return response.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
}
}
备注:
- 注意
throwExceptionOnFailure=false
选项。不会引发异常,因此循环将执行直到条件为真。
- 在 bean 内部,我一直在循环,直到 HTTP 状态为 200。当然,您的逻辑会有所不同。
- 在尝试和尝试之间,我睡了 5000 毫秒。
- 我假设要 ping 的 URL 在传入 JMS 消息的正文中。我将
http(s):
替换为 http(s)4:
以使用 Camel HTTP4 endpoint.
- 在 TX 内部执行 ping 保证只有在 ping 条件为真时才会使用消息(在这种情况下 HTTP 状态 == 200)。
- 您可能想要引入终止条件(您不想永远尝试下去)。也许引入一些后退,以免对方不知所措。
- 如果 Camel 或代理在重试周期内出现故障,消息将自动回滚。
- 考虑到 JMS 事务是
Session
绑定的,因此如果您想启动多个并发消费者(concurrentConsumers
JMS 端点选项),您需要设置 cacheLevelName=CACHE_NONE
为每个线程使用不同的 JMS Session
.
这里的所有答案都为我指明了正确的方向,但我最终想出了一个适合我们的代码库和框架的解决方案。
首先,我发现不需要 bean 来充当轮询消费者,而是可以使用处理器。
@Override
public void configureCamel() {
from("timer://fnzPoller?period=2000&delay=2000").processRef(UrlPingProcessor.class.getSimpleName())
.processRef(StopStartProcessor.class.getSimpleName()).to("log://TimerPollingRoute?showAll=true");
}
然后在 UrlPingProcessor
中有 CXF 服务来 ping url 并且可以检查响应:
@Override
public void process(Exchange exchange) {
try {
// CXF service
FnzPingServiceImpl fnzPingService = new FnzPingServiceImpl(url);
fnzPingService.getPing();
} catch (WebApplicationException e) {
int responseCode = e.getResponse().getStatus();
boolean isValidResponseCode = ResponseCodeUtil.isResponseCodeValid(responseCode);
if (!isValidResponseCode) {
// Sets a flag to stop for the StopStartProcessor
stopRoute(exchange);
}
}
}
然后在 StopStartProcessor
中使用 ExecutorService
通过新线程停止或启动路由。:
@Override
public void process(final Exchange exchange) {
// routeBuilder is set on the constructor.
final String routeId = routeBuilder.getClass().getSimpleName();
Boolean stopRoute = ExchangeHeaderUtil.getHeader(exchange, Exchange.ROUTE_STOP, Boolean.class);
boolean stopRoutePrim = BooleanUtils.isTrue(stopRoute);
if (stopRoutePrim) {
StopRouteThread stopRouteThread = new StopRouteThread(exchange, routeId);
executorService.execute(stopRouteThread);
} else {
CamelContext context = exchange.getContext();
Route route = context.getRoute(routeId);
if (route == null) {
try {
context.addRoutes(routeBuilder);
} catch (Exception e) {
String msg = "Unable to add a route: " + routeBuilder;
LOGGER.warn(msg, e);
}
}
}
}
在功能上,我希望在从 JMS (WMQ) 端点消费之前检查 URL 是否处于活动状态。
如果无法到达 URL 或服务器错误,那么我不想从队列中取货。所以我想通过轮询消费者继续尝试(无限制重试) URL 。因此,只要它可用,我就可以从 JMS 取货。
我有一个设置了直接端点的 RouteBuilder,它被配置为 运行 一个将 ping 服务的处理器。
所以:
public class PingRoute extends RouteBuilder {
@Override
public void configureCamel() {
from("direct:pingRoute").routeId(PingRoute.class.getSimpleName())
.process(new PingProcessor(url))
.to("log://PingRoute?showAll=true");
}
}
在另一条路线中,我正在设置我的计时器:
@Override
public void configureCamel() {
from(timerEndpoint).beanRef(PollingConsumerBean.class.getSimpleName(), "checkPingRoute");
...
}
并且 PollingConsumerBean
我正在尝试通过消费者接收正文:
public void checkPingRoute(){
// loop to check the consumer. Check we can carry on with the pick up from the JMS queue.
while(true){
Boolean pingAvailable = consumer.receiveBody("direct:pingRoute", Boolean.class);
...
}
我将路由添加到上下文中并使用生产者发送:
context.addRoutes(new PingRoute());
context.start();
producer.sendBody(TimerPollingRoute.TIMER_POLLING_ROUTE_ENDPOINT, "a body");
我得到以下 IllegalArgumentException
:
Cannot add a 2nd consumer to the same endpoint. Endpoint Endpoint[direct://pingRoute] only allows one consumer.
有没有办法将直接路由设置为轮询消费者?
不幸的是,业务逻辑不是很清楚。据我了解-您需要等待服务的响应。恕我直言,您必须使用 Content Enricher EIP http://camel.apache.org/content-enricher.html 。 pollEnrich
是您在定时器路由中所需要的。
.pollEnrich("direct:waitForResponce", -1)
或
.pollEnrich("seda:waitForResponce", -1)
public class PingRoute extends RouteBuilder {
@Override
public void configureCamel() {
from("direct:pingRoute").routeId(PingRoute.class.getSimpleName())
.process(new PingProcessor(url))
.choice().when(body())
.to("log://PingRoute?showAll=true")
.to("direct:waitForResponce")
.otherwise()
.to("direct:pingRoute")
.end();
}
};
定时器:
@Override
public void configureCamel() {
from(timerEndpoint)
.inOnly("direct:pingRoute")
.pollEnrich("direct:waitForResponce", -1)
...
}
我很难弄清楚你到底想做什么,但在我看来你想在一个时间间隔内使用来自端点的数据。为此,最好的模式是轮询消费者:http://camel.apache.org/polling-consumer.html
您当前收到的错误是因为您有两个消费者都试图从 "direct://pingRoute" 中读取如果这是有意的,您可以将直接更改为 seda://pingRoute 以便它在内存队列中您的数据将在。
根据他们的用例
- 当且仅当对 URL 的 ping 为 正 . 时,才使用来自 JMS 队列的消息
- 如果 URL 没有响应,JMS 消息不应从队列中消失并且必须进行 重试,直到 URL 再次响应,在这种情况下,消息将最终被消费。
- OP 没有指定重试次数是有限还是无限。
基于这个问题场景,我建议重新设计他们的解决方案,利用 ActiveMQ retries, broker-side redelivery and JMS transactions in Camel 来:
-
如果 URL ping 失败(通过事务回滚),
- Return 消息到队列。
- 确保消息不丢失(通过使用 JMS 持久性和 broker-side 重新传递,AMQ 将持久地安排重试周期)。
- 能够为每条消息指定复杂的重试周期,例如具有指数退避、最大重试次数等
- 如果重试周期已用完但没有肯定结果,可选择将消息发送到 Dead Letter Queue,以便可以计划其他一些(可能是手动的)操作。
现在,implementation-wise:
from("activemq:queue:abc?transacted=true") // (1)
.to("http4://host.endpoint.com/foo?method=GET") // (2) (3)
.process(new HandleSuccess()); // (4)
评论:
- 注意
transacted
标志。 - 如果 HTTP 调用失败,HTTP4 端点将引发异常。
- 由于没有配置异常处理程序,Camel 会将异常传播到消费者端点 (
activemq
),这将回滚 事务。 - 如果调用成功,流程将继续,交换主体现在将包含 HTTP 服务器返回的有效负载,您可以按照您希望的方式处理它。这里我使用的是处理器。
接下来,重要的是在 ActiveMQ 中配置重新传送策略,并启用 broker-side 重新传送。您在 activemq.xml
配置文件中执行此操作:
<plugins>
<redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
<redeliveryPolicyMap>
<redeliveryPolicyMap>
<redeliveryPolicyEntries>
<redeliveryPolicy queue="my.queue"
initialRedeliveryDelay="30000"
maximumRedeliveries="17"
maximumRedeliveryDelay="259200000"
redeliveryDelay="30000"
useExponentialBackOff="true"
backOffMultiplier="2" />
</redeliveryPolicyEntries>
</redeliveryPolicyMap>
</redeliveryPolicyMap>
</redeliveryPlugin>
</plugins>
并确保在 top-level <broker />
元素中启用了调度程序支持:
<broker xmlns="http://activemq.apache.org/schema/core"
brokerName="mybroker"
schedulerSupport="true">
...
</broker>
希望对您有所帮助。
编辑 1: OP 使用 IBM WebSphere MQ 作为代理,我错过了。您可以使用 JMS QueueBrowser 来查看消息并在实际使用消息之前尝试其相应的 URLs,但不可能有选择地使用单个消息 – 这不是 MOM(messaging-oriented 中间件)是关于。
所以我坚持认为您应该探索 JMS 事务,而不是让代理重新传送消息,您可以在 TX 主体本身内开始对 URL 的 ping 周期。关于骆驼,您可以按如下方式实现它:
from("jms:queue:myqueue?transacted=true")
.bean(new UrlPinger());
UrlPinger.java:
public class UrlPinger {
@EndpointInject
private ProducerTemplate template;
private Pattern pattern = Pattern.compile("^(http(?:s)?)\:");
@Handler
public void pingUrl(@Body String url, CamelContext context) throws InterruptedException {
// Replace http(s): with http(s)4: to use the Camel HTTP4 endpoint.
Matcher m = pattern.matcher(url);
if (m.matches()) {
url = m.replaceFirst(m.group(1) + "4:");
}
// Try forever until the status code is 200.
while (getStatusCode(url, context) != 200) {
Thread.sleep(5000);
}
}
private int getStatusCode(String url, CamelContext context) {
Exchange response = template.request(url + "?method=GET&throwExceptionOnFailure=false", new Processor() {
@Override public void process(Exchange exchange) throws Exception {
// No body since this is a GET request.
exchange.getIn().getBody(null);
}
});
return response.getIn().getHeader(Exchange.HTTP_RESPONSE_CODE, Integer.class);
}
}
备注:
- 注意
throwExceptionOnFailure=false
选项。不会引发异常,因此循环将执行直到条件为真。 - 在 bean 内部,我一直在循环,直到 HTTP 状态为 200。当然,您的逻辑会有所不同。
- 在尝试和尝试之间,我睡了 5000 毫秒。
- 我假设要 ping 的 URL 在传入 JMS 消息的正文中。我将
http(s):
替换为http(s)4:
以使用 Camel HTTP4 endpoint. - 在 TX 内部执行 ping 保证只有在 ping 条件为真时才会使用消息(在这种情况下 HTTP 状态 == 200)。
- 您可能想要引入终止条件(您不想永远尝试下去)。也许引入一些后退,以免对方不知所措。
- 如果 Camel 或代理在重试周期内出现故障,消息将自动回滚。
- 考虑到 JMS 事务是
Session
绑定的,因此如果您想启动多个并发消费者(concurrentConsumers
JMS 端点选项),您需要设置cacheLevelName=CACHE_NONE
为每个线程使用不同的 JMSSession
.
这里的所有答案都为我指明了正确的方向,但我最终想出了一个适合我们的代码库和框架的解决方案。
首先,我发现不需要 bean 来充当轮询消费者,而是可以使用处理器。
@Override
public void configureCamel() {
from("timer://fnzPoller?period=2000&delay=2000").processRef(UrlPingProcessor.class.getSimpleName())
.processRef(StopStartProcessor.class.getSimpleName()).to("log://TimerPollingRoute?showAll=true");
}
然后在 UrlPingProcessor
中有 CXF 服务来 ping url 并且可以检查响应:
@Override
public void process(Exchange exchange) {
try {
// CXF service
FnzPingServiceImpl fnzPingService = new FnzPingServiceImpl(url);
fnzPingService.getPing();
} catch (WebApplicationException e) {
int responseCode = e.getResponse().getStatus();
boolean isValidResponseCode = ResponseCodeUtil.isResponseCodeValid(responseCode);
if (!isValidResponseCode) {
// Sets a flag to stop for the StopStartProcessor
stopRoute(exchange);
}
}
}
然后在 StopStartProcessor
中使用 ExecutorService
通过新线程停止或启动路由。:
@Override
public void process(final Exchange exchange) {
// routeBuilder is set on the constructor.
final String routeId = routeBuilder.getClass().getSimpleName();
Boolean stopRoute = ExchangeHeaderUtil.getHeader(exchange, Exchange.ROUTE_STOP, Boolean.class);
boolean stopRoutePrim = BooleanUtils.isTrue(stopRoute);
if (stopRoutePrim) {
StopRouteThread stopRouteThread = new StopRouteThread(exchange, routeId);
executorService.execute(stopRouteThread);
} else {
CamelContext context = exchange.getContext();
Route route = context.getRoute(routeId);
if (route == null) {
try {
context.addRoutes(routeBuilder);
} catch (Exception e) {
String msg = "Unable to add a route: " + routeBuilder;
LOGGER.warn(msg, e);
}
}
}
}