嵌入式kafka生产者测试
Embedded kafka producer test
我正在编写集成测试来测试 kafka 生产者。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {kafkaProducerConfig.class, KafkaProducerIT.InnerConfig.class})
@EnableConfigurationProperties(KafkaProducerInfo.class)
@ComponentScan(basePackages = "...")
public class KafkaProducerIT {
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "testtopic");
@Autowired
CustomKafkaProducer<String, String> KafkaProducer;
@Autowired
KafkaController kafkaController;
@Test
public void whenSendMessage_thenConsumeIt() throws InterruptedException {
KafkaProducer.produceMessageToKafkaTopic("ahahahwow", "testtopic");
kafkaController.countDownLatch.await();
}
@Configuration
public static class InnerConfig {
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(ConsumerFactory<String, Object> replyConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(replyConsumerFactory);
factory.setBatchListener(true);
return factory;
}
@Bean
KafkaController kafkaController() {
return new KafkaController();
}
}
public static class KafkaController {
CountDownLatch countDownLatch = new CountDownLatch(1);
@KafkaListener(topics = "testtopic")
public void listen(final String payload) {
countDownLatch.countDown();
}
}
}
想法是我想向主题发送消息,使用 KafkaController
和 CountDownLatch
阅读它。
我遇到的问题是 CountDownLatch
从未被触发,测试只是挂在 await
.
CustomKafkaProducer
只是一个包装器,它在底层使用常规 kafkaTemplate
。
p.s.
在debug过程中,出现过几次flow进入listener,test通过的情况。所以问题与错误的主题名称等无关
您需要为消费者设置auto.offset.reset=earliest。默认值是最新的,因此如果消费者在发送记录后启动,则会出现竞争条件。
我正在编写集成测试来测试 kafka 生产者。
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {kafkaProducerConfig.class, KafkaProducerIT.InnerConfig.class})
@EnableConfigurationProperties(KafkaProducerInfo.class)
@ComponentScan(basePackages = "...")
public class KafkaProducerIT {
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, "testtopic");
@Autowired
CustomKafkaProducer<String, String> KafkaProducer;
@Autowired
KafkaController kafkaController;
@Test
public void whenSendMessage_thenConsumeIt() throws InterruptedException {
KafkaProducer.produceMessageToKafkaTopic("ahahahwow", "testtopic");
kafkaController.countDownLatch.await();
}
@Configuration
public static class InnerConfig {
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(ConsumerFactory<String, Object> replyConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(replyConsumerFactory);
factory.setBatchListener(true);
return factory;
}
@Bean
KafkaController kafkaController() {
return new KafkaController();
}
}
public static class KafkaController {
CountDownLatch countDownLatch = new CountDownLatch(1);
@KafkaListener(topics = "testtopic")
public void listen(final String payload) {
countDownLatch.countDown();
}
}
}
想法是我想向主题发送消息,使用 KafkaController
和 CountDownLatch
阅读它。
我遇到的问题是 CountDownLatch
从未被触发,测试只是挂在 await
.
CustomKafkaProducer
只是一个包装器,它在底层使用常规 kafkaTemplate
。
p.s.
在debug过程中,出现过几次flow进入listener,test通过的情况。所以问题与错误的主题名称等无关
您需要为消费者设置auto.offset.reset=earliest。默认值是最新的,因此如果消费者在发送记录后启动,则会出现竞争条件。