我们如何使用@RabbitListener 连接到 before/After 消息处理
How do we hook into before/After message processing using @RabbitListener
问题: 我正在从 MessageListener 接口实现迁移到 @RabbitListener。我有这样的逻辑,我在一个 MessageListener 上做 "pre" 和 "post" 消息处理,它被几个 类
继承
示例:
public AbstractMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//do some pre message processing
process(Message message);
// do some post message processing
}
protected abstract void process(Message message);
}
问题:有没有一种方法可以使用@RabbitListener注解实现类似的东西我可以继承pre/post消息处理逻辑而不必重新实现或调用每个子 @RabbitListener 注释中的 pre/post 消息处理,同时为子 @RabbitListener 维护可自定义的方法签名?还是这太贪心了?
示例期望结果:
public class SomeRabbitListenerClass {
@RabbitListener( id = "listener.mypojo",queues = "${rabbitmq.some.queue}")
public void listen(@Valid MyPojo myPojo) {
//...
}
}
public class SomeOtherRabbitListenerClass {
@RabbitListener(id = "listener.orders",queues ="${rabbitmq.some.other.queue}")
public void listen(Order order, @Header("order_type") String orderType) {
//...
}
}
这两个@RabbitListener 使用相同的继承 pre/post 消息处理
我看到@RabbitListener 注释中有一个 'containerFactory' 参数,但我已经在配置中声明了一个...而且我真的很确定如何通过自定义实现我想要的继承容器工厂。
更新答案:这就是我最终做的。
建议定义:
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.amqp.core.Message;
/**
* AOP Around advice wrapper. Every time a message comes in we can do
* pre/post processing by using this advice by implementing the before/after methods.
* @author sjacobs
*
*/
public class RabbitListenerAroundAdvice implements MethodInterceptor {
/**
* place the "AroundAdvice" around each new message being processed.
*/
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Message message = (Message) invocation.getArguments()[1];
before(message)
Object result = invocation.proceed();
after(message);
return result;
}
声明 beans: 在你的 rabbitmq 配置中将建议声明为 Spring bean 并将其传递给 rabbitListenerContainerFactory#setAdviceChain(...)
//...
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory( cachingConnectionFactory() );
factory.setTaskExecutor(threadPoolTaskExecutor());
factory.setMessageConverter(jackson2JsonMessageConverter());
factory.setAdviceChain(rabbitListenerAroundAdvice());
return factory;
}
@Bean
public RabbitListenerAroundAdvice rabbitListenerAroundAdvice() {
return new RabbitListenerAroundAdvice();
}
// ...
更正
您可以使用 SimpleRabbitListenerContainerFactory 中的建议链将环绕建议应用于为 @RabbitListener
创建的侦听器;两个参数是 Channel
和 Message
.
如果只需要在调用监听器前进行操作,可以在容器afterReceivePostProcessors
.
中添加MessagePostProcessor
(s)
这里不可能继承,因为 POJO 方法上的注释处理和 MessageListener
实现是完全不同的故事。
使用 MessageListener
您可以完全控制目标行为和 container
。
使用注解,您只需要处理 POJO,framework-free 代码。特定的 MessageListener
是在后台创建的。而那个完全基于注释的方法。
我想我们可以使用 Spring AOP 框架来实现您的要求。
查看最近的问题及其对此事的回答:How to write an integration test for @RabbitListener annotation?
问题: 我正在从 MessageListener 接口实现迁移到 @RabbitListener。我有这样的逻辑,我在一个 MessageListener 上做 "pre" 和 "post" 消息处理,它被几个 类
继承示例:
public AbstractMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//do some pre message processing
process(Message message);
// do some post message processing
}
protected abstract void process(Message message);
}
问题:有没有一种方法可以使用@RabbitListener注解实现类似的东西我可以继承pre/post消息处理逻辑而不必重新实现或调用每个子 @RabbitListener 注释中的 pre/post 消息处理,同时为子 @RabbitListener 维护可自定义的方法签名?还是这太贪心了?
示例期望结果:
public class SomeRabbitListenerClass {
@RabbitListener( id = "listener.mypojo",queues = "${rabbitmq.some.queue}")
public void listen(@Valid MyPojo myPojo) {
//...
}
}
public class SomeOtherRabbitListenerClass {
@RabbitListener(id = "listener.orders",queues ="${rabbitmq.some.other.queue}")
public void listen(Order order, @Header("order_type") String orderType) {
//...
}
}
这两个@RabbitListener 使用相同的继承 pre/post 消息处理
我看到@RabbitListener 注释中有一个 'containerFactory' 参数,但我已经在配置中声明了一个...而且我真的很确定如何通过自定义实现我想要的继承容器工厂。
更新答案:这就是我最终做的。
建议定义:
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.amqp.core.Message;
/**
* AOP Around advice wrapper. Every time a message comes in we can do
* pre/post processing by using this advice by implementing the before/after methods.
* @author sjacobs
*
*/
public class RabbitListenerAroundAdvice implements MethodInterceptor {
/**
* place the "AroundAdvice" around each new message being processed.
*/
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Message message = (Message) invocation.getArguments()[1];
before(message)
Object result = invocation.proceed();
after(message);
return result;
}
声明 beans: 在你的 rabbitmq 配置中将建议声明为 Spring bean 并将其传递给 rabbitListenerContainerFactory#setAdviceChain(...)
//...
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory( cachingConnectionFactory() );
factory.setTaskExecutor(threadPoolTaskExecutor());
factory.setMessageConverter(jackson2JsonMessageConverter());
factory.setAdviceChain(rabbitListenerAroundAdvice());
return factory;
}
@Bean
public RabbitListenerAroundAdvice rabbitListenerAroundAdvice() {
return new RabbitListenerAroundAdvice();
}
// ...
更正
您可以使用 SimpleRabbitListenerContainerFactory 中的建议链将环绕建议应用于为 @RabbitListener
创建的侦听器;两个参数是 Channel
和 Message
.
如果只需要在调用监听器前进行操作,可以在容器afterReceivePostProcessors
.
MessagePostProcessor
(s)
这里不可能继承,因为 POJO 方法上的注释处理和 MessageListener
实现是完全不同的故事。
使用 MessageListener
您可以完全控制目标行为和 container
。
使用注解,您只需要处理 POJO,framework-free 代码。特定的 MessageListener
是在后台创建的。而那个完全基于注释的方法。
我想我们可以使用 Spring AOP 框架来实现您的要求。
查看最近的问题及其对此事的回答:How to write an integration test for @RabbitListener annotation?