ActiveMQ 忽略优先级设置

ActiveMQ ignores priority settings

使用此代码:

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Minimal reproduction: starts an embedded, non-persistent broker with
 * prioritized messages enabled, then sends one low-priority and one
 * high-priority message per iteration and prints the order in which a
 * consumer receives them.
 *
 * <p>NOTE(review): the consumer is created before any message is sent. The
 * likely consequence is that each message is dispatched to the consumer as
 * soon as the broker receives it, so the two priorities are never queued
 * side by side. To observe priority ordering, send all messages first and
 * create the consumer afterwards - confirm against the broker documentation.
 */
public class CompositeQueuePriority {
  /**
   * Runs the reproduction and prints one "first, second" line per iteration.
   *
   * @param args unused
   * @throws Exception if the broker cannot be started/stopped or a JMS call fails
   */
  public static void main(String[] args) throws Exception {
    String brokerUrl = "tcp://localhost:61616";

    BrokerService broker = new BrokerService();
    broker.addConnector(brokerUrl);
    broker.setPersistent(false);
    broker.setDestinationPolicy(policyMap());
    broker.start();

    try {
      Connection connection = createConnection();
      try {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Destination a =
            ActiveMQDestination.createDestination("queue", ActiveMQDestination.QUEUE_TYPE);

        MessageProducer lowProducer = session.createProducer(a);
        lowProducer.setPriority(1);

        MessageProducer highProducer = session.createProducer(a);
        highProducer.setPriority(9);

        MessageConsumer consumer = session.createConsumer(a);

        for (int i = 0; i < 10; i++) {
          lowProducer.send(session.createTextMessage("Low"));
          highProducer.send(session.createTextMessage("High"));

          String first = ((TextMessage) consumer.receive()).getText();
          String second = ((TextMessage) consumer.receive()).getText();

          System.out.println(first + ", " + second);
        }
      } finally {
        // Closing the connection also releases the session, producers and consumer.
        connection.close();
      }
    } finally {
      // Always shut the embedded broker down, even when the exchange above fails.
      broker.stop();
    }
  }

  /**
   * Opens and starts a client connection using the "vm://localhost" URL.
   * The caller owns the returned connection and must close it.
   *
   * @return a started JMS connection
   * @throws JMSException if the connection cannot be created or started
   */
  private static Connection createConnection() throws JMSException {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
    Connection connection = connectionFactory.createConnection();
    connection.start();
    return connection;
  }

  /** Builds a policy map whose default entry enables prioritized messages. */
  private static PolicyMap policyMap() {
    PolicyMap policyMap = new PolicyMap();
    policyMap.setDefaultEntry(prioPolicyEntry());
    return policyMap;
  }

  /** Builds a destination policy entry with prioritized messages switched on. */
  private static PolicyEntry prioPolicyEntry() {
    PolicyEntry policyEntry = new PolicyEntry();
    policyEntry.setPrioritizedMessages(true);
    return policyEntry;
  }
}

输出为:

Low, High
Low, High
Low, High
Low, High
Low, High
Low, High
Low, High
Low, High
Low, High
Low, High

根据文档,ActiveMQ 从 5.4 版本开始支持消息优先级,而我使用的是 5.15。我做错了什么吗?

我认为问题在于:您发送消息时消费者已经创建,这意味着代理一收到消息就会立即将其分派给消费者,因此队列中始终不会同时积压多条消息,代理也就没有机会按优先级对它们排序。

先发送所有消息,然后再创建您的消费者。