Spring Boot Kafka with MANUAL_IMMEDIATE ack
I have a Spring Boot application with this configuration:
kafka:
  consumer:
  listener:
    ack-mode: MANUAL_IMMEDIATE
And this consumer:
@KafkaListener(topics = "test", groupId = "group_id")
public void consume(String message, Acknowledgment ack) throws IOException {
    ack.acknowledge();
}
But when a message arrives I get this error:
org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage
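It looks like your YAML is not being read.
Correction: your YAML is invalid; remove the consumer element (it is empty in your configuration).
I just tested with this configuration: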
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
    listener:
      ack-mode: MANUAL_IMMEDIATE
and it works fine; removing ack-mode gives me the same error you are seeing.
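import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;

@SpringBootApplication
public class So60929385Application {

    public static void main(String[] args) {
        SpringApplication.run(So60929385Application.class, args);
    }

    // The Acknowledgment argument is only resolved when a manual ack mode is active.
    @KafkaListener(id = "so60929385", topics = "so60929385")
    public void listen(String in, Acknowledgment ack) {
        System.out.println(in);
        ack.acknowledge();
    }

    // Sends one test record at startup so the listener has something to consume.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so60929385", "foo");
    }

    // Creates the test topic if it does not exist yet.
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so60929385").partitions(1).replicas(1).build();
    }
}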
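As a rough sanity check on the key path, here is a small stand-alone sketch (not part of the tested application). It flattens the nested structure of the working YAML into dotted property names, which is the form Spring Boot binds against; the listener setting has to end up as spring.kafka.listener.ack-mode. The class YamlKeyPaths and its methods are hypothetical names for this illustration, and the nested map is built by hand rather than parsed from a file.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class YamlKeyPaths {

    // Flattens a nested map (the shape a YAML parser would produce) into dotted keys.
    public static Map<String, Object> flatten(Map<String, Object> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        collect("", nested, flat);
        return flat;
    }

    @SuppressWarnings("unchecked")
    private static void collect(String prefix, Map<String, Object> node, Map<String, Object> out) {
        for (Map.Entry<String, Object> e : node.entrySet()) {
            String key = prefix.isEmpty() ? e.getKey() : prefix + "." + e.getKey();
            if (e.getValue() instanceof Map) {
                // Descend into nested sections such as "kafka" or "listener".
                collect(key, (Map<String, Object>) e.getValue(), out);
            } else {
                out.put(key, e.getValue());
            }
        }
    }

    // Hand-built equivalent of the working YAML shown in the answer.
    public static Map<String, Object> answerConfig() {
        Map<String, Object> consumer = new LinkedHashMap<>();
        consumer.put("auto-offset-reset", "earliest");
        Map<String, Object> listener = new LinkedHashMap<>();
        listener.put("ack-mode", "MANUAL_IMMEDIATE");
        Map<String, Object> kafka = new LinkedHashMap<>();
        kafka.put("consumer", consumer);
        kafka.put("listener", listener);
        Map<String, Object> spring = new LinkedHashMap<>();
        spring.put("kafka", kafka);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("spring", spring);
        return root;
    }

    public static void main(String[] args) {
        // Prints one "key=value" line per leaf property.
        flatten(answerConfig()).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

If the printed keys do not start with spring.kafka, the listener container never sees the ack mode and falls back to its default, which matches the conversion error in the question.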