为什么我的 ConsumerTemplate 没有从 ActiveMQ Topic 读取任何消息?
Why does my ConsumerTemplate not read any messages from ActiveMQ Topic?
我正在尝试使用持久订阅者端点不时地从 ActiveMQ 主题以轮询消费者的身份使用消息。
在我的 bean 中,我有一个 ConsumerTemplate,我试图从中接收交换,然后发送到另一个 URI。
bean方法是:
public void pollConsumer() throws Exception {
long count = 0;
try {
if ( consumerEndpoint == null ) consumerEndpoint = consumer.getCamelContext().getEndpoint( endpointUri );
logger.debug( "Consuming: " + consumerEndpoint.getEndpointUri() );
consumer.start();
producer.start();
while ( true ) {
logger.trace("Awaiting message: " + ++count );
Exchange exchange = consumer.receive( consumerEndpoint, 1000 );
if ( exchange == null ) break;
logger.trace("Processing message: " + count );
producer.send( exchange );
consumer.doneUoW( exchange );
logger.trace("Processed message: " + count );
}
producer.stop();
consumer.stop();
} catch ( Throwable t ) {
logger.error("Something went wrong!", t );
throw t;
}
}
调用时,记录器以
形式显示 "Consuming" 消息
activemq://topic:fromQueue.Name?clientId=MyClient&durableSubscriptionName=MyClient&selector=RecordType+IN+%28+%271%27%2C+%272%27+%29+AND+SubType+%3D+%272%27
据我所知这是正确的(选择器应该读取 RecordType IN ('1', '2') AND SubType = '2'
而没有 URL 编码。
我得到一个 "Awaiting message" 日志,没有其他任何东西,所以似乎没有检索到任何东西。
奇怪的是,它也没有在 ActiveMQ 上注册为持久订阅者,所以看起来它根本没有做任何事情,但它也没有注册任何错误,所以我很困惑。
任何人都可以建议为什么这可能不起作用,或者至少我应该从哪里开始寻找?
如果 queue/topic 上的消息必须等待超过一秒,您的 pollConsumer 将停止。
它等待一条消息 1 秒,之后它 returns null 并会跳出 while 循环并停止消费者。
Exchange exchange = consumer.receive( consumerEndpoint, 1000 );
==> if ( exchange == null ) break;
logger.trace("Processing message: " + count );
producer.send( exchange );
consumer.doneUoW( exchange );
只使用 apache-camel 路由来执行您描述的操作会更容易。
注意到@pcoates 的回答,并尝试延长超时以进行测试,很明显问题是 URI 上的持久订阅选项没有被执行,并且因为没有新消息主题在等待1秒期间,没有任何反应。
another question 与持久订阅相关的答案说明您不能使用轮询消费者的持久订阅。
因此,我的解决方法是订阅主题并将消息路由到新队列,并让轮询使用者在此新队列上。这不是很好,因为我不想有额外的队列,但它可以工作并且比编写新版本的 JMSPollingConsumer 更省力。
我正在尝试使用持久订阅者端点不时地从 ActiveMQ 主题以轮询消费者的身份使用消息。
在我的 bean 中,我有一个 ConsumerTemplate,我试图从中接收交换,然后发送到另一个 URI。
bean方法是:
public void pollConsumer() throws Exception {
long count = 0;
try {
if ( consumerEndpoint == null ) consumerEndpoint = consumer.getCamelContext().getEndpoint( endpointUri );
logger.debug( "Consuming: " + consumerEndpoint.getEndpointUri() );
consumer.start();
producer.start();
while ( true ) {
logger.trace("Awaiting message: " + ++count );
Exchange exchange = consumer.receive( consumerEndpoint, 1000 );
if ( exchange == null ) break;
logger.trace("Processing message: " + count );
producer.send( exchange );
consumer.doneUoW( exchange );
logger.trace("Processed message: " + count );
}
producer.stop();
consumer.stop();
} catch ( Throwable t ) {
logger.error("Something went wrong!", t );
throw t;
}
}
调用时,记录器以
形式显示 "Consuming" 消息activemq://topic:fromQueue.Name?clientId=MyClient&durableSubscriptionName=MyClient&selector=RecordType+IN+%28+%271%27%2C+%272%27+%29+AND+SubType+%3D+%272%27
据我所知这是正确的(选择器应该读取 RecordType IN ('1', '2') AND SubType = '2'
而没有 URL 编码。
我得到一个 "Awaiting message" 日志,没有其他任何东西,所以似乎没有检索到任何东西。
奇怪的是,它也没有在 ActiveMQ 上注册为持久订阅者,所以看起来它根本没有做任何事情,但它也没有注册任何错误,所以我很困惑。
任何人都可以建议为什么这可能不起作用,或者至少我应该从哪里开始寻找?
如果 queue/topic 上的消息必须等待超过一秒,您的 pollConsumer 将停止。
它等待一条消息 1 秒,之后它 returns null 并会跳出 while 循环并停止消费者。
Exchange exchange = consumer.receive( consumerEndpoint, 1000 );
==> if ( exchange == null ) break;
logger.trace("Processing message: " + count );
producer.send( exchange );
consumer.doneUoW( exchange );
只使用 apache-camel 路由来执行您描述的操作会更容易。
注意到@pcoates 的回答,并尝试延长超时以进行测试,很明显问题是 URI 上的持久订阅选项没有被执行,并且因为没有新消息主题在等待1秒期间,没有任何反应。
another question 与持久订阅相关的答案说明您不能使用轮询消费者的持久订阅。
因此,我的解决方法是订阅主题并将消息路由到新队列,并让轮询使用者在此新队列上。这不是很好,因为我不想有额外的队列,但它可以工作并且比编写新版本的 JMSPollingConsumer 更省力。