Kafka 消息被重新处理
Kafka messages are reprocessed
我们有一个微服务,它使用 spring-boot 和 spring-cloud-stream 从 Kafka 生成和使用消息。
版本:
spring-引导:1.5.8.RELEASE
spring-云流:Ditmars.RELEASE
卡夫卡服务器:kafka_2.11-1.0.0
编辑:
我们在 Kubernetes 环境中工作,使用 3 个 Kafka 节点的 StatefulSets 集群和 3 个 Zookeeper 节点的集群。
我们遇到过几次旧消息被重新处理的情况,而这些消息几天前就已经处理过了。
几个注意事项:
- 在此之前打印了以下日志(还有更多类似的行,这只是一个摘要)
Revoking previously assigned partitions [] for group enrollment-service
Discovered coordinator dev-kafka-1.kube1.iaas.watercorp.com:9092 (id: 2147483646 rack: null)
Successfully joined group enrollment-service with generation 320
- 上述分区的撤销和重新分配事件每隔几个小时就会发生一次。在这些事件中,只有少数事件会重新使用旧消息。在大多数情况下,重新分配不会触发消息消费。
- 消息来自不同的分区。
- 每个 分区 有超过 1 条消息正在重新处理。
application.yml:
spring:
cloud:
stream:
kafka:
binder:
brokers: kafka
defaultBrokerPort: 9092
zkNodes: zookeeper
defaultZkPort: 2181
minPartitionCount: 2
replicationFactor: 1
autoCreateTopics: true
autoAddPartitions: true
headers: type,message_id
requiredAcks: 1
configuration:
"[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
bindings:
user-enrollment-input:
consumer:
autoRebalanceEnabled: true
autoCommitOnError: true
enableDlq: true
user-input:
consumer:
autoRebalanceEnabled: true
autoCommitOnError: true
enableDlq: true
enrollment-mail-output:
producer:
sync: true
configuration:
retries: 10000
enroll-users-output:
producer:
sync: true
configuration:
retries: 10000
default:
binder: kafka
contentType: application/json
group: enrollment-service
consumer:
maxAttempts: 1
producer:
partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
bindings:
user-enrollment-input:
destination: enroll-users
consumer:
concurrency: 10
partitioned: true
user-input:
destination: user
consumer:
concurrency: 5
partitioned: true
enrollment-mail-output:
destination: send-enrollment-mail
producer:
partitionCount: 10
enroll-users-output:
destination: enroll-users
producer:
partitionCount: 10
有没有我可能遗漏的配置?什么会导致这种行为?
因此,实际问题是以下问题单中描述的问题:https://issues.apache.org/jira/browse/KAFKA-3806。
使用建议的解决方法修复了它。
我们有一个微服务,它使用 spring-boot 和 spring-cloud-stream 从 Kafka 生成和使用消息。
版本:
spring-引导:1.5.8.RELEASE
spring-云流:Ditmars.RELEASE
卡夫卡服务器:kafka_2.11-1.0.0
编辑: 我们在 Kubernetes 环境中工作,使用 3 个 Kafka 节点的 StatefulSets 集群和 3 个 Zookeeper 节点的集群。
我们遇到过几次旧消息被重新处理的情况,而这些消息几天前就已经处理过了。
几个注意事项:
- 在此之前打印了以下日志(还有更多类似的行,这只是一个摘要)
Revoking previously assigned partitions [] for group enrollment-service
Discovered coordinator dev-kafka-1.kube1.iaas.watercorp.com:9092 (id: 2147483646 rack: null)
Successfully joined group enrollment-service with generation 320
- 上述分区的撤销和重新分配事件每隔几个小时就会发生一次。在这些事件中,只有少数事件会重新使用旧消息。在大多数情况下,重新分配不会触发消息消费。
- 消息来自不同的分区。
- 每个 分区 有超过 1 条消息正在重新处理。
application.yml:
spring: cloud: stream: kafka: binder: brokers: kafka defaultBrokerPort: 9092 zkNodes: zookeeper defaultZkPort: 2181 minPartitionCount: 2 replicationFactor: 1 autoCreateTopics: true autoAddPartitions: true headers: type,message_id requiredAcks: 1 configuration: "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol bindings: user-enrollment-input: consumer: autoRebalanceEnabled: true autoCommitOnError: true enableDlq: true user-input: consumer: autoRebalanceEnabled: true autoCommitOnError: true enableDlq: true enrollment-mail-output: producer: sync: true configuration: retries: 10000 enroll-users-output: producer: sync: true configuration: retries: 10000 default: binder: kafka contentType: application/json group: enrollment-service consumer: maxAttempts: 1 producer: partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor bindings: user-enrollment-input: destination: enroll-users consumer: concurrency: 10 partitioned: true user-input: destination: user consumer: concurrency: 5 partitioned: true enrollment-mail-output: destination: send-enrollment-mail producer: partitionCount: 10 enroll-users-output: destination: enroll-users producer: partitionCount: 10
有没有我可能遗漏的配置?什么会导致这种行为?
因此,实际问题是以下问题单中描述的问题:https://issues.apache.org/jira/browse/KAFKA-3806。 使用建议的解决方法修复了它。