Spring-Kafka:使用消费者 pause/resume 时发生重新平衡,根据文档不应该
Spring-Kafka: Rebalancing happening while using consumer pause/resume which should not as per documentation
Spring-Kafka:虽然 pausing/resuming 消费者根据文档使用 pause/resume 方法,但在使用自动分配但它不起作用时不应发生重新平衡,重新平衡正在发生。如何 pause/resume 消费者并在一段时间后保持轮询而不重新平衡?
用例:消费者应暂停一段时间并继续轮询以发出心跳并在时间结束后恢复,但 Kafka 不应在消费者暂停时重新平衡。
System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] stopped consumption.");
consumer.pause(Collections.singleton(topicPartition));
try {
Thread.sleep(60000);
consumer.resume(Collections.singleton(topicPartition));
System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] resumed consumption.");
} catch (InterruptedException e) {
e.printStackTrace();
}
日志:
2019-02-19 15:19:49.173 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] (重新)加入群组
2019-02-19 15:19:49.175 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] (重新)加入群组
2019-02-19 15:19:49.181 INFO 82272 --- [rTaskExecutor-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=customer] (重新)加入群组
2019-02-19 15:19:49.192 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] 成功加入group with generation 581
2019-02-19 15:19:49.192 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] 成功加入第581代组
2019-02-19 15:19:49.194 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=customer] 设置新分配的分区[ spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1]
2019-02-19 15:19:49.194 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=customer] 设置新分配的分区 [spring -kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3]
2019-02-19 15:19:49.218 INFO 82272 --- [rTaskExecutor-2] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[spring-kafka-topic-4,spring- kafka-topic-5, spring-kafka-topic-3]
2019-02-19 15:19:49.219 INFO 82272 --- [rTaskExecutor-1] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[spring-kafka-topic-2,spring- kafka-topic-0, spring-kafka-topic-1]
2019-02-19 15:19:49.223 INFO 82272 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat 在端口上启动:8080 (http) with context path ''
2019-02-19 15:19:49.233 INFO 82272 --- [main] c.g.s.S.SpringKafkaSupportApplication:在 3.43 秒内启动了 SpringKafkaSupportApplication(JVM 运行 for 3.85)
消费者[customerTaskExecutor-1]收到消息[Customer(name=, phoneNumber=20)]
消费者[customerTaskExecutor-2]收到消息[Customer(name=test 6, phoneNumber=6)]
消费者 [customerTaskExecutor-1] 分区 [spring-kafka-topic-2] 停止消费。
消费者 [customerTaskExecutor-1] 分区 [spring-kafka-topic-1] 停止消费。
2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] 尝试心跳失败,因为组正在重新平衡
2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] 尝试心跳失败,因为组正在重新平衡
2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=customer] 撤销之前分配的分区 [spring -kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1]
2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=customer] 撤销之前分配的分区 [spring -kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3]
2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.s.k.l.KafkaMessageListenerContainer:分区已撤销:[spring-kafka-topic-2,spring- kafka-topic-0, spring-kafka-topic-1]
2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.s.k.l.KafkaMessageListenerContainer:分区已撤销:[spring-kafka-topic-4,spring- kafka-topic-5, spring-kafka-topic-3]
2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] (重新)加入群组
2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] (重新)加入群组
2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] 成功加入582代群组
2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] 成功加入582代群组
2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=customer] 成功加入第582代组
2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=customer] 设置新分配的分区 [spring -kafka-topic-4, spring-kafka-topic-5]
2019-02-19 15:19:52.210 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=customer] 设置新分配的分区 [spring -kafka-topic-0, spring-kafka-topic-1]
2019-02-19 15:19:52.210 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=customer] 设置新分配的分区 [spring -kafka-topic-2, spring-kafka-topic-3]
2019-02-19 15:19:52.211 INFO 82272 --- [rTaskExecutor-3] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[spring-kafka-topic-4,spring- kafka-topic-5]
2019-02-19 15:19:52.212 INFO 82272 --- [rTaskExecutor-1] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[spring-kafka-topic-0,spring- kafka-topic-1]
2019-02-19 15:19:52.212 INFO 82272 --- [rTaskExecutor-2] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[spring-kafka-topic-2,spring- kafka-topic-3]
消费者[customerTaskExecutor-3] 收到消息[Customer(name=test 6, phoneNumber=6)]
阅读 Kafka 文档。
暂停消费者仅意味着后续 poll()
将 return 没有记录,直到您调用 resume()
,但您仍然必须在 [=13] 内调用 poll()
=] 以防止重新平衡。
刚刚与消费者和 Spring Kafka 有相同的 "group" 消息。 @KafkaListener 的结果与 ConcurrentMessageListenerContainer 的未注释 Spring 相同。参数调整不完全一样 straight Java.
使用 consumer.poll() 直接 Java 重写并使用 ExecutorService 启动线程 - 根据 Gary Russell 调整参数,一切正常。在重新平衡期间不再收到这些消息和丢失心跳。来自 Clouderable 和 Confluent 网站的直接 java 个示例:
http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html
Spring-Kafka:虽然 pausing/resuming 消费者根据文档使用 pause/resume 方法,但在使用自动分配但它不起作用时不应发生重新平衡,重新平衡正在发生。如何 pause/resume 消费者并在一段时间后保持轮询而不重新平衡?
用例:消费者应暂停一段时间并继续轮询以发出心跳并在时间结束后恢复,但 Kafka 不应在消费者暂停时重新平衡。
System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] stopped consumption.");
consumer.pause(Collections.singleton(topicPartition));
try {
Thread.sleep(60000);
consumer.resume(Collections.singleton(topicPartition));
System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] resumed consumption.");
} catch (InterruptedException e) {
e.printStackTrace();
}
日志: 2019-02-19 15:19:49.173 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] (重新)加入群组 2019-02-19 15:19:49.175 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] (重新)加入群组 2019-02-19 15:19:49.181 INFO 82272 --- [rTaskExecutor-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=customer] (重新)加入群组
2019-02-19 15:19:49.192 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] 成功加入group with generation 581 2019-02-19 15:19:49.192 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] 成功加入第581代组
2019-02-19 15:19:49.194 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=customer] 设置新分配的分区[ spring-kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:49.194 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=customer] 设置新分配的分区 [spring -kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15:19:49.218 INFO 82272 --- [rTaskExecutor-2] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[spring-kafka-topic-4,spring- kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15:19:49.219 INFO 82272 --- [rTaskExecutor-1] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[spring-kafka-topic-2,spring- kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:49.223 INFO 82272 --- [main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat 在端口上启动:8080 (http) with context path '' 2019-02-19 15:19:49.233 INFO 82272 --- [main] c.g.s.S.SpringKafkaSupportApplication:在 3.43 秒内启动了 SpringKafkaSupportApplication(JVM 运行 for 3.85) 消费者[customerTaskExecutor-1]收到消息[Customer(name=, phoneNumber=20)] 消费者[customerTaskExecutor-2]收到消息[Customer(name=test 6, phoneNumber=6)] 消费者 [customerTaskExecutor-1] 分区 [spring-kafka-topic-2] 停止消费。 消费者 [customerTaskExecutor-1] 分区 [spring-kafka-topic-1] 停止消费。 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] 尝试心跳失败,因为组正在重新平衡 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] 尝试心跳失败,因为组正在重新平衡 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=customer] 撤销之前分配的分区 [spring -kafka-topic-2, spring-kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=customer] 撤销之前分配的分区 [spring -kafka-topic-4, spring-kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.s.k.l.KafkaMessageListenerContainer:分区已撤销:[spring-kafka-topic-2,spring- kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.s.k.l.KafkaMessageListenerContainer:分区已撤销:[spring-kafka-topic-4,spring- kafka-topic-5, spring-kafka-topic-3] 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] (重新)加入群组 2019-02-19 15:19:52.200 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] (重新)加入群组 2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=customer] 成功加入582代群组 2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-3, groupId=customer] 成功加入582代群组 2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-3] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-4, groupId=customer] 成功加入第582代组 2019-02-19 15:19:52.209 INFO 82272 --- [rTaskExecutor-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-4, groupId=customer] 设置新分配的分区 [spring -kafka-topic-4, spring-kafka-topic-5] 2019-02-19 15:19:52.210 INFO 82272 --- [rTaskExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-2, groupId=customer] 设置新分配的分区 [spring -kafka-topic-0, spring-kafka-topic-1] 2019-02-19 15:19:52.210 INFO 82272 --- [rTaskExecutor-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-3, groupId=customer] 设置新分配的分区 [spring -kafka-topic-2, spring-kafka-topic-3] 2019-02-19 15:19:52.211 INFO 82272 --- [rTaskExecutor-3] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[spring-kafka-topic-4,spring- kafka-topic-5] 2019-02-19 15:19:52.212 INFO 82272 --- [rTaskExecutor-1] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[spring-kafka-topic-0,spring- kafka-topic-1] 2019-02-19 15:19:52.212 INFO 82272 --- [rTaskExecutor-2] o.s.k.l.KafkaMessageListenerContainer:分配的分区:[spring-kafka-topic-2,spring- kafka-topic-3] 消费者[customerTaskExecutor-3] 收到消息[Customer(name=test 6, phoneNumber=6)]
阅读 Kafka 文档。
暂停消费者仅意味着后续 poll()
将 return 没有记录,直到您调用 resume()
,但您仍然必须在 [=13] 内调用 poll()
=] 以防止重新平衡。
刚刚与消费者和 Spring Kafka 有相同的 "group" 消息。 @KafkaListener 的结果与 ConcurrentMessageListenerContainer 的未注释 Spring 相同。参数调整不完全一样 straight Java.
使用 consumer.poll() 直接 Java 重写并使用 ExecutorService 启动线程 - 根据 Gary Russell 调整参数,一切正常。在重新平衡期间不再收到这些消息和丢失心跳。来自 Clouderable 和 Confluent 网站的直接 java 个示例:
http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html