Spring 启动 BatchAcknowledgingMessageListener<String, String> 在逗号上拆分消息

Spring Boot BatchAcknowledgingMessageListener<String, String> Splitting Message on Commas

我有一个 Spring 启动应用程序,它带有一个实现 BatchAcknowledgingMessageListener 接口的 Kafka Listener。当我从主题收到应该是一条消息时,它实际上是原始消息中每一行的一条消息,我无法将该消息转换为 ConsumerRecord

生成记录的代码如下所示:

this.kafkaTemplate.send("myTopic", "12345", "{\"OrderID\": \"12345\"}, \"OrderDate\": \"2021-06-01T12:13:16Z\"");

Kafka 配置如下所示(这仍处于使用 Testcontainers 的集成测试阶段,因此生产者正在生产消费者正在收听的同一主题):

spring:
  kafka:
    listener:
      ack-mode: manual-immediate
      concurrency: 1
    consumer:
      bootstrap-servers: localhost:9093
      enable-auto-commit: false
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 10
      topic: myTopic
    producer:
      bootstrap-servers: localhost:9093
      client-id: my-client
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      topic: myTopic

最后,消费者逻辑:

@KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void consumeMessages(final List<ConsumerRecord<String, String>> records, final Acknowledgment ack) throws IOException {
  // This line fails with ClassCastException: "Can't cast String to ConsumerRecord"
  // for (final ConsumerRecord<String, String> record : records) {

  for (final Object record : records) {
    log.debug("Record: {}", record);
  }
  ...
}

这个例子的调试输出是:

[LOG HEADER]: Record: {"OrderID": "12345"
[LOG HEADER]: Record: "OrderDate": "2021-06-01T12:13:16Z"}

如您所见,消息以逗号分隔,并且我收到了针对所生成的单个消息的多条消息。这显然是失败的,但我不明白为什么我不只是得到单个 ConsumerRecord 对象。

您缺少侦听器类型配置,因此默认转换服务发现您需要一个列表并用逗号分隔字符串。

spring:
  kafka:
    listener:
      ack-mode: manual-immediate
      concurrency: 1
      type: batch
    consumer:
...

添加 type: batch 告诉框架您想要整批记录。

原来问题很简单。我缺少 spring.kafka.listener.type 配置参数。在这里找到答案: