How to spy on an autowired bean in Spring tests

I have a simple logging handler bean configuration that I inject into an IntegrationFlow:

@Configuration
class LogHandlerConfiguration {

    private LoggingHandler handler;

    @Bean
    public MessageHandler kafkaSuccessHandler() {
        return getLogger(LoggingHandler.Level.INFO);
    }

    @Bean(name="kafkaFailureHandler")
    public MessageHandler kafkaFailureHandler() {
        return getLogger(LoggingHandler.Level.ERROR);
    }

    private LoggingHandler getLogger(LoggingHandler.Level level) {
        handler = new LoggingHandler(level);
        handler.setShouldLogFullMessage(Boolean.TRUE);
        return handler;
    }
}

The integration flow under test:

@Bean
IntegrationFlow kafkaFailureFlow(ExecutorChannel kafkaErrorChannel, MessageHandler kafkaFailureHandler) {
    return IntegrationFlows.from(kafkaErrorChannel)
            .transform("payload.failedMessage")
            .handle(kafkaFailureHandler)
            .get();
}

Here is my test:

@SpyBean
MessageHandler kafkaFailureHandler;

@BeforeEach
public void setup() {
    MockitoAnnotations.openMocks(KafkaPublishFailureTest.class);
}

@Test
void testFailedKafkaPublish() {

    //Dummy message
    Map<String, String> map = new HashMap<>();
    map.put("key", "value");
    // Publish Message
    Message<Map<String, String>> message = MessageBuilder.withPayload(map)
            .setHeader("X-UPSTREAM-TYPE", "alm")
            .setHeader("X-UPSTREAM-INSTANCE", "jira")
            .setHeader("X-MESSAGE-KEY", "key-1")
            .build();

    kafkaGateway.publish(message);
    // Failure handler called
    Mockito.verify(kafkaFailureHandler, Mockito.timeout(0).atLeastOnce()).handleMessage(
            ArgumentMatchers.any(Message.class));
}

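We have created a generic Kafka producer and consumer configuration to which downstream applications can attach the failure and success handlers that best suit their needs. In this scenario I cannot verify that the LoggingHandler is called at least once.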

The failure handler runs on an ExecutorChannel backed by a ThreadPoolTaskExecutor:

@Bean
ExecutorChannel kafkaErrorChannel(Executor threadPoolExecutor) {
    return MessageChannels.executor("kafkaErrorChannel", threadPoolExecutor).get();
}

Failures are handled through a retry advice:

@Bean
RequestHandlerRetryAdvice retryAdvice(ExecutorChannel kafkaErrorChannel) {
    RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
    retryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(kafkaErrorChannel));
    return retryAdvice;
}

I get this error when running the test:
java.lang.IllegalStateException: No bean found for definition [SpyDefinition@44dfdd58 name = '', typeToSpy = org.springframework.messaging.MessageHandler, reset = AFTER]
    at org.springframework.util.Assert.state(Assert.java:97) ~[spring-core-5.3.4.jar:5.3.4]
    at org.springframework.boot.test.mock.mockito.MockitoPostProcessor.inject(MockitoPostProcessor.java:351) ~[spring-boot-test-2.4.3.jar:2.4.3]

Here is what I tried, and it works:

@SpringBootApplication
public class Demo1Application {

    public static void main(String[] args) {
        SpringApplication.run(Demo1Application.class, args);
    }

    @Bean
    ExecutorChannel kafkaErrorChannel(TaskExecutor taskExecutor) {
        return new ExecutorChannel(taskExecutor);
    }

    @Bean
    public MessageHandler kafkaFailureHandler() {
        LoggingHandler handler = new LoggingHandler(LoggingHandler.Level.ERROR);
        handler.setShouldLogFullMessage(Boolean.TRUE);
        return handler;
    }

    @Bean
    IntegrationFlow kafkaFailureFlow(ExecutorChannel kafkaErrorChannel, MessageHandler kafkaFailureHandler) {
        return IntegrationFlows.from(kafkaErrorChannel)
                .transform("payload.failedMessage")
                .handle(kafkaFailureHandler)
                .get();
    }

}

@SpringBootTest
class Demo1ApplicationTests {

    @Autowired
    ExecutorChannel kafkaErrorChannel;

    @SpyBean
    MessageHandler kafkaFailureHandler;

    @Test
    void testSpyBean() throws InterruptedException {
        MessagingException payload = new MessageHandlingException(new GenericMessage<>("test"));
        this.kafkaErrorChannel.send(new ErrorMessage(payload));
        Thread.sleep(1000);
        Mockito.verify(this.kafkaFailureHandler).handleMessage(ArgumentMatchers.any(Message.class));
    }

}

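Perhaps the problem is that you did not include LogHandlerConfiguration in your @SpringBootTest configuration. That is why I asked for a simple project to play with. Your code, with all those properties, is too customized to copy and paste into my environment...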

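Also note the Thread.sleep(1000);. Because your kafkaErrorChannel is an ExecutorChannel, the message is consumed on another thread, away from your main test thread, so the verification can fail because of a race condition. It is hard to guess the right delay, so it is better to stub the mocked method with a thread barrier such as new CountDownLatch(1) and wait on it in the test.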
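The thread-barrier idea can be sketched with the JDK alone. This is only an illustration under assumptions: the class LatchBarrierSketch, the method dispatchAndAwait and the Consumer standing in for the handler are hypothetical names, and a single-thread executor plays the role of the ExecutorChannel. With a Mockito spy, the countDown() call would typically live in a doAnswer(...) stub on handleMessage.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class LatchBarrierSketch {

    /**
     * Hands the message to a handler on another thread and waits for it.
     * Returns true if the handler ran within the timeout, false otherwise.
     */
    static boolean dispatchAndAwait(String message, long timeoutMs) {
        CountDownLatch handled = new CountDownLatch(1);
        // Hypothetical stand-in for the stubbed handler: release the latch when called.
        Consumer<String> handler = m -> handled.countDown();

        // Stand-in for the ExecutorChannel: delivery happens off the calling thread.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> handler.accept(message));
            // Unblocks as soon as the handler has run; no fixed sleep is needed.
            return handled.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The test thread waits only as long as the handler actually takes, and the timeout is just an upper bound for the failure case.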

As an aside, you can also look into the Spring Integration testing framework: https://docs.spring.io/spring-integration/docs/current/reference/html/testing.html#test-context

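So why didn't @SpyBean get hooked up? There were two problems:

  1. Both the success and failure handlers are MessageHandlers, so @SpyBean could not tell by type alone which one to spy on
  2. The Kafka producer waited too long before failing, i.e. 1000 ms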
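The first problem can be illustrated with a plain-Java analogy. This is not Spring's actual resolution code: the BEANS map, the nested MessageHandler interface and both resolve methods are hypothetical stand-ins. It only shows why a lookup by type has no single answer when two beans share the type, while a lookup by name does.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class BeanLookupSketch {

    // Hypothetical stand-in for org.springframework.messaging.MessageHandler.
    interface MessageHandler { void handleMessage(String message); }

    // Hypothetical stand-in for an application context: bean name -> instance.
    static final Map<String, Object> BEANS = new LinkedHashMap<>();
    static {
        BEANS.put("kafkaSuccessHandler", (MessageHandler) m -> { });
        BEANS.put("kafkaFailureHandler", (MessageHandler) m -> { });
    }

    /** Names of all registered beans that are instances of the given type. */
    static List<String> namesForType(Class<?> type) {
        return BEANS.entrySet().stream()
                .filter(e -> type.isInstance(e.getValue()))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    /** Returns the single matching bean name, or null when the type alone is ambiguous. */
    static String resolveByType(Class<?> type) {
        List<String> names = namesForType(type);
        return names.size() == 1 ? names.get(0) : null;
    }

    /** Returns the name if a bean with that name and type exists, otherwise null. */
    static String resolveByName(String name, Class<?> type) {
        return type.isInstance(BEANS.get(name)) ? name : null;
    }
}
```

With two handlers registered, resolveByType finds two candidates and gives up, whereas resolveByName("kafkaFailureHandler", ...) picks exactly one, which mirrors the effect of naming the bean in @SpyBean.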

Here is what finally worked, using a named bean:

@Bean("kafkaFailureHandler")
public MessageHandler kafkaFailureHandler() {
    LoggingHandler handler = new LoggingHandler(LoggingHandler.Level.INFO);
    handler.setShouldLogFullMessage(Boolean.TRUE);
    return handler;
}

Then, in the test, also reduce max.block.ms:

@DirtiesContext
@SpringBootTest(classes = {KafkaHandlerConfiguration.class, SwiftalkKafkaGateway.class})
@SpringIntegrationTest(noAutoStartup = {"kafkaFailureFlow"})
@TestPropertySource(properties = {
        "spring.main.banner-mode=off",
        "logging.level.root=INFO",
        "logging.level.org.springframework=INFO",
        "logging.level.org.springframework.integration=DEBUG",
        "spring.kafka.producer.properties.max.block.ms=50",
        "spring.kafka.producer.bootstrap-servers=localhost:9999",
        "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
        "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
})
public class KafkaPublishFailureTest {

    private static final Logger log = LogManager.getLogger(KafkaPublishFailureTest.class);

    @Autowired
    SwiftalkKafkaGateway kafkaGateway;

    @SpyBean(name = "kafkaFailureHandler")
    MessageHandler kafkaFailureHandler;

    @Test
    @SuppressWarnings("all")
    void testFailedKafkaPublish() throws InterruptedException {

        //Dummy message
        Map<String, String> map = new HashMap<>();
        map.put("key", "value");
        // Publish Message
        Message<Map<String, String>> message = MessageBuilder.withPayload(map)
                .setHeader("X-UPSTREAM-TYPE", "alm")
                .setHeader("X-UPSTREAM-INSTANCE", "jira")
                .setHeader("X-MESSAGE-KEY", "key-1")
                .build();

        kafkaGateway.publish(message);
        verify(this.kafkaFailureHandler, timeout(500)).handleMessage(any(Message.class));
    }

}

Note spring.kafka.producer.properties.max.block.ms=50 and verify(this.kafkaFailureHandler, timeout(500)).handleMessage(any(Message.class));
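Why the two numbers have to fit together can be sketched without Mockito. The failure handler is only reached after the producer gives up, so the verification window must be longer than that delay. The sketch below is a rough approximation of a verification that is retried until a timeout, not Mockito's implementation; PollingVerifySketch, awaitCondition and demo are hypothetical names, and the sleeping worker thread stands in for the delayed failure path.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

class PollingVerifySketch {

    /** Re-checks the condition until it holds or the timeout elapses. */
    static boolean awaitCondition(BooleanSupplier condition, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (System.nanoTime() < deadline) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(10); // short pause between checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // One last check at the deadline.
        return condition.getAsBoolean();
    }

    /**
     * Simulates a handler that is invoked handlerDelayMs after publishing,
     * then waits up to timeoutMs for at least one invocation.
     */
    static boolean demo(long handlerDelayMs, long timeoutMs) {
        AtomicInteger calls = new AtomicInteger();
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(handlerDelayMs); // stand-in for the producer blocking before it fails
            } catch (InterruptedException e) {
                return;
            }
            calls.incrementAndGet(); // stand-in for handleMessage being called
        });
        worker.setDaemon(true);
        worker.start();
        return awaitCondition(() -> calls.get() >= 1, timeoutMs);
    }
}
```

A short delay with a generous window, as in demo(50, 500), succeeds; swapping the values, as in demo(500, 50), gives up before the handler is ever called, which is the situation the original test was in.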