ActiveMq、springBoot - 立即处理队列中发送的每条消息

ActiveMq, springBoot - immediately process every message sent on queue

我正在将 ActiveMq 与 SpringBoot 结合使用,以将每个记录从大型 csv 文件发送到另一个服务。我正在将记录加载到地图,然后在每个循环中将记录发送到 ActiveMq 队列。

我的问题是,在我的映射中的所有记录都发送到 ActiveMq 之前,ActiveMq 不会让任何消费者从队列中获取记录。

我可以将 ActiveMq 配置为允许在放入队列后立即使用消息(而不是等待某种提交事务)吗?

这是我的 ActiveMq 配置:

@EnableJms
@Configuration
public class JmsConfig implements JmsListenerConfigurer {

@Autowired
private JmsErrorHandler jmsErrorHandler;

@Autowired
private MessageConverter messageConverter;

@Autowired
private DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory;

@Autowired
private DefaultMessageHandlerMethodFactory handlerMethodFactory;

@Autowired
private JsonMessageConverter jsonMessageConverter;

@Value("${spring.activemq.broker-url}")
private String brokerUrl;

@Value("${spring.activemq.user}")
private String username;

@Value("${spring.activemq.password}")
private String password;

@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(username, password, brokerUrl);
    activeMQConnectionFactory.setUseAsyncSend(true);
    return activeMQConnectionFactory;
}

@Bean
public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory() {
    return new DefaultJmsListenerContainerFactory();
}

@Bean
public JmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory,
                                                                  DefaultJmsListenerContainerFactoryConfigurer configurer) {
    defaultJmsListenerContainerFactory.setErrorHandler(jmsErrorHandler);
    configurer.configure(defaultJmsListenerContainerFactory, activeMQConnectionFactory);
    return defaultJmsListenerContainerFactory;
}

@Bean
public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(messageConverter);
    return factory;
}

@Bean
public MessageConverter messageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setObjectMapper(createJacksonObjectMapper());
    return converter;
}

@NotNull
private ObjectMapper createJacksonObjectMapper() {
    return Jackson2ObjectMapperBuilder
            .json()
            .modules(new JavaTimeModule())
            .build();
}

@Override
public void configureJmsListeners(@NotNull JmsListenerEndpointRegistrar registrar) {
    registrar.setMessageHandlerMethodFactory(handlerMethodFactory);
}

@Bean
public JmsTemplate createJmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) {
    JmsTemplate jmsTemplate = new JmsTemplate();
    jmsTemplate.setMessageConverter(jsonMessageConverter);
    jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
    jmsTemplate.setDeliveryPersistent(false);
    return jmsTemplate;
}

}

我正在使用以下代码发送消息:

   public void sendRecordToLogbook(Record record) {
    jmsTemplate.convertAndSend(logbookDestination, record);
}

与您使用活动 MQ 所做的相同的事情,我正在使用 AWS SQS 做同样的事情。 在SQS中,队列消息可以在写入的同时被读取,一旦读取就会自动从队列中删除。 所以我建议使用 AWS SQS 来实现你的功能。

我通过为每条消息实现自己的连接、会话和生成器而不是使用 JMSTemplate 来做到这一点。