使用 Wildfly 进行批处理/批量消息 (JMS) 处理
Batch / Bulk Message (JMS) Processing with Wildfly
我有一个 JMS 队列,其中填充了一些时间序列数据。为了防止成千上万的单个事务 SQL 插入,我想以一种笨重的方式处理它们,而不是 MessageListener onMessage "per Message" 方式。
我想到的唯一解决方案是从队列中获取大量消息并定期保存它们的计划。
@Stateless
public class SensorDataReceiver {
private static final int THRESHOLD_IN_SECONDS = 10;
private static final int QUEUE_TIMEOUT_IN_MILLIS = 1000;
@Resource(mappedName = "java:jboss/jms/queue/sensorData")
private Queue queue;
@Inject
private JMSContext context;
@Inject
private SensorDataDAO sensorDataDAO;
@SneakyThrows
@Schedule(hour = "*", minute = "*", second = "*/15", persistent = false)
public void scheduled() {
LocalDateTime statUpPlusThreshold = now().plusSeconds(THRESHOLD_IN_SECONDS);
JMSConsumer consumer = context.createConsumer(queue);
List<SensorData> sensorDataToInsert = new ArrayList<>();
do {
ObjectMessage message = (ObjectMessage) consumer.receive(QUEUE_TIMEOUT_IN_MILLIS);
if (message == null) {
break;
}
sensorDataToInsert.add((sensorData) message.getObject());
} while (now().isBefore(statUpPlusThreshold) && sensorDataToInsert.size() < 10_000);
logger.info(format("Got \"%d\" SensorData to persist.", sensorDataToInsert.size()));
sensorDataDAO.batchSaveOrUpdate(sensorDataToInsert);
logger.info(format("Persisted \"%d\" SensorData.", sensorDataToInsert.size()));
}
}
但我不认为这是最明智的做法,因此当计划执行速度快于配置的间隔时,我浪费时间每分钟处理更多消息(我可以在大约 2-3 秒内插入 10k 行。在我的测试系统上),另一方面,这段代码很容易产生 "overlapping scheduled execution"。
我建议有一个无状态 Bean 池,它们始终处于活动状态(即它们未被安排)消耗一定数量的消息(即直到队列为空,这将是任意数量的消息),然后在单个数据库操作中从这些消息中插入数据。
池中的所有 bean 都可以同时处于活动状态,并且可以尽快使用和插入它们的批次。这将确保及时使用消息,从而有望避免消息在队列中堆积。
您可以在 receive
上设置超时,这样即使您在达到批量大小之前到达队列末尾,数据仍会及时插入。
为了在应用程序服务器启动时启动它,您可以用 @Startup
和 @Singleton
注释一个 bean,然后用 @PostConstruct
注释一个方法,它循环足够的次数来填充您的 "pool" 并调用 @Stateless
bean 上的方法,该方法将接收成批的消息并进行处理。
我有一个 JMS 队列,其中填充了一些时间序列数据。为了防止成千上万的单个事务 SQL 插入,我想以一种笨重的方式处理它们,而不是 MessageListener onMessage "per Message" 方式。
我想到的唯一解决方案是从队列中获取大量消息并定期保存它们的计划。
@Stateless
public class SensorDataReceiver {
private static final int THRESHOLD_IN_SECONDS = 10;
private static final int QUEUE_TIMEOUT_IN_MILLIS = 1000;
@Resource(mappedName = "java:jboss/jms/queue/sensorData")
private Queue queue;
@Inject
private JMSContext context;
@Inject
private SensorDataDAO sensorDataDAO;
@SneakyThrows
@Schedule(hour = "*", minute = "*", second = "*/15", persistent = false)
public void scheduled() {
LocalDateTime statUpPlusThreshold = now().plusSeconds(THRESHOLD_IN_SECONDS);
JMSConsumer consumer = context.createConsumer(queue);
List<SensorData> sensorDataToInsert = new ArrayList<>();
do {
ObjectMessage message = (ObjectMessage) consumer.receive(QUEUE_TIMEOUT_IN_MILLIS);
if (message == null) {
break;
}
sensorDataToInsert.add((sensorData) message.getObject());
} while (now().isBefore(statUpPlusThreshold) && sensorDataToInsert.size() < 10_000);
logger.info(format("Got \"%d\" SensorData to persist.", sensorDataToInsert.size()));
sensorDataDAO.batchSaveOrUpdate(sensorDataToInsert);
logger.info(format("Persisted \"%d\" SensorData.", sensorDataToInsert.size()));
}
}
但我不认为这是最明智的做法,因此当计划执行速度快于配置的间隔时,我浪费时间每分钟处理更多消息(我可以在大约 2-3 秒内插入 10k 行。在我的测试系统上),另一方面,这段代码很容易产生 "overlapping scheduled execution"。
我建议有一个无状态 Bean 池,它们始终处于活动状态(即它们未被安排)消耗一定数量的消息(即直到队列为空,这将是任意数量的消息),然后在单个数据库操作中从这些消息中插入数据。
池中的所有 bean 都可以同时处于活动状态,并且可以尽快使用和插入它们的批次。这将确保及时使用消息,从而有望避免消息在队列中堆积。
您可以在 receive
上设置超时,这样即使您在达到批量大小之前到达队列末尾,数据仍会及时插入。
为了在应用程序服务器启动时启动它,您可以用 @Startup
和 @Singleton
注释一个 bean,然后用 @PostConstruct
注释一个方法,它循环足够的次数来填充您的 "pool" 并调用 @Stateless
bean 上的方法,该方法将接收成批的消息并进行处理。