Spring 启动 BatchAcknowledgingMessageListener<String, String> 在逗号上拆分消息
Spring Boot BatchAcknowledgingMessageListener<String, String> Splitting Message on Commas
我有一个 Spring 启动应用程序,它带有一个实现 BatchAcknowledgingMessageListener 接口的 Kafka Listener。当我从主题收到应该是一条消息时,它实际上是原始消息中每一行的一条消息,我无法将该消息转换为 ConsumerRecord。
生成记录的代码如下所示:
this.kafkaTemplate.send("myTopic", "12345", "{\"OrderID\": \"12345\"}, \"OrderDate\": \"2021-06-01T12:13:16Z\"");
Kafka 配置如下所示(这仍处于使用 Testcontainers 的集成测试阶段,因此生产者正在生产消费者正在收听的同一主题):
spring:
kafka:
listener:
ack-mode: manual-immediate
concurrency: 1
consumer:
bootstrap-servers: localhost:9093
enable-auto-commit: false
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 10
topic: myTopic
producer:
bootstrap-servers: localhost:9093
client-id: my-client
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
topic: myTopic
最后,消费者逻辑:
@KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void consumeMessages(final List<ConsumerRecord<String, String>> records, final Acknowledgment ack) throws IOException {
// This line fails with ClassCastException: "Can't cast String to ConsumerRecord"
// for (final ConsumerRecord<String, String> record : records) {
for (final Object record : records) {
log.debug("Record: {}", record);
}
...
}
这个例子的调试输出是:
[LOG HEADER]: Record: {"OrderID": "12345"
[LOG HEADER]: Record: "OrderDate": "2021-06-01T12:13:16Z"}
如您所见,消息以逗号分隔,并且我收到了针对所生成的单个消息的多条消息。这显然是失败的,但我不明白为什么我不只是得到单个 ConsumerRecord 对象。
您缺少侦听器类型配置,因此默认转换服务发现您需要一个列表并用逗号分隔字符串。
spring:
kafka:
listener:
ack-mode: manual-immediate
concurrency: 1
type: batch
consumer:
...
添加 type: batch
告诉框架您想要整批记录。
原来问题很简单。我缺少 spring.kafka.listener.type 配置参数。在这里找到答案:
我有一个 Spring 启动应用程序,它带有一个实现 BatchAcknowledgingMessageListener
生成记录的代码如下所示:
this.kafkaTemplate.send("myTopic", "12345", "{\"OrderID\": \"12345\"}, \"OrderDate\": \"2021-06-01T12:13:16Z\"");
Kafka 配置如下所示(这仍处于使用 Testcontainers 的集成测试阶段,因此生产者正在生产消费者正在收听的同一主题):
spring:
kafka:
listener:
ack-mode: manual-immediate
concurrency: 1
consumer:
bootstrap-servers: localhost:9093
enable-auto-commit: false
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 10
topic: myTopic
producer:
bootstrap-servers: localhost:9093
client-id: my-client
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
topic: myTopic
最后,消费者逻辑:
@KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void consumeMessages(final List<ConsumerRecord<String, String>> records, final Acknowledgment ack) throws IOException {
// This line fails with ClassCastException: "Can't cast String to ConsumerRecord"
// for (final ConsumerRecord<String, String> record : records) {
for (final Object record : records) {
log.debug("Record: {}", record);
}
...
}
这个例子的调试输出是:
[LOG HEADER]: Record: {"OrderID": "12345"
[LOG HEADER]: Record: "OrderDate": "2021-06-01T12:13:16Z"}
如您所见,消息以逗号分隔,并且我收到了针对所生成的单个消息的多条消息。这显然是失败的,但我不明白为什么我不只是得到单个 ConsumerRecord
您缺少侦听器类型配置,因此默认转换服务发现您需要一个列表并用逗号分隔字符串。
spring:
kafka:
listener:
ack-mode: manual-immediate
concurrency: 1
type: batch
consumer:
...
添加 type: batch
告诉框架您想要整批记录。
原来问题很简单。我缺少 spring.kafka.listener.type 配置参数。在这里找到答案: