ActiveMQ 批量消费者

ActiveMQ batch consumer

我需要使用来自 ActiveMQ 主题的消息并将它们保存在 mongo 中。我想知道是否有 way/configuration 用于从主题中批量消费消息,而不是一条一条地读取消息并对每条消息进行数据库调用。

我想象最终的解决方案会做这样的事情:

  1. 以 100 的批量大小使用消息
  2. 使用mongo批量插入将批次保存到数据库中
  3. 向代理发送成功插入消息的 ACK 和失败消息的 NAK。

JMS API 只允许您一次接收一条消息,无论是通过异步 javax.jms.MessageListener or a synchronous call to javax.jms.MessageConsumer#receive() in JMS 1.1 or javax.jms.JMSConsumer.receive() in JMS 2. However, you can batch the receipt of multiple messages up using a transacted session. Here's what the javax.jms.Session JavaDoc 关于事务处理的会话:

A session may be specified as transacted. Each transacted session supports a single series of transactions. Each transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect, transactions organize a session's input message stream and output message stream into series of atomic units. When a transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically recovered.

因此,您可以使用事务处理会话单独接收 100 条消息,将该数据插入 Mongo,提交事务处理会话,或者如果出现故障,您可以回滚事务处理会话(这实际上是一种否定确认).例如:

final int TX_SIZE = 100;
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("myTopic");
MessageConsumer consumer = session.createConsumer(topic);
connection.start();
while (true) {
   List messages = new ArrayList<Message>();
   for (int i = 0; i < TX_SIZE; i++) {
      Message message = consumer.receive(1000);
      if (message != null) {
         messages.add(message);
      } else {
         break; // no more messages available for this batch
      }
   }

   if (messages.size() > 0) {
      try {
         // bulk insert data from messages List into Mongo
         session.commit();
      } catch (Exception e) {
         e.printStackTrace();
         session.rollback();
      }
   } else {
      break; // no more messages in the subscription
   }
}

值得注意的是,如果您只使用 JMS 事务处理会话而不是完整的 XA 事务,那么在 Mongo 中至少会有一些重复的风险(例如,如果您的应用程序在成功将数据插入 Mongo 但在提交事务处理会话之前)。 XA 事务会以相当多的额外复杂性为代价为您减轻这种风险,具体取决于您的环境。

最后,如果您 运行 遇到 ActiveMQ“Classic”的性能限制,请考虑使用 ActiveMQ Artemis,ActiveMQ 的下一代消息代理。

@Nabeel Ahmad 您可能有兴趣查看 ActiveMQ 中的 Virtual Topics。它们提供了在生产者端使用主题,然后使用队列进行消费的能力。当想要扩展消费时,它们非常有用,因为使用队列比消费者端的主题具有更多的特性和可观察性。

将此配置添加到 activemq.xml

<destinationInterceptors> 
  <virtualDestinationInterceptor> 
    <virtualDestinations> 
      <virtualTopic name="VT.>" prefix="VQ.*." selectorAware="false"/>   
    </virtualDestinations>
  </virtualDestinationInterceptor> 
</destinationInterceptors>

然后让生产者发送到:topic://VT.DATA

然后让消费者接收来自:queue://VQ.CLIENT1.VT.DATA

正如@Justin Bertram 提到的,可以使用事务处理会话并每 100 条左右的消息提交一次来完成批处理读取。

Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = session.createConsumer(session.createQueue("VQ.CLIENT1.VT.DATA");

Message message = null;
long count = 0l;
do {
  message = messageConsumer.receive(2000l);
  
  if(message != null) {
     // check the message and publisher.send() to .DLQ if it is bad

     // if message is good, send to Mongo

     if(count % 100 == 0) {
        // commit every 100 messages on the JMS-side
        session.commit();
     }
  }
} while(message != null);