Spring JmsListener to listen to multiple queues
In this post, Gary Russell explains how to programmatically create multiple KafkaListeners to listen to multiple topics. [This setup actually works well for me.]
Kafka Spring: How to create Listeners dynamically or in a loop?
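Now I want a similar setup for JMS listeners: a single class with one @JmsListener method, from which I can programmatically create multiple instances, each injected with its own queue name.
I found this post
At the end of that post, Gary makes a similar comment: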
If you wish to dynamically create lots of containers, then just create the containers programmatically, call afterPropertiesSet(), then start()
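I reused the setup that I got working from the first post above (the one about KafkaListeners). My multiple JMS listener instances start up, but they do not consume any messages.
Basically, I do not understand where I am supposed to do this: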
then just create the containers programmatically, call afterPropertiesSet(), then start()
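I am confused by the word "container". I know there is JmsListener and there is JmsListenerContainerFactory. What is a container in this context? I am guessing it is the JmsListener?
I have confirmed that there are messages in the queues. Also, when I do not create the listeners programmatically and have just the one listener shown below with a hard-coded queue name, it consumes messages fine.
When I create multiple JMS listeners programmatically, none of them consume any messages.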
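On the terminology: in Spring JMS, a "container" is a message listener container such as DefaultMessageListenerContainer, the object that manages the connection, receives from one destination, and hands each message to a listener. @JmsListener only marks a method as an endpoint; the JmsListenerContainerFactory builds one container for each endpoint. The quoted advice can be sketched roughly as follows. This is a sketch under assumptions, not the setup from the linked posts: it assumes the javax.jms API (newer Spring versions use jakarta.jms) and a ConnectionFactory supplied by the application, and the names ProgrammaticContainers and startContainers are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

import javax.jms.ConnectionFactory; // assumption: jakarta.jms on Spring 6+
import javax.jms.MessageListener;

import org.springframework.jms.listener.DefaultMessageListenerContainer;

// Hypothetical helper class, not part of Spring or of the original post.
public class ProgrammaticContainers {

    // Creates, initializes, and starts one listener container per queue name.
    public static List<DefaultMessageListenerContainer> startContainers(
            ConnectionFactory connectionFactory, List<String> queueNames) {
        List<DefaultMessageListenerContainer> containers = new ArrayList<>();
        for (String queueName : queueNames) {
            DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setDestinationName(queueName);

            // The listener is the callback; the container is what drives it.
            MessageListener listener =
                    message -> System.out.println(queueName + " received: " + message);
            container.setMessageListener(listener);

            container.afterPropertiesSet(); // validates the configuration and initializes
            container.start();              // begins receiving messages
            containers.add(container);
        }
        return containers;
    }
}
```

Containers created this way are not managed by Spring, so the caller would be responsible for stopping them on shutdown.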
@SpringBootApplication
@EnableJms
public class MqProdConsumerApplication {
    private static Logger logger = LogManager.getLogger(MqProdConsumerApplication.class.getName());
    private static Consumers consumersStatic;

    @Autowired
    Consumers consumers;

    @PostConstruct
    public void init() {
        consumersStatic = this.consumers;
    }

    @Bean
    public Gson gson() {
        return new Gson();
    }

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(MqProdConsumerApplication.class, args);
        List<QueueInformation> queueInformationList = consumersStatic.getQueueInformationList();
        Assert.notEmpty(queueInformationList, "queueInformationList cannot be empty");
        logger.debug("queueInformationList ************" + queueInformationList.toString());
        for (QueueInformation queueInformation : queueInformationList) {
            AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
            child.setParent(context);
            child.register(MQConfig.class);
            Properties props = new Properties();
            props.setProperty("mqQueueName", queueInformation.getMqQueueName());
            PropertiesPropertySource pps = new PropertiesPropertySource("listenerProps", props);
            child.getEnvironment().getPropertySources().addLast(pps);
            child.refresh();
        }
    }
}
Here is the MQConfig that has the listenerContainerFactory:
@Configuration
public class MQConfig {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${ibm.mq.user}")
    private String mqUserName;

    @Bean
    public MQListener listener() {
        return new MQListener();
    }

    @PostConstruct
    public void afterConstruct() {
        logger.debug("************* initialized MQ Config successfully for user =" + mqUserName);
    }

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        // Put the MQ username in the PCF environment.
        // Otherwise, the connection is identified by PCF's default user, "VCAP"
        System.setProperty("user.name", mqUserName);
        return factory;
    }
}
Then the MQListener that has the actual @JmsListener:
public class MQListener {
    Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${mqQueueName}")
    private String mqQueueName;

    @PostConstruct
    public void afterConstruct() {
        logger.debug("************* initialized MQ Listener successfully, will read from =" + mqQueueName);
    }

    @JmsListener(destination = "${mqQueueName}", containerFactory = "myFactory")
    public void receiveMessage(String receivedMessage) throws JAXBException, ExecutionException, InterruptedException {
        logger.debug("***********************************************receivedMessage:" + receivedMessage);
    }
}
Here is my application.yml:
ibm.mq.queueManager: ABCTOD01
ibm.mq.channel: QMD00.SERVER
ibm.mq.connName: mqdv1.devfg.ABC.com
ibm.mq.user: pmd0app1
ibm.mq.password:
consumers:
  queueInformationList:
    - mqQueueName: QMD00.D.SRF.PERSON.LITE.PHONE.LOAD
    - mqQueueName: QMD00.D.SRF.PERSON.PHONE.LOAD
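The Consumers and QueueInformation classes are used throughout the question but never shown. A plausible minimal shape, assuming plain JavaBeans whose property names match the YAML keys above, might look like the sketch below. In the Spring Boot application, Consumers would presumably also carry @Component and @ConfigurationProperties(prefix = "consumers") so that the list is bound from application.yml; the annotations are left out here to keep the sketch free of dependencies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reconstruction: one entry of consumers.queueInformationList.
class QueueInformation {
    private String mqQueueName;

    public String getMqQueueName() {
        return mqQueueName;
    }

    public void setMqQueueName(String mqQueueName) {
        this.mqQueueName = mqQueueName;
    }

    @Override
    public String toString() {
        return "QueueInformation{mqQueueName=" + mqQueueName + "}";
    }
}

// Hypothetical reconstruction: holder for the "consumers" section of the YAML.
// Assumption: in the real application this is a Spring bean annotated with
// @ConfigurationProperties(prefix = "consumers").
class Consumers {
    private List<QueueInformation> queueInformationList = new ArrayList<>();

    public List<QueueInformation> getQueueInformationList() {
        return queueInformationList;
    }

    public void setQueueInformationList(List<QueueInformation> queueInformationList) {
        this.queueInformationList = queueInformationList;
    }
}
```

With this shape, each list item in the YAML becomes one QueueInformation instance, and getMqQueueName() returns the queue name that the listener code reads.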
OK, I found another post where Gary had already answered what I was looking for.
Basically, this is the solution that worked for me.
Great job @GaryRussell - I am a fan now :)
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {

    private static final Logger logger = LoggerFactory.getLogger(AppConfig.class);

    // Injected here rather than read from the private static field on the
    // application class, which is not visible from this class.
    private final Consumers consumers;

    public AppConfig(Consumers consumers) {
        this.consumers = consumers;
    }

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        int i = 0;
        for (QueueInformation queueInformation : consumers.getQueueInformationList()) {
            // One endpoint per queue; each endpoint needs a unique id.
            SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
            endpoint.setId("myJmsEndpoint-" + i++);
            endpoint.setDestination(queueInformation.getMqQueueName());
            endpoint.setMessageListener(message -> {
                logger.debug("***********************************************receivedMessage:" + message);
            });
            registrar.registerEndpoint(endpoint);
            logger.debug("registered the endpoint for queue " + queueInformation.getMqQueueName());
        }
    }
}