为什么我的 ConsumerTemplate 没有从 ActiveMQ Topic 读取任何消息?

Why does my ConsumerTemplate not read any messages from ActiveMQ Topic?

我正在尝试使用持久订阅者端点不时地从 ActiveMQ 主题以轮询消费者的身份使用消息。

在我的 bean 中,我有一个 ConsumerTemplate,我试图从中接收交换,然后发送到另一个 URI。

bean方法是:

public void pollConsumer() throws Exception {
    long count = 0;
    try {
    if ( consumerEndpoint == null ) consumerEndpoint = consumer.getCamelContext().getEndpoint( endpointUri );
    logger.debug( "Consuming: " + consumerEndpoint.getEndpointUri() );
    consumer.start();
    producer.start();
    while ( true ) {
        logger.trace("Awaiting message: " + ++count );
        Exchange exchange = consumer.receive( consumerEndpoint, 1000 );
        if ( exchange == null ) break;
        logger.trace("Processing message: " + count );
        producer.send( exchange );
        consumer.doneUoW( exchange );
        logger.trace("Processed message: " + count );
    }
    producer.stop();
    consumer.stop();
    } catch ( Throwable t ) {
        logger.error("Something went wrong!", t );
        throw t;
    }
}

调用时,记录器以

形式显示 "Consuming" 消息
activemq://topic:fromQueue.Name?clientId=MyClient&durableSubscriptionName=MyClient&selector=RecordType+IN+%28+%271%27%2C+%272%27+%29+AND+SubType+%3D+%272%27

据我所知这是正确的(选择器应该读取 RecordType IN ('1', '2') AND SubType = '2' 而没有 URL 编码。

我得到一个 "Awaiting message" 日志,没有其他任何东西,所以似乎没有检索到任何东西。

奇怪的是,它也没有在 ActiveMQ 上注册为持久订阅者,所以看起来它根本没有做任何事情,但它也没有注册任何错误,所以我很困惑。

任何人都可以建议为什么这可能不起作用,或者至少我应该从哪里开始寻找?

如果 queue/topic 上的消息必须等待超过一秒,您的 pollConsumer 将停止。

它等待一条消息 1 秒,之后它 returns null 并会跳出 while 循环并停止消费者。

    Exchange exchange = consumer.receive( consumerEndpoint, 1000 );
==> if ( exchange == null ) break;
    logger.trace("Processing message: " + count );
    producer.send( exchange );
    consumer.doneUoW( exchange );

只使用 apache-camel 路由来执行您描述的操作会更容易。

注意到@pcoates 的回答,并尝试延长超时以进行测试,很明显问题是 URI 上的持久订阅选项没有被执行,并且因为没有新消息主题在等待1秒期间,没有任何反应。

another question 与持久订阅相关的答案说明您不能使用轮询消费者的持久订阅。

因此,我的解决方法是订阅主题并将消息路由到新队列,并让轮询使用者在此新队列上。这不是很好,因为我不想有额外的队列,但它可以工作并且比编写新版本的 JMSPollingConsumer 更省力。