Camel:如何使用 CamelContext enable/disable 端点
Camel: how to enable/disable endpoints using CamelContext
我正在使用 Camel 2.14.1 将消息路由到一堆端点,例如:
from(...)
...
.to(monitoringEndpointId, broadcastEndpointId);
我想在运行时动态地enable/disable这些endpoints/routes。到目前为止,我已经尝试使用 CamelContext
的 stopRoute
和 suspendRoute
方法,例如:
camel.suspendRoute(broadcastEndpointId)
但这会导致以下异常:
An unknown exception occurred:
org.apache.camel.component.direct.DirectConsumerNotAvailableException: No consumers available on endpoint: Endpoint[direct://broadcast]. Exchange[JmsMessage[JmsMessageID: ID:NB137-38323-1460630272896-5:1:1:1:1356]]
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:120)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:416)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:103)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:685)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:623)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:591)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1142)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1134)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1031)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
所以这确实有效,但日志中充满了这些异常。有没有更好的方法来实现我想要的?
一种方法是使用过滤器。然后 'switch' 使用属性打开和关闭端点:
<filter>
<simple>${properties:endpoint1.isEnabled}</simple>
<to uri="direct:endpoint1"/>
</filter>
如下所示为您的路线提供路线 ID -
from("activemq:queue:test")
.routeId("myrouteid")
.to("file:///hello");
然后您可以使用 camel 上下文使用挂起和恢复路由 -
camelContext.suspendRoute("myrouteid") or camelContext.resumeRoute("myrouteid")
有关详细信息,请访问 http://camel.apache.org/lifecycle.html
我正在使用 Camel 2.14.1 将消息路由到一堆端点,例如:
from(...)
...
.to(monitoringEndpointId, broadcastEndpointId);
我想在运行时动态地enable/disable这些endpoints/routes。到目前为止,我已经尝试使用 CamelContext
的 stopRoute
和 suspendRoute
方法,例如:
camel.suspendRoute(broadcastEndpointId)
但这会导致以下异常:
An unknown exception occurred:
org.apache.camel.component.direct.DirectConsumerNotAvailableException: No consumers available on endpoint: Endpoint[direct://broadcast]. Exchange[JmsMessage[JmsMessageID: ID:NB137-38323-1460630272896-5:1:1:1:1356]]
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:47)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:120)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:416)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:105)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:87)
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:103)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:685)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:623)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:591)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1142)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1134)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1031)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
所以这确实有效,但日志中充满了这些异常。有没有更好的方法来实现我想要的?
一种方法是使用过滤器。然后 'switch' 使用属性打开和关闭端点:
<filter>
<simple>${properties:endpoint1.isEnabled}</simple>
<to uri="direct:endpoint1"/>
</filter>
如下所示为您的路线提供路线 ID -
from("activemq:queue:test")
.routeId("myrouteid")
.to("file:///hello");
然后您可以使用 camel 上下文使用挂起和恢复路由 -
camelContext.suspendRoute("myrouteid") or camelContext.resumeRoute("myrouteid")
有关详细信息,请访问 http://camel.apache.org/lifecycle.html