将 JMSListener 与 RabbitMQ 结合使用
Use JMSListener with RabbitMQ
我的应用程序当前使用 IBM MQ 并且具有队列配置设置并且可以与 JMS 一起正常工作。例如
@EnableJms
@Configuration
public class IBMQueueConfig {
@Bean("defaultContainer")
public JmsListenerContainerFactory containerFactory(final ConnectionFactory connectionFactory,
final ErrorHandler errorHandler) {
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setErrorHandler(errorHandler);
return factory;
}
}
我可以接收消息并进行如下处理:
@Service
public class ProcessMessageReceive {
@JmsListener(destination = "${queue}", concurrency = "${threads}", containerFactory = "defaultContainer")
public Message processMessage(@Payload final String message) {
//do stuff
}
}
我需要使用 RabbitMQ
进行测试并需要额外配置。我有以下 class:
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
@EnableRabbit
public class RabbitMQConfiguration {
private String host;
private int port;
private String username;
private String password;
private String virtualHost;
@Bean
public DirectExchange exchange() {
return new DirectExchange(exchange);
}
@Bean("defaultContainer")
public JmsListenerContainerFactory containerFactory(@Qualifier("rabbit-connection-factory") final ConnectionFactory connectionFactory) {
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(); //ERROR
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("rabbit-connection-factory") final ConnectionFactory connectionFactory,
@Value("spring.rabbitmq.listener.simple.concurrency") final int concurrency,
@Value("spring.rabbitmq.listener.simple.max-concurrency") final int maxConcurrency) {
final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
containerFactory.setConcurrentConsumers(concurrency);
containerFactory.setMaxConcurrentConsumers(maxConcurrency);
containerFactory.setDefaultRequeueRejected(false);
return containerFactory;
}
@Bean(name = "rabbit-connection-factory")
public ConnectionFactory connectionFactory() {
final CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
@Bean
public Queue inboundQueue() {
return new Queue(fixInboundQueue, true);
}
@Bean
public Binding inboundQueueBinding() {
return bind(inboundQueue())
.to(exchange())
.with(routingKey);
}
}
我在行 factory.setConnectionFactory(connectionFactory);
上收到错误,因为它需要 javax.jms.ConnectionFactory
但提供的是 Rabbit MQ One。
有什么方法可以连接 Rabbit MQ ConnectionFactory 吗?我知道如果我使用 RMQConnectionFactory
是可能的,但我想看看我是否可以通过 Spring Rabbit 依赖实现它。
objective 是为了避免专门为 Rabbit MQ 编写另一个 processMessage()
并重新使用我已经拥有的东西。
或者,我可以同时使用这两个注释吗?在哪种情况下,我会根据产品或测试使用 spring 配置文件来启用我需要的配置文件?
@RabbitListener(queues = "${app.rabbitmq.queue}")
@JmsListener(destination = "${queue}", concurrency = "${threads}", containerFactory = "defaultContainer")
public Message processMessage(@Payload final String message) {
//do stuff
}
如果你想通过 AMQP 与 RabbitMQ 通信,你必须使用 @RabbitListener
而不是 @JmsListener
。
如果您想在生产中使用 JMS 并在测试中使用 RabbitMQ,则可以添加这两个注释。
我的应用程序当前使用 IBM MQ 并且具有队列配置设置并且可以与 JMS 一起正常工作。例如
@EnableJms
@Configuration
public class IBMQueueConfig {
@Bean("defaultContainer")
public JmsListenerContainerFactory containerFactory(final ConnectionFactory connectionFactory,
final ErrorHandler errorHandler) {
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setErrorHandler(errorHandler);
return factory;
}
}
我可以接收消息并进行如下处理:
@Service
public class ProcessMessageReceive {
@JmsListener(destination = "${queue}", concurrency = "${threads}", containerFactory = "defaultContainer")
public Message processMessage(@Payload final String message) {
//do stuff
}
}
我需要使用 RabbitMQ
进行测试并需要额外配置。我有以下 class:
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
@EnableRabbit
public class RabbitMQConfiguration {
private String host;
private int port;
private String username;
private String password;
private String virtualHost;
@Bean
public DirectExchange exchange() {
return new DirectExchange(exchange);
}
@Bean("defaultContainer")
public JmsListenerContainerFactory containerFactory(@Qualifier("rabbit-connection-factory") final ConnectionFactory connectionFactory) {
final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(); //ERROR
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("rabbit-connection-factory") final ConnectionFactory connectionFactory,
@Value("spring.rabbitmq.listener.simple.concurrency") final int concurrency,
@Value("spring.rabbitmq.listener.simple.max-concurrency") final int maxConcurrency) {
final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory);
containerFactory.setConcurrentConsumers(concurrency);
containerFactory.setMaxConcurrentConsumers(maxConcurrency);
containerFactory.setDefaultRequeueRejected(false);
return containerFactory;
}
@Bean(name = "rabbit-connection-factory")
public ConnectionFactory connectionFactory() {
final CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
return connectionFactory;
}
@Bean
public Queue inboundQueue() {
return new Queue(fixInboundQueue, true);
}
@Bean
public Binding inboundQueueBinding() {
return bind(inboundQueue())
.to(exchange())
.with(routingKey);
}
}
我在行 factory.setConnectionFactory(connectionFactory);
上收到错误,因为它需要 javax.jms.ConnectionFactory
但提供的是 Rabbit MQ One。
有什么方法可以连接 Rabbit MQ ConnectionFactory 吗?我知道如果我使用 RMQConnectionFactory
是可能的,但我想看看我是否可以通过 Spring Rabbit 依赖实现它。
objective 是为了避免专门为 Rabbit MQ 编写另一个 processMessage()
并重新使用我已经拥有的东西。
或者,我可以同时使用这两个注释吗?在哪种情况下,我会根据产品或测试使用 spring 配置文件来启用我需要的配置文件?
@RabbitListener(queues = "${app.rabbitmq.queue}")
@JmsListener(destination = "${queue}", concurrency = "${threads}", containerFactory = "defaultContainer")
public Message processMessage(@Payload final String message) {
//do stuff
}
如果你想通过 AMQP 与 RabbitMQ 通信,你必须使用 @RabbitListener
而不是 @JmsListener
。
如果您想在生产中使用 JMS 并在测试中使用 RabbitMQ,则可以添加这两个注释。