Kafka 消息被重新处理

Kafka messages are reprocessed

我们有一个微服务,它使用 spring-boot 和 spring-cloud-stream 从 Kafka 生成和使用消息。
版本:
spring-引导:1.5.8.RELEASE
spring-云流:Ditmars.RELEASE
卡夫卡服务器:kafka_2.11-1.0.0

编辑: 我们在 Kubernetes 环境中工作,使用 3 个 Kafka 节点的 StatefulSets 集群和 3 个 Zookeeper 节点的集群。

我们遇到过几次旧消息被重新处理的情况,而这些消息几天前就已经处理过了。
几个注意事项:

  1. 在此之前打印了以下日志(还有更多类似的行,这只是一个摘要)

Revoking previously assigned partitions [] for group enrollment-service
Discovered coordinator dev-kafka-1.kube1.iaas.watercorp.com:9092 (id: 2147483646 rack: null)
Successfully joined group enrollment-service with generation 320

  1. 上述分区的撤销和重新分配事件每隔几个小时就会发生一次。在这些事件中,只有少数事件会重新使用旧消息。在大多数情况下,重新分配不会触发消息消费。
  2. 消息来自不同的分区。
  3. 每个 分区 有超过 1 条消息正在重新处理。

application.yml:

spring:
  cloud:
      stream:
        kafka:
          binder:
            brokers: kafka
            defaultBrokerPort: 9092
            zkNodes: zookeeper
            defaultZkPort: 2181
            minPartitionCount: 2
            replicationFactor: 1
            autoCreateTopics: true
            autoAddPartitions: true
            headers: type,message_id
            requiredAcks: 1
            configuration:
              "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
          bindings:
            user-enrollment-input:
              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true
            user-input:
              consumer:
                autoRebalanceEnabled: true
                autoCommitOnError: true
                enableDlq: true
            enrollment-mail-output:
              producer:
                sync: true
                configuration:
                  retries: 10000
            enroll-users-output:
              producer:
                sync: true
                configuration:
                  retries: 10000
        default:
          binder: kafka
          contentType: application/json
          group: enrollment-service
          consumer:
            maxAttempts: 1
          producer:
            partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
        bindings:
          user-enrollment-input:
            destination: enroll-users
            consumer:
              concurrency: 10
              partitioned: true
          user-input:
            destination: user
            consumer:
              concurrency: 5
              partitioned: true
          enrollment-mail-output:
            destination: send-enrollment-mail
            producer:
              partitionCount: 10
          enroll-users-output:
            destination: enroll-users
            producer:
              partitionCount: 10

有没有我可能遗漏的配置?什么会导致这种行为?

因此,实际问题是以下问题单中描述的问题:https://issues.apache.org/jira/browse/KAFKA-3806。 使用建议的解决方法修复了它。