使用事务处理会话时,使用 Artemis 和 Spring JMS 的消息分组不起作用

Message grouping with Artemis and Spring JMS not working when using transacted sessions

消息分组似乎不起作用

  1. 我的生产者应用程序先将字符串属性 JMSXGroupID 设置为 'product=paper',然后通过 JMS MessageProducer 将消息发送到队列。
  2. 我的生产者应用程序以相同的方式发送另一条消息,同样使用 'product=paper'。
  3. 当我在 Artemis UI 中浏览消息的 headers 时,可以在队列中看到这两条消息。两条消息中 _AMQ_GROUP_ID 的值均为 'product=paper',而 JMSXGroupID 不存在。
  4. 当我调试使用 Spring JMS、并发设置为 15-15(最少 15 个、最多 15 个消费者)的侦听器应用程序时,可以看到两条消息被记录在不同的侦听器容器下。查看每条消息的 headers 映射时,_AMQ_GROUP_ID 不存在,并且 JMSXGroupID 的值为 null 而不是 'product=paper'。

为什么不能使用组 ID 对消息进行分组?这与 Artemis 没有将 _AMQ_GROUP_ID 翻译回 JMSXGroupID 这一事实有关吗?还是 Spring JMS 没有将其多个消费者线程注册为不同的消费者,以便代理查看多个消费者?

编辑: 通过注释掉容器工厂 bean 方法中与事务处理会话相关的几行代码,我能够使消息分组在我的应用程序中正常工作。因此问题似乎与使用事务处理会话有关。

编辑2:

这是一个独立应用程序,它针对本地独立的 Artemis 代理(版本 2.10.1)运行,并使用 Spring Boot 2.2.0:

GroupidApplication(Spring Boot 应用程序及 bean 定义):

package com.reproduce.groupid;

import java.util.HashMap;
import java.util.Map;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {

    private static Logger LOG = LoggerFactory
            .getLogger(GroupidApplication.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired MessageConverter messageConverter;

    public static void main(String[] args) {
        LOG.info("STARTING THE APPLICATION");
        SpringApplication.run(GroupidApplication.class, args);

        LOG.info("APPLICATION FINISHED");
    }

    @Override
    public void run(String... args) throws JMSException {
        LOG.info("EXECUTING : command line runner");

        jmsTemplate.setPubSubDomain(true);

        createAndSendTextMessage("Message1");
        createAndSendTextMessage("Message2");
        createAndSendTextMessage("Message3");
        createAndSendTextMessage("Message4");
        createAndSendTextMessage("Message5");
        createAndSendTextMessage("Message6");
    }

    private void createAndSendTextMessage(String messageBody) {
        jmsTemplate.send("local-queue", session -> {
            Message message = session.createTextMessage(messageBody);

            message.setStringProperty("JMSXGroupID", "product=paper");

            return message;
        });
    }


    // BEANS
    @Bean
    public JmsListenerContainerFactory<?> containerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer, JmsTransactionManager jmsTransactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

        configurer.configure(factory, connectionFactory);
        factory.setSubscriptionDurable(true);
        factory.setSubscriptionShared(true);
        factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        factory.setSessionTransacted(Boolean.TRUE);
        factory.setTransactionManager(jmsTransactionManager);

        return factory;
    }

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    @Bean
    @Primary
    public JmsTransactionManager jmsTransactionManager(ConnectionFactory connectionFactory) {
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(connectionFactory);

        // Lazily retrieve existing JMS Connection from given ConnectionFactory
        jmsTransactionManager.setLazyResourceRetrieval(true);

        return jmsTransactionManager;
    }

    @Bean
    @Primary
    public ConnectionFactory connectionFactory() throws JMSException {
        // Create ConnectionFactory which enables failover between primary and backup brokers
        ActiveMQConnectionFactory activeMqConnectionFactory = ActiveMQJMSClient.createConnectionFactoryWithHA(
                JMSFactoryType.CF, transportConfigurations());

        activeMqConnectionFactory.setBrokerURL("tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=1");
        activeMqConnectionFactory.setUser("admin");
        activeMqConnectionFactory.setPassword("admin");
        activeMqConnectionFactory.setInitialConnectAttempts(1);
        activeMqConnectionFactory.setReconnectAttempts(5);
        activeMqConnectionFactory.setConsumerWindowSize(0);
        activeMqConnectionFactory.setBlockOnAcknowledge(true);
        activeMqConnectionFactory.setCacheDestinations(true);
        activeMqConnectionFactory.setRetryInterval(1000);

        return activeMqConnectionFactory;
    }

    private static TransportConfiguration[] transportConfigurations() {
        String connectorFactoryFqcn = NettyConnectorFactory.class.getName();
        Map<String, Object> primaryTransportParameters = new HashMap<>(2);

        primaryTransportParameters.put("host", "localhost");
        primaryTransportParameters.put("port", "61616");

        TransportConfiguration primaryTransportConfiguration = new TransportConfiguration(connectorFactoryFqcn,
                primaryTransportParameters);

        return new TransportConfiguration[] { primaryTransportConfiguration,
                new TransportConfiguration(connectorFactoryFqcn) };
    }
}

CustomSpringJmsListener:

package com.reproduce.groupid;

import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

/**
 * Spring component that listens on {@code local-queue} and logs every
 * text message it receives.
 */
@Component
public class CustomSpringJmsListener {

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    /**
     * Logs the received message at INFO level.
     *
     * @param message the text message delivered by the listener container
     * @throws JMSException declared for JMS failures while handling the message
     */
    @JmsListener(destination = "local-queue", subscription = "groupid-example", containerFactory = "containerFactory", concurrency = "15-15")
    public void receive(TextMessage message) throws JMSException {
        // Parameterized form: the message is only rendered to a string when INFO is enabled.
        LOG.info("Received message: {}", message);
    }
}

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the com.reproduce:groupid demo project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Inherits from the Spring Boot 2.2.0.RELEASE starter parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.reproduce</groupId>
    <artifactId>groupid</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>groupid</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <!-- No versions are declared on the entries below; presumably they are managed by the parent POM (confirm). -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-artemis</artifactId>
        </dependency>

        <!-- NOTE(review): no java.time usage is visible in the application code; confirm this dependency is needed. -->
        <dependency>
            <groupId>com.fasterxml.jackson.datatype</groupId>
            <artifactId>jackson-datatype-jsr310</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

您可以看到,即使所有这些消息都具有相同的组 ID,它们也会由不同的侦听器容器线程记录。如果您从 bean 定义中注释掉事务管理器,它会再次开始工作。

这一切都与消费者缓存有关。默认情况下,使用外部事务管理器时缓存会被禁用,因此每条消息都会在一个新的消费者上接收。

对于这个应用程序,您真的不需要事务管理器,sessionTransacted 就足够了 - 容器将使用本地事务。

如果出于某种原因必须使用外部事务管理器,请考虑更改缓存级别。

factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);

查看 DMLC javadoc...

/**
 * Specify the level of caching that this listener container is allowed to apply.
 * <p>Default is {@link #CACHE_NONE} if an external transaction manager has been specified
 * (to reobtain all resources freshly within the scope of the external transaction),
 * and {@link #CACHE_CONSUMER} otherwise (operating with local JMS resources).
 * <p>Some Java EE servers only register their JMS resources with an ongoing XA
 * transaction in case of a freshly obtained JMS {@code Connection} and {@code Session},
 * which is why this listener container by default does not cache any of those.
 * However, depending on the rules of your server with respect to the caching
 * of transactional resources, consider switching this setting to at least
 * {@link #CACHE_CONNECTION} or {@link #CACHE_SESSION} even in conjunction with an
 * external transaction manager.
 * @see #CACHE_NONE
 * @see #CACHE_CONNECTION
 * @see #CACHE_SESSION
 * @see #CACHE_CONSUMER
 * @see #setCacheLevelName
 * @see #setTransactionManager
 */
public void setCacheLevel(int cacheLevel) {