ActiveMQ 批量消费者
ActiveMQ batch consumer
我需要使用来自 ActiveMQ 主题的消息并将它们保存在 mongo 中。我想知道是否有 way/configuration 用于从主题中批量消费消息,而不是一条一条地读取消息并对每条消息进行数据库调用。
我想象最终的解决方案会做这样的事情:
- 以 100 的批量大小使用消息
- 使用mongo批量插入将批次保存到数据库中
- 向代理发送成功插入消息的 ACK 和失败消息的 NAK。
JMS API 只允许您一次接收一条消息,无论是通过异步 javax.jms.MessageListener
or a synchronous call to javax.jms.MessageConsumer#receive()
in JMS 1.1 or javax.jms.JMSConsumer.receive()
in JMS 2. However, you can batch the receipt of multiple messages up using a transacted session. Here's what the javax.jms.Session
JavaDoc 关于事务处理的会话:
A session may be specified as transacted. Each transacted session supports a single series of transactions. Each transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect, transactions organize a session's input message stream and output message stream into series of atomic units. When a transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically recovered.
因此,您可以使用事务处理会话单独接收 100 条消息,将该数据插入 Mongo,提交事务处理会话,或者如果出现故障,您可以回滚事务处理会话(这实际上是一种否定确认).例如:
final int TX_SIZE = 100;
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("myTopic");
MessageConsumer consumer = session.createConsumer(topic);
connection.start();
while (true) {
List messages = new ArrayList<Message>();
for (int i = 0; i < TX_SIZE; i++) {
Message message = consumer.receive(1000);
if (message != null) {
messages.add(message);
} else {
break; // no more messages available for this batch
}
}
if (messages.size() > 0) {
try {
// bulk insert data from messages List into Mongo
session.commit();
} catch (Exception e) {
e.printStackTrace();
session.rollback();
}
} else {
break; // no more messages in the subscription
}
}
值得注意的是,如果您只使用 JMS 事务处理会话而不是完整的 XA 事务,那么在 Mongo 中至少会有一些重复的风险(例如,如果您的应用程序在成功将数据插入 Mongo 但在提交事务处理会话之前)。 XA 事务会以相当多的额外复杂性为代价为您减轻这种风险,具体取决于您的环境。
最后,如果您 运行 遇到 ActiveMQ“Classic”的性能限制,请考虑使用 ActiveMQ Artemis,ActiveMQ 的下一代消息代理。
@Nabeel Ahmad 您可能有兴趣查看 ActiveMQ 中的 Virtual Topics。它们提供了在生产者端使用主题,然后使用队列进行消费的能力。当想要扩展消费时,它们非常有用,因为使用队列比消费者端的主题具有更多的特性和可观察性。
将此配置添加到 activemq.xml
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="VT.>" prefix="VQ.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
然后让生产者发送到:topic://VT.DATA
然后让消费者接收来自:queue://VQ.CLIENT1.VT.DATA
正如@Justin Bertram 提到的,可以使用事务处理会话并每 100 条左右的消息提交一次来完成批处理读取。
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = session.createConsumer(session.createQueue("VQ.CLIENT1.VT.DATA");
Message message = null;
long count = 0l;
do {
message = messageConsumer.receive(2000l);
if(message != null) {
// check the message and publisher.send() to .DLQ if it is bad
// if message is good, send to Mongo
if(count % 100 == 0) {
// commit every 100 messages on the JMS-side
session.commit();
}
}
} while(message != null);
我需要使用来自 ActiveMQ 主题的消息并将它们保存在 mongo 中。我想知道是否有 way/configuration 用于从主题中批量消费消息,而不是一条一条地读取消息并对每条消息进行数据库调用。
我想象最终的解决方案会做这样的事情:
- 以 100 的批量大小使用消息
- 使用mongo批量插入将批次保存到数据库中
- 向代理发送成功插入消息的 ACK 和失败消息的 NAK。
JMS API 只允许您一次接收一条消息,无论是通过异步 javax.jms.MessageListener
or a synchronous call to javax.jms.MessageConsumer#receive()
in JMS 1.1 or javax.jms.JMSConsumer.receive()
in JMS 2. However, you can batch the receipt of multiple messages up using a transacted session. Here's what the javax.jms.Session
JavaDoc 关于事务处理的会话:
A session may be specified as transacted. Each transacted session supports a single series of transactions. Each transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect, transactions organize a session's input message stream and output message stream into series of atomic units. When a transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically recovered.
因此,您可以使用事务处理会话单独接收 100 条消息,将该数据插入 Mongo,提交事务处理会话,或者如果出现故障,您可以回滚事务处理会话(这实际上是一种否定确认).例如:
final int TX_SIZE = 100;
ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("myTopic");
MessageConsumer consumer = session.createConsumer(topic);
connection.start();
while (true) {
List messages = new ArrayList<Message>();
for (int i = 0; i < TX_SIZE; i++) {
Message message = consumer.receive(1000);
if (message != null) {
messages.add(message);
} else {
break; // no more messages available for this batch
}
}
if (messages.size() > 0) {
try {
// bulk insert data from messages List into Mongo
session.commit();
} catch (Exception e) {
e.printStackTrace();
session.rollback();
}
} else {
break; // no more messages in the subscription
}
}
值得注意的是,如果您只使用 JMS 事务处理会话而不是完整的 XA 事务,那么在 Mongo 中至少会有一些重复的风险(例如,如果您的应用程序在成功将数据插入 Mongo 但在提交事务处理会话之前)。 XA 事务会以相当多的额外复杂性为代价为您减轻这种风险,具体取决于您的环境。
最后,如果您 运行 遇到 ActiveMQ“Classic”的性能限制,请考虑使用 ActiveMQ Artemis,ActiveMQ 的下一代消息代理。
@Nabeel Ahmad 您可能有兴趣查看 ActiveMQ 中的 Virtual Topics。它们提供了在生产者端使用主题,然后使用队列进行消费的能力。当想要扩展消费时,它们非常有用,因为使用队列比消费者端的主题具有更多的特性和可观察性。
将此配置添加到 activemq.xml
<destinationInterceptors>
<virtualDestinationInterceptor>
<virtualDestinations>
<virtualTopic name="VT.>" prefix="VQ.*." selectorAware="false"/>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>
然后让生产者发送到:topic://VT.DATA
然后让消费者接收来自:queue://VQ.CLIENT1.VT.DATA
正如@Justin Bertram 提到的,可以使用事务处理会话并每 100 条左右的消息提交一次来完成批处理读取。
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = session.createConsumer(session.createQueue("VQ.CLIENT1.VT.DATA");
Message message = null;
long count = 0l;
do {
message = messageConsumer.receive(2000l);
if(message != null) {
// check the message and publisher.send() to .DLQ if it is bad
// if message is good, send to Mongo
if(count % 100 == 0) {
// commit every 100 messages on the JMS-side
session.commit();
}
}
} while(message != null);