Spring JMS ActiveMQ 消息传递咨询主题不工作

Spring JMS ActiveMQ Message Delivered Advisory Topic not Working

出于 POC 目的,我构建了一个 Spring 引导应用程序,它使用 ActiveMQ 通过 JMSTemplate 进行消息传递。

为了监控,我想使用 "Advisory Topics".

监听放入队列和从队列中移除的消息

我已更新ActiveMQ配置以启用相关建议:

<!-- activemq.xml -->
 <broker xmlns="http://activemq.apache.org/schema/core" useJmx="true" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" advisoryForConsumed="true" advisoryForDelivery="true">
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
                <policyEntry queue=">" advisoryForConsumed="true" advisoryForDelivery="true">
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

</broker>

在应用程序中,我配置了 JMS 连接工厂和 JMS 侦听器容器工厂以启用咨询和 pubsub 域,并为咨询主题设置侦听器:

@Configuration
public class JmsConfig {
    @Autowired
    MessageListener messageListener;

    @Bean
    public ConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setWatchTopicAdvisories(true);
        connectionFactory.setBrokerURL("vm://localhost?broker.persistent=false");
        return connectionFactory;
    }

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) throws JMSException {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        configurer.configure(factory, connectionFactory);

        Connection connection = connectionFactory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue bulkQueue = session.createQueue("bulk");

        Topic deliveredAdvisoryTopic = AdvisorySupport.getMessageDeliveredAdvisoryTopic(bulkQueue);
        MessageConsumer deliveredAdvisoryTopicConsumer = session.createConsumer(deliveredAdvisoryTopic);
        deliveredAdvisoryTopicConsumer.setMessageListener(messageListener);

        Topic consumedAdvisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic(bulkQueue);
        MessageConsumer consumedAdvisoryTopicConsumer = session.createConsumer(consumedAdvisoryTopic);
        consumedAdvisoryTopicConsumer.setMessageListener(messageListener);

        return factory;
    }

将阅读咨询主题的侦听器仅用于记录:

@Component
public class AdvisoryMessageListener implements MessageListener {
    @Override public void onMessage(Message message) {
        System.out.println("Received advisory message");
        System.out.println(message);
    }
}

将从队列中读取的实际侦听器类似于咨询消息侦听器:

@Component
public class Receiver {

    @JmsListener(destination = "bulk", containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(Email email) {
        System.out.println("Received <" + email + ">");
    }

}

其余API将触发应用程序将消息放入队列:


@RestController("/emails")
public class EmailController {

    @Autowired
    private JmsTemplate jmsTemplate;

    @PostMapping("/")
    public void persistEmail(@RequestBody Email email) {
        jmsTemplate.setExplicitQosEnabled(true);
        jmsTemplate.setTimeToLive(0L);
        jmsTemplate.convertAndSend("bulk", email);
    }
}

每当调用 API 并将电子邮件放入队列时,Receiver.receiveMessage 会读取并记录它,但 AdvisoryMessageListener 中没有任何操作。

控制台中唯一显示的内容如下: Received <Email{to=foo@bar.com, body=Hello}> 由 Receiver.receiveMessage

印刷

我做错了什么?

这对我来说很好...

@SpringBootApplication
public class So59196698Application {

    public static void main(String[] args) {
        SpringApplication.run(So59196698Application.class, args);
    }

    @JmsListener(destination = "so59196698")
    public void listen(Message in) {
        System.out.println("Received:" + in);
    }

    @JmsListener(destination = "#{advisoryTopicNames.deliveredTopic}", containerFactory = "topicFactory")
    public void delivered(Message in) {
        System.out.println("Delivered:" + in);
    }

    @JmsListener(destination = "#{advisoryTopicNames.consumedTopic}", containerFactory = "topicFactory")
    public void consumed(Message in) {
        System.out.println("Consumed:" + in);
    }

    @Bean
    public ApplicationRunner runner(JmsTemplate template) {
        return args -> {
            Thread.sleep(5000);
            template.convertAndSend("so59196698", "test");
        };
    }

    @Bean
    public JmsListenerContainerFactory<?> topicFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {

        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }

}

@Component
class AdvisoryTopicNames {

    private static final Destination QUEUE = new ActiveMQQueue("so59196698");

    public String getDeliveredTopic() throws JMSException {
        return AdvisorySupport.getMessageDeliveredAdvisoryTopic(QUEUE).getTopicName();
    }

    public String getConsumedTopic() throws JMSException {
        return AdvisorySupport.getMessageConsumedAdvisoryTopic(QUEUE).getTopicName();
    }

}

Received:ActiveMQTextMessage {commandId = 11, ...
Delivered:ActiveMQMessage {commandId = 0, ...
Consumed:ActiveMQMessage {commandId = 0, ...