如何为多个主题配置多个 JmsListener
How to configure multiple JmsListener for multiple topics
在我的项目中,我添加了两个不同的 JmsListener,但是当我运行项目时,在 ActiveMQ 面板中只有其中一个主题有消费者!
那么我应该为每个 JmsListener 添加单独的 jmsListenerContainerFactory 配置吗??
/**
 * Listener for messages arriving on the destination named "foo1".
 * No containerFactory is named on the annotation, so the default listener
 * container factory is presumably used — confirm against the JMS configuration.
 *
 * @param jsonMessage the received JMS message
 * @throws JMSException declared so the (elided) body may propagate JMS failures
 */
@JmsListener(destination = "foo1")
public void foo1(final Message jsonMessage) throws JMSException {
...
}
/**
 * Listener for messages arriving on the destination named "foo2".
 * No containerFactory is named on the annotation, so the default listener
 * container factory is presumably used — confirm against the JMS configuration.
 *
 * @param jsonMessage the received JMS message
 * @throws JMSException declared so the (elided) body may propagate JMS failures
 */
@JmsListener(destination = "foo2")
public void foo12(final Message jsonMessage) throws JMSException {
...
}
编辑: 这是来自 JMS 配置文件:
/**
 * JMS configuration: a single ActiveMQ connection factory plus the listener
 * container factory bean named {@code jmsListenerContainerFactory}.
 */
@Configuration
@EnableJms
public class FooJmsConfig {

    /**
     * Builds the ActiveMQ connection factory used by the listener container factory.
     *
     * @return a factory targeting {@code BROKER_URL} with compression and
     *         asynchronous send enabled
     */
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(BROKER_URL);
        // Each credential goes to its matching setter.
        connectionFactory.setUserName(BROKER_USERNAME);
        connectionFactory.setPassword(BROKER_PASSWORD);
        connectionFactory.setUseCompression(true);
        // NOTE(review): a JMS client ID must be unique per broker connection. With a
        // fixed ID on the shared factory, every listener container that opens its own
        // connection presumably competes for "FPP_API" — confirm.
        connectionFactory.setClientID("FPP_API");
        connectionFactory.setConnectionIDPrefix("DRR");
        connectionFactory.setUseAsyncSend(true);
        return connectionFactory;
    }

    /**
     * Builds the listener container factory for durable topic subscriptions.
     *
     * @return a factory in pub/sub mode with durable subscriptions and a
     *         concurrency range of "1-1"
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("1-1");
        factory.setPubSubDomain(true);
        factory.setSubscriptionDurable(true);
        return factory;
    }
}
我认为问题在于您为两个侦听器使用的 JmsListenerContainerFactory
的并发数被设置为 1。
如果需要,您可以创建多个 JmsListenerContainerFactory,
并通过指定 JmsListener#containerFactory 属性,
将它们分别设置到对应的 JmsListener 上。
我发现,只要将 setClientID()
从 ActiveMQConnectionFactory 提供程序方法移动到 DefaultJmsListenerContainerFactory 提供程序方法,
就可以只使用一个全局的 ActiveMQConnectionFactory 提供程序方法,并为每个 jmsListener 各配置一个 DefaultJmsListenerContainerFactory 提供程序方法:
所以最终的工作代码是:
JMSConfig 文件:
/**
 * JMS configuration with one shared ActiveMQ connection factory and one
 * listener container factory per topic listener, each carrying its own
 * JMS client ID.
 */
@Configuration
@EnableJms
public class FooJmsConfig {

    /**
     * Builds the ActiveMQ connection factory shared by all listener container
     * factories. No client ID is set here; each container factory supplies its own.
     *
     * @return a factory targeting {@code BROKER_URL} with compression and
     *         asynchronous send enabled
     */
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(BROKER_URL);
        // Each credential goes to its matching setter.
        connectionFactory.setUserName(BROKER_USERNAME);
        connectionFactory.setPassword(BROKER_PASSWORD);
        connectionFactory.setUseCompression(true);
        connectionFactory.setConnectionIDPrefix("DRR");
        connectionFactory.setUseAsyncSend(true);
        return connectionFactory;
    }

    /**
     * Container factory referenced by {@code containerFactory = "foo1"}.
     *
     * @return a durable-topic container factory with client ID "FOO_1"
     */
    @Bean(name = "foo1")
    public DefaultJmsListenerContainerFactory foo1() {
        return newDurableTopicFactory("FOO_1");
    }

    /**
     * Container factory referenced by {@code containerFactory = "foo2"}.
     *
     * @return a durable-topic container factory with client ID "FOO_2"
     */
    @Bean(name = "foo2")
    public DefaultJmsListenerContainerFactory foo2() {
        // Must differ from foo1's ID: a client ID may only be in use by one connection.
        return newDurableTopicFactory("FOO_2");
    }

    /**
     * Creates a container factory in pub/sub mode with durable subscriptions,
     * a concurrency range of "1-1" and the given client ID.
     *
     * @param clientId JMS client ID applied by this container factory
     * @return the configured container factory
     */
    private DefaultJmsListenerContainerFactory newDurableTopicFactory(String clientId) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("1-1");
        factory.setPubSubDomain(true);
        factory.setSubscriptionDurable(true);
        // The client ID lives on the container factory, not the shared connection factory.
        factory.setClientId(clientId);
        return factory;
    }
}
JMS 侦听器如下:
/**
 * Listener for messages arriving on the destination named "foo1", using the
 * listener container factory bean named "foo1".
 *
 * @param jsonMessage the received JMS message
 * @throws JMSException declared so the (elided) body may propagate JMS failures
 */
@JmsListener(destination = "foo1", containerFactory="foo1")
public void foo1(final Message jsonMessage) throws JMSException {
...
}
/**
 * Listener for messages arriving on the destination named "foo2", using the
 * listener container factory bean named "foo2".
 *
 * @param jsonMessage the received JMS message
 * @throws JMSException declared so the (elided) body may propagate JMS failures
 */
@JmsListener(destination = "foo2", containerFactory="foo2")
public void foo12(final Message jsonMessage) throws JMSException {
...
}
您可以建立多个 JMS 连接工厂。默认的 JMS 侦听器容器工厂只接受单个连接工厂对象,所以您需要自己定义两个 JmsListenerContainerFactory bean。
连接到单个主题的代码如下:
/**
 * JMS configuration for one AMQP topic connection: a caching connection
 * factory and the listener container factory bean "factory1" built on it.
 */
@Configuration
@EnableJms
@EnableTransactionManagement
public class JMSConnectionConfig {

    private static final String AMQP_URI_FORMAT = "amqps://%s?amqp.idleTimeout=%d";

    // NOTE(review): none of the fields below is assigned anywhere in this class;
    // presumably they are bound from external configuration (e.g. @Value) — confirm.
    private int idleTimeout;
    private String hostURL;
    private String clientId;
    private String topic1SASName;
    private String topic1SASKey;

    /**
     * Builds the connection factory for the first topic.
     *
     * @return a {@code CachingConnectionFactory} wrapping a {@code JmsConnectionFactory}
     *         configured with the remote URI, client ID and credentials
     */
    @Bean(name = "cachingConnectionFactory1")
    @Primary
    public ConnectionFactory myConnectionFactory1() {
        // Connection details for the topic.
        String remoteUri = String.format(AMQP_URI_FORMAT, hostURL, idleTimeout);
        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
        jmsConnectionFactory.setRemoteURI(remoteUri);
        jmsConnectionFactory.setClientID(clientId);
        jmsConnectionFactory.setUsername(topic1SASName);
        jmsConnectionFactory.setPassword(topic1SASKey);

        // Wrap in a caching factory and expose it through the ConnectionFactory interface.
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setTargetConnectionFactory(jmsConnectionFactory);
        cachingConnectionFactory.setReconnectOnException(true);
        return cachingConnectionFactory;
    }

    /**
     * Builds the listener container factory referenced by
     * {@code containerFactory = "factory1"}.
     *
     * @param connectionFactory the "cachingConnectionFactory1" bean
     * @param configurer        configurer used to apply settings to the new factory
     * @return a container factory with durable subscriptions enabled
     */
    @Bean(name = "factory1")
    public JmsListenerContainerFactory<?> factory1(
            @Qualifier("cachingConnectionFactory1") ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory topicFactory = new DefaultJmsListenerContainerFactory();
        // Let the configurer apply its settings (including the connection factory) first,
        // so the explicit customisation below cannot be overwritten by it.
        configurer.configure(topicFactory, connectionFactory);
        topicFactory.setSubscriptionDurable(Boolean.TRUE);
        return topicFactory;
    }

    // For another topic, add a second pair of bean methods following the same pattern as the two above.
}
JMS 侦听器消息接收类的代码如下:destination 填写主题名称,containerFactory 填写上面定义的侦听器容器工厂 bean 的名称:
/**
 * Listener for the destination resolved from the {@code topic.name} property,
 * using the listener container factory bean "factory1" and the subscription
 * name resolved from {@code topic.subscription.name}.
 *
 * @param jmsTextMessage the received text message
 * @throws JMSException         declared for JMS failures in the listener body
 * @throws IOException          declared for I/O failures in the listener body
 * @throws InterruptedException declared for interruption in the listener body
 */
@JmsListener(destination = "${topic.name}", containerFactory = "factory1",
subscription = "${topic.subscription.name}")
public void receiveMessage(JmsTextMessage jmsTextMessage) throws JMSException, IOException, InterruptedException {
// listener code goes here
}
在我的项目中,我添加了两个不同的 JmsListener,但是当我运行项目时,在 ActiveMQ 面板中只有其中一个主题有消费者!
那么我应该为每个 JmsListener 添加单独的 jmsListenerContainerFactory 配置吗??
/**
 * Listener for messages arriving on the destination named "foo1".
 * No containerFactory is named on the annotation, so the default listener
 * container factory is presumably used — confirm against the JMS configuration.
 *
 * @param jsonMessage the received JMS message
 * @throws JMSException declared so the (elided) body may propagate JMS failures
 */
@JmsListener(destination = "foo1")
public void foo1(final Message jsonMessage) throws JMSException {
...
}
/**
 * Listener for messages arriving on the destination named "foo2".
 * No containerFactory is named on the annotation, so the default listener
 * container factory is presumably used — confirm against the JMS configuration.
 *
 * @param jsonMessage the received JMS message
 * @throws JMSException declared so the (elided) body may propagate JMS failures
 */
@JmsListener(destination = "foo2")
public void foo12(final Message jsonMessage) throws JMSException {
...
}
编辑: 这是来自 JMS 配置文件:
/**
 * JMS configuration: a single ActiveMQ connection factory plus the listener
 * container factory bean named {@code jmsListenerContainerFactory}.
 */
@Configuration
@EnableJms
public class FooJmsConfig {

    /**
     * Builds the ActiveMQ connection factory used by the listener container factory.
     *
     * @return a factory targeting {@code BROKER_URL} with compression and
     *         asynchronous send enabled
     */
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(BROKER_URL);
        // Each credential goes to its matching setter.
        connectionFactory.setUserName(BROKER_USERNAME);
        connectionFactory.setPassword(BROKER_PASSWORD);
        connectionFactory.setUseCompression(true);
        // NOTE(review): a JMS client ID must be unique per broker connection. With a
        // fixed ID on the shared factory, every listener container that opens its own
        // connection presumably competes for "FPP_API" — confirm.
        connectionFactory.setClientID("FPP_API");
        connectionFactory.setConnectionIDPrefix("DRR");
        connectionFactory.setUseAsyncSend(true);
        return connectionFactory;
    }

    /**
     * Builds the listener container factory for durable topic subscriptions.
     *
     * @return a factory in pub/sub mode with durable subscriptions and a
     *         concurrency range of "1-1"
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("1-1");
        factory.setPubSubDomain(true);
        factory.setSubscriptionDurable(true);
        return factory;
    }
}
我认为问题在于您为两个侦听器使用的 JmsListenerContainerFactory
的并发数被设置为 1。
如果需要,您可以创建多个 JmsListenerContainerFactory,
并通过指定 JmsListener#containerFactory 属性,
将它们分别设置到对应的 JmsListener 上。
我发现,只要将 setClientID()
从 ActiveMQConnectionFactory 提供程序方法移动到 DefaultJmsListenerContainerFactory 提供程序方法,
就可以只使用一个全局的 ActiveMQConnectionFactory 提供程序方法,并为每个 jmsListener 各配置一个 DefaultJmsListenerContainerFactory 提供程序方法:
所以最终的工作代码是:
JMSConfig 文件:
/**
 * JMS configuration with one shared ActiveMQ connection factory and one
 * listener container factory per topic listener, each carrying its own
 * JMS client ID.
 */
@Configuration
@EnableJms
public class FooJmsConfig {

    /**
     * Builds the ActiveMQ connection factory shared by all listener container
     * factories. No client ID is set here; each container factory supplies its own.
     *
     * @return a factory targeting {@code BROKER_URL} with compression and
     *         asynchronous send enabled
     */
    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(BROKER_URL);
        // Each credential goes to its matching setter.
        connectionFactory.setUserName(BROKER_USERNAME);
        connectionFactory.setPassword(BROKER_PASSWORD);
        connectionFactory.setUseCompression(true);
        connectionFactory.setConnectionIDPrefix("DRR");
        connectionFactory.setUseAsyncSend(true);
        return connectionFactory;
    }

    /**
     * Container factory referenced by {@code containerFactory = "foo1"}.
     *
     * @return a durable-topic container factory with client ID "FOO_1"
     */
    @Bean(name = "foo1")
    public DefaultJmsListenerContainerFactory foo1() {
        return newDurableTopicFactory("FOO_1");
    }

    /**
     * Container factory referenced by {@code containerFactory = "foo2"}.
     *
     * @return a durable-topic container factory with client ID "FOO_2"
     */
    @Bean(name = "foo2")
    public DefaultJmsListenerContainerFactory foo2() {
        // Must differ from foo1's ID: a client ID may only be in use by one connection.
        return newDurableTopicFactory("FOO_2");
    }

    /**
     * Creates a container factory in pub/sub mode with durable subscriptions,
     * a concurrency range of "1-1" and the given client ID.
     *
     * @param clientId JMS client ID applied by this container factory
     * @return the configured container factory
     */
    private DefaultJmsListenerContainerFactory newDurableTopicFactory(String clientId) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("1-1");
        factory.setPubSubDomain(true);
        factory.setSubscriptionDurable(true);
        // The client ID lives on the container factory, not the shared connection factory.
        factory.setClientId(clientId);
        return factory;
    }
}
JMS 侦听器如下:
/**
 * Listener for messages arriving on the destination named "foo1", using the
 * listener container factory bean named "foo1".
 *
 * @param jsonMessage the received JMS message
 * @throws JMSException declared so the (elided) body may propagate JMS failures
 */
@JmsListener(destination = "foo1", containerFactory="foo1")
public void foo1(final Message jsonMessage) throws JMSException {
...
}
/**
 * Listener for messages arriving on the destination named "foo2", using the
 * listener container factory bean named "foo2".
 *
 * @param jsonMessage the received JMS message
 * @throws JMSException declared so the (elided) body may propagate JMS failures
 */
@JmsListener(destination = "foo2", containerFactory="foo2")
public void foo12(final Message jsonMessage) throws JMSException {
...
}
您可以建立多个 JMS 连接工厂。默认的 JMS 侦听器容器工厂只接受单个连接工厂对象,所以您需要自己定义两个 JmsListenerContainerFactory bean。
连接到单个主题的代码如下:
/**
 * JMS configuration for one AMQP topic connection: a caching connection
 * factory and the listener container factory bean "factory1" built on it.
 */
@Configuration
@EnableJms
@EnableTransactionManagement
public class JMSConnectionConfig {

    private static final String AMQP_URI_FORMAT = "amqps://%s?amqp.idleTimeout=%d";

    // NOTE(review): none of the fields below is assigned anywhere in this class;
    // presumably they are bound from external configuration (e.g. @Value) — confirm.
    private int idleTimeout;
    private String hostURL;
    private String clientId;
    private String topic1SASName;
    private String topic1SASKey;

    /**
     * Builds the connection factory for the first topic.
     *
     * @return a {@code CachingConnectionFactory} wrapping a {@code JmsConnectionFactory}
     *         configured with the remote URI, client ID and credentials
     */
    @Bean(name = "cachingConnectionFactory1")
    @Primary
    public ConnectionFactory myConnectionFactory1() {
        // Connection details for the topic.
        String remoteUri = String.format(AMQP_URI_FORMAT, hostURL, idleTimeout);
        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
        jmsConnectionFactory.setRemoteURI(remoteUri);
        jmsConnectionFactory.setClientID(clientId);
        jmsConnectionFactory.setUsername(topic1SASName);
        jmsConnectionFactory.setPassword(topic1SASKey);

        // Wrap in a caching factory and expose it through the ConnectionFactory interface.
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setTargetConnectionFactory(jmsConnectionFactory);
        cachingConnectionFactory.setReconnectOnException(true);
        return cachingConnectionFactory;
    }

    /**
     * Builds the listener container factory referenced by
     * {@code containerFactory = "factory1"}.
     *
     * @param connectionFactory the "cachingConnectionFactory1" bean
     * @param configurer        configurer used to apply settings to the new factory
     * @return a container factory with durable subscriptions enabled
     */
    @Bean(name = "factory1")
    public JmsListenerContainerFactory<?> factory1(
            @Qualifier("cachingConnectionFactory1") ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory topicFactory = new DefaultJmsListenerContainerFactory();
        // Let the configurer apply its settings (including the connection factory) first,
        // so the explicit customisation below cannot be overwritten by it.
        configurer.configure(topicFactory, connectionFactory);
        topicFactory.setSubscriptionDurable(Boolean.TRUE);
        return topicFactory;
    }

    // For another topic, add a second pair of bean methods following the same pattern as the two above.
}
JMS 侦听器消息接收类的代码如下:destination 填写主题名称,containerFactory 填写上面定义的侦听器容器工厂 bean 的名称:
/**
 * Listener for the destination resolved from the {@code topic.name} property,
 * using the listener container factory bean "factory1" and the subscription
 * name resolved from {@code topic.subscription.name}.
 *
 * @param jmsTextMessage the received text message
 * @throws JMSException         declared for JMS failures in the listener body
 * @throws IOException          declared for I/O failures in the listener body
 * @throws InterruptedException declared for interruption in the listener body
 */
@JmsListener(destination = "${topic.name}", containerFactory = "factory1",
subscription = "${topic.subscription.name}")
public void receiveMessage(JmsTextMessage jmsTextMessage) throws JMSException, IOException, InterruptedException {
// listener code goes here
}