嵌入式kafka生产者测试

Embedded kafka producer test

我正在编写集成测试来测试 kafka 生产者。

@RunWith(SpringRunner.class)
@SpringBootTest(classes = {kafkaProducerConfig.class, KafkaProducerIT.InnerConfig.class})
@EnableConfigurationProperties(KafkaProducerInfo.class)
@ComponentScan(basePackages = "...")
public class KafkaProducerIT {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "testtopic");

    @Autowired
    CustomKafkaProducer<String, String> KafkaProducer;

    @Autowired
    KafkaController kafkaController;

    @Test
    public void whenSendMessage_thenConsumeIt() throws InterruptedException {
        KafkaProducer.produceMessageToKafkaTopic("ahahahwow", "testtopic");
        kafkaController.countDownLatch.await();
    }

    @Configuration
    public static class InnerConfig {

        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(ConsumerFactory<String, Object> replyConsumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(replyConsumerFactory);
            factory.setBatchListener(true);
            return factory;
        }

        @Bean
        KafkaController kafkaController() {
            return new KafkaController();
        }

    }

    public static class KafkaController {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        @KafkaListener(topics = "testtopic")
        public void listen(final String payload) {
            countDownLatch.countDown();
        }
    }

}

想法是我想向主题发送消息,使用 KafkaControllerCountDownLatch 阅读它。 我遇到的问题是 CountDownLatch 从未被触发,测试只是挂在 await.

CustomKafkaProducer 只是一个包装器,它在底层使用常规 kafkaTemplate

p.s.

在debug过程中,出现过几次flow进入listener,test通过的情况。所以问题与错误的主题名称等无关

您需要为消费者设置auto.offset.reset=earliest。默认值是最新的,因此如果消费者在发送记录后启动,则会出现竞争条件。