Spring 添加围绕目标方法执行的 AOP 建议后,云流测试失败

Spring cloud streams test failing after adding AOP advice that executes around the target method

我有一个 spring 云流应用程序,它的流侦听器使用来自输入通道的事件。一切 运行 都很顺利,直到我添加了一个 AOP 建议来记录处理方法的执行(在应用程序中的其他方法之间)。之后,测试开始失败并出现以下错误:

org.springframework.messaging.MessagingException: Exception thrown while invoking com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener$$EnhancerBySpringCGLIB$95881e#handle[1 args]; nested exception is java.lang.IllegalStateException: The mapped handler method class 'com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener$$EnhancerBySpringCGLIB$95881e$MockitoMock33324661' is not an instance of the actual endpoint bean class 'com.acme.fx.exchangerate.store.infrastructure.entrypoint.messaging.ExchangeRateStoreStreamListener$$EnhancerBySpringCGLIB$95881e$$EnhancerBySpringCGLIB$a2d55ce'. If the endpoint requires proxying (e.g. due to @Transactional), please use class-based proxying. HandlerMethod details: ...

接收器定义:

申请代码如下:

public interface ExchangeRateStoreStreamSink {
        String NEWEXCHANGERATE="new-exchange-rate";

        @Input(NEWEXCHANGERATE)
        SubscribableChannel newExchangeRate();
}

带有注释方法的流监听器:

@EnableBinding(ExchangeRateStoreStreamSink.class)
public class ExchangeRateStoreStreamListener {
    private CommandBus commandBus;

    @Autowired
    public ExchangeRateStoreStreamListener(CommandBus commandBus) {
        this.commandBus = commandBus;
    }

    @Loggable(operationName="ExchangeRateConsumption")
    @StreamListener(ExchangeRateStoreStreamSink.NEWEXCHANGERATE)
    public void handle(NewExchangeRateMessage newExchangeRateMessage) {
        AddExchangeRateCommand addExchangeRateCommand = new AddExchangeRateCommand(newExchangeRateMessage.from,
                newExchangeRateMessage.to, newExchangeRateMessage.amount, newExchangeRateMessage.date);
        commandBus.dispatch(addExchangeRateCommand);
    }
}

测试:

@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class ExchangeRateStoreStreamListenerTest {
    @Autowired
    private ExchangeRateStoreStreamSink streamSink;

    @SpyBean
    private ExchangeRateStoreStreamListener streamListener;


    @Test
    public void test() {
        SubscribableChannel input = streamSink.newExchangeRate();
        NewExchangeRateMessage exchangeRateMessage = NewExchangeRateMessageFactory.aNewExchangeRateMessage();
        input.send(new GenericMessage<>(exchangeRateMessage));

        verify(streamListener).handle(any(NewExchangeRateMessage.class));
    }
}

AOP 方面:

@Aspect
@Component
public class LoggingAspect {
    private static final String API_DOMAIN = "fx";

    @Pointcut(value = "@annotation(loggable) && execution(* *(..))", argNames = "loggable")
    public void loggableMethod(Loggable loggable) { }

    @Around(value = "loggableMethod(loggable)", argNames = "pjp,loggable")
    public Object logAccess(ProceedingJoinPoint pjp, Loggable loggable) throws Throwable {
        final Signature signature = pjp.getSignature();

        final Logger logger = LogManager.getLogger(signature.getDeclaringType());

        logger.info( "api_domain={} _operation={} _message=\"Start operation\"",
                API_DOMAIN, loggable.operationName());
        try {
            return pjp.proceed();
        } catch (DomainError domainError) {
            // Some logic here
        }
    }
}

我们非常欢迎任何帮助。提前致谢!

这是因为 StreamListener 已经是一个代理。有太多需要解释的内容,可能是写博客的好主题。 . . 无论如何,我很高兴你已经描述了你试图解决的实际问题,这个问题可以用更简单的方法解决,那就是引入 ChannelInterceptor - 它基本上充当你的消息处理程序调用的 Around 建议. 基本上这是一个例子:

@Bean
@GlobalChannelInterceptor 
public ChannelInterceptor channelInterceptor() {
    return new ChannelInterceptor() {
        @Override
        public Message<?> preSend(Message<?> msg, MessageChannel mc) {
            System.out.println("Before send to channel: " + mc);
            return msg;
        }

        @Override
        public void afterSendCompletion(Message<?> msg, MessageChannel mc, boolean bln, Exception excptn) {
            System.out.println("After send completion to channel: " + mc);
        }

        @Override
        public void postSend(Message<?> msg, MessageChannel mc, boolean bln) {
            System.out.println("After send to channel: " + mc);
        }

    };
}

。 . .这是它会产生的输出:

Before send to channel: input
Before send to channel: integrationFlowCreator.channel#0
===> SOME LOG MESSAGE INSIDE YOUR CODE 
After send to channel: integrationFlowCreator.channel#0
After send completion to channel: integrationFlowCreator.channel#0
After send to channel: input
After send completion to channel: input

您只需在配置中声明它就可以了。它将应用于所有通道,因此您(通过逻辑)仅在您想要的时候进行监控。 有关详细信息,请参阅 GlobalChannelInterceptor 的 Javadoc。 希望有帮助