Spring 添加围绕目标方法执行的 AOP 建议后,云流测试失败
Spring cloud streams test failing after adding AOP advice that executes around the target method
我有一个 spring 云流应用程序,它的流侦听器使用来自输入通道的事件。一切 运行 都很顺利,直到我添加了一个 AOP 建议来记录处理方法的执行(在应用程序中的其他方法之间)。之后,测试开始失败并出现以下错误:
org.springframework.messaging.MessagingException: Exception thrown while invoking com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener$$EnhancerBySpringCGLIB$95881e#handle[1 args]; nested exception is java.lang.IllegalStateException: The mapped handler method class 'com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener$$EnhancerBySpringCGLIB$95881e$MockitoMock33324661' is not an instance of the actual endpoint bean class 'com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener$$EnhancerBySpringCGLIB$95881e$$EnhancerBySpringCGLIB$a2d55ce'. If the endpoint requires proxying (e.g. due to @Transactional), please use class-based proxying.
HandlerMethod details:
...
接收器定义:
申请代码如下:
public interface ExchangeRateStoreStreamSink {
String NEWEXCHANGERATE="new-exchange-rate";
@Input(NEWEXCHANGERATE)
SubscribableChannel newExchangeRate();
}
带有注释方法的流监听器:
@EnableBinding(ExchangeRateStoreStreamSink.class)
public class ExchangeRateStoreStreamListener {
private CommandBus commandBus;
@Autowired
public ExchangeRateStoreStreamListener(CommandBus commandBus) {
this.commandBus = commandBus;
}
@Loggable(operationName="ExchangeRateConsumption")
@StreamListener(ExchangeRateStoreStreamSink.NEWEXCHANGERATE)
public void handle(NewExchangeRateMessage newExchangeRateMessage) {
AddExchangeRateCommand addExchangeRateCommand = new AddExchangeRateCommand(newExchangeRateMessage.from,
newExchangeRateMessage.to, newExchangeRateMessage.amount, newExchangeRateMessage.date);
commandBus.dispatch(addExchangeRateCommand);
}
}
测试:
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class ExchangeRateStoreStreamListenerTest {
@Autowired
private ExchangeRateStoreStreamSink streamSink;
@SpyBean
private ExchangeRateStoreStreamListener streamListener;
@Test
public void test() {
SubscribableChannel input = streamSink.newExchangeRate();
NewExchangeRateMessage exchangeRateMessage = NewExchangeRateMessageFactory.aNewExchangeRateMessage();
input.send(new GenericMessage<>(exchangeRateMessage));
verify(streamListener).handle(any(NewExchangeRateMessage.class));
}
}
AOP 方面:
@Aspect
@Component
public class LoggingAspect {
private static final String API_DOMAIN = "fx";
@Pointcut(value = "@annotation(loggable) && execution(* *(..))", argNames = "loggable")
public void loggableMethod(Loggable loggable) { }
@Around(value = "loggableMethod(loggable)", argNames = "pjp,loggable")
public Object logAccess(ProceedingJoinPoint pjp, Loggable loggable) throws Throwable {
final Signature signature = pjp.getSignature();
final Logger logger = LogManager.getLogger(signature.getDeclaringType());
logger.info( "api_domain={} _operation={} _message=\"Start operation\"",
API_DOMAIN, loggable.operationName());
try {
return pjp.proceed();
} catch (DomainError domainError) {
// Some logic here
}
}
}
我们非常欢迎任何帮助。提前致谢!
这是因为 StreamListener 已经是一个代理。有太多需要解释的内容,可能是写博客的好主题。 . .
无论如何,我很高兴你已经描述了你试图解决的实际问题,这个问题可以用更简单的方法解决,那就是引入 ChannelInterceptor
- 它基本上充当你的消息处理程序调用的 Around 建议.
基本上这是一个例子:
@Bean
@GlobalChannelInterceptor
public ChannelInterceptor channelInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> msg, MessageChannel mc) {
System.out.println("Before send to channel: " + mc);
return msg;
}
@Override
public void afterSendCompletion(Message<?> msg, MessageChannel mc, boolean bln, Exception excptn) {
System.out.println("After send completion to channel: " + mc);
}
@Override
public void postSend(Message<?> msg, MessageChannel mc, boolean bln) {
System.out.println("After send to channel: " + mc);
}
};
}
。 . .这是它会产生的输出:
Before send to channel: input
Before send to channel: integrationFlowCreator.channel#0
===> SOME LOG MESSAGE INSIDE YOUR CODE
After send to channel: integrationFlowCreator.channel#0
After send completion to channel: integrationFlowCreator.channel#0
After send to channel: input
After send completion to channel: input
您只需在配置中声明它就可以了。它将应用于所有通道,因此您(通过逻辑)仅在您想要的时候进行监控。
有关详细信息,请参阅 GlobalChannelInterceptor
的 Javadoc。
希望有帮助
我有一个 spring 云流应用程序,它的流侦听器使用来自输入通道的事件。一切 运行 都很顺利,直到我添加了一个 AOP 建议来记录处理方法的执行(在应用程序中的其他方法之间)。之后,测试开始失败并出现以下错误:
org.springframework.messaging.MessagingException: Exception thrown while invoking com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener$$EnhancerBySpringCGLIB$95881e#handle[1 args]; nested exception is java.lang.IllegalStateException: The mapped handler method class 'com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener$$EnhancerBySpringCGLIB$95881e$MockitoMock33324661' is not an instance of the actual endpoint bean class 'com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener$$EnhancerBySpringCGLIB$95881e$$EnhancerBySpringCGLIB$a2d55ce'. If the endpoint requires proxying (e.g. due to @Transactional), please use class-based proxying. HandlerMethod details: ...
接收器定义:
申请代码如下:
public interface ExchangeRateStoreStreamSink {
String NEWEXCHANGERATE="new-exchange-rate";
@Input(NEWEXCHANGERATE)
SubscribableChannel newExchangeRate();
}
带有注释方法的流监听器:
@EnableBinding(ExchangeRateStoreStreamSink.class)
public class ExchangeRateStoreStreamListener {
private CommandBus commandBus;
@Autowired
public ExchangeRateStoreStreamListener(CommandBus commandBus) {
this.commandBus = commandBus;
}
@Loggable(operationName="ExchangeRateConsumption")
@StreamListener(ExchangeRateStoreStreamSink.NEWEXCHANGERATE)
public void handle(NewExchangeRateMessage newExchangeRateMessage) {
AddExchangeRateCommand addExchangeRateCommand = new AddExchangeRateCommand(newExchangeRateMessage.from,
newExchangeRateMessage.to, newExchangeRateMessage.amount, newExchangeRateMessage.date);
commandBus.dispatch(addExchangeRateCommand);
}
}
测试:
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class ExchangeRateStoreStreamListenerTest {
@Autowired
private ExchangeRateStoreStreamSink streamSink;
@SpyBean
private ExchangeRateStoreStreamListener streamListener;
@Test
public void test() {
SubscribableChannel input = streamSink.newExchangeRate();
NewExchangeRateMessage exchangeRateMessage = NewExchangeRateMessageFactory.aNewExchangeRateMessage();
input.send(new GenericMessage<>(exchangeRateMessage));
verify(streamListener).handle(any(NewExchangeRateMessage.class));
}
}
AOP 方面:
@Aspect
@Component
public class LoggingAspect {
private static final String API_DOMAIN = "fx";
@Pointcut(value = "@annotation(loggable) && execution(* *(..))", argNames = "loggable")
public void loggableMethod(Loggable loggable) { }
@Around(value = "loggableMethod(loggable)", argNames = "pjp,loggable")
public Object logAccess(ProceedingJoinPoint pjp, Loggable loggable) throws Throwable {
final Signature signature = pjp.getSignature();
final Logger logger = LogManager.getLogger(signature.getDeclaringType());
logger.info( "api_domain={} _operation={} _message=\"Start operation\"",
API_DOMAIN, loggable.operationName());
try {
return pjp.proceed();
} catch (DomainError domainError) {
// Some logic here
}
}
}
我们非常欢迎任何帮助。提前致谢!
这是因为 StreamListener 已经是一个代理。有太多需要解释的内容,可能是写博客的好主题。 . .
无论如何,我很高兴你已经描述了你试图解决的实际问题,这个问题可以用更简单的方法解决,那就是引入 ChannelInterceptor
- 它基本上充当你的消息处理程序调用的 Around 建议.
基本上这是一个例子:
@Bean
@GlobalChannelInterceptor
public ChannelInterceptor channelInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> msg, MessageChannel mc) {
System.out.println("Before send to channel: " + mc);
return msg;
}
@Override
public void afterSendCompletion(Message<?> msg, MessageChannel mc, boolean bln, Exception excptn) {
System.out.println("After send completion to channel: " + mc);
}
@Override
public void postSend(Message<?> msg, MessageChannel mc, boolean bln) {
System.out.println("After send to channel: " + mc);
}
};
}
。 . .这是它会产生的输出:
Before send to channel: input
Before send to channel: integrationFlowCreator.channel#0
===> SOME LOG MESSAGE INSIDE YOUR CODE
After send to channel: integrationFlowCreator.channel#0
After send completion to channel: integrationFlowCreator.channel#0
After send to channel: input
After send completion to channel: input
您只需在配置中声明它就可以了。它将应用于所有通道,因此您(通过逻辑)仅在您想要的时候进行监控。
有关详细信息,请参阅 GlobalChannelInterceptor
的 Javadoc。
希望有帮助