Uniformly distribute Kafka partitions among the consumers
I have a topic with 300 partitions and 100 consumers/machines. I am using Spring Kafka as the underlying framework to implement the Kafka consumer.
I am using a ConcurrentKafkaListenerContainerFactory with the concurrency set to 3, so in theory I should have 300 consumer containers, one partition per container, and the partitions should be evenly distributed across the 100 machines.
For the first constructor, Kafka will distribute the partitions across the consumers. For the second constructor, the ConcurrentMessageListenerContainer distributes the TopicPartitions across the delegate KafkaMessageListenerContainers.
If, say, 6 TopicPartitions are provided and the concurrency is 3, each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
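For reference, the two constructors mentioned in that quote correspond to building the ContainerProperties from topic names (group management) versus from explicit TopicPartitions. A minimal sketch, assuming a recent spring-kafka version and a placeholder topic name:

import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionOffset;

// "First constructor": topics only - the Kafka group coordinator distributes
// the topic's partitions across all consumers in the group (all 100 machines).
ContainerProperties groupManaged = new ContainerProperties("store-supply-topic");

// "Second constructor": explicit TopicPartitions - the ConcurrentMessageListenerContainer
// splits them across its delegate KafkaMessageListenerContainers; no group rebalancing.
ContainerProperties explicitAssignment = new ContainerProperties(
        new TopicPartitionOffset("store-supply-topic", 0),
        new TopicPartitionOffset("store-supply-topic", 1),
        new TopicPartitionOffset("store-supply-topic", 2));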
However, I am not seeing the behavior above: some containers/machines are idle while others are assigned up to 6 partitions, which causes lag on the Kafka topic.
Am I doing something wrong? How can I make sure the partitions are mapped evenly across the containers, with no container mapped to more than one partition? Please help.
key.deserializer : StringDeserializer
value.deserializer : [CUSTOM DESERIALIZER]
enable.auto.commit : false
max.poll.records : 5
group.id : [MY GROUP]
partition.assignment.strategy : StickyAssignor
max.partition.fetch.bytes : 1048576
bootstrap.servers : [SERVERS]
auto.commit.interval.ms : 3000
auto.offset.reset : latest
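For illustration, these properties might be assembled into the consumer config map roughly as follows. This is only a sketch: the bracketed placeholders are kept from the question, and MyCustomDeserializer is a stand-in for the custom value deserializer.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.StickyAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "[SERVERS]");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "[MY GROUP]");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyCustomDeserializer.class); // stand-in for [CUSTOM DESERIALIZER]
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 3000);
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
config.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());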
factory.setConcurrency(3);
@KafkaListener(topics = "#{kafkaTopicConfig.getStoreSupply()}", containerFactory = EI_LISTNER_FACTORY)
EI_LISTNER_FACTORY is a bean:
@Bean(EI_LISTNER_FACTORY)
public ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> eiKafkaListenerContainerFactory() {
    Boolean eiConsumerStartup = [START_UP From Configuration]
    Integer concurrentThreadCount = 3;
    Map<String, Object> config = [properties from ABOVE]
    ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
    factory.setAutoStartup(eiConsumerStartup);
    // With auto-commit disabled, acknowledge manually and run 3 consumer threads per instance
    if (config.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals("false")) {
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(concurrentThreadCount);
    }
    return factory;
}
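Since the factory uses AckMode.MANUAL_IMMEDIATE when auto-commit is disabled, the listener method has to acknowledge each record itself. A minimal sketch of such a listener (the method name and processing are illustrative, not taken from the original code):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

@KafkaListener(topics = "#{kafkaTopicConfig.getStoreSupply()}", containerFactory = EI_LISTNER_FACTORY)
public void onStoreSupply(ConsumerRecord<String, AggQuantityByPrimeValue> record, Acknowledgment ack) {
    // process the record ...
    ack.acknowledge(); // MANUAL_IMMEDIATE commits this offset right away
}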
The configuration looks fine. It may be that, by the time you inspected the consumer group, a few of the consumers had already become unreachable/idle. In that case the rebalance ends up assigning the same server's container threads to more than one partition.
If that is not the case, enable Kafka-level logging to monitor the partition assignment and revocation logs, and check whether the rebalance produces the expected result.
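One way to get that visibility from Spring Kafka itself, rather than raising the Kafka client log level, is to register a rebalance listener on the container factory. A sketch, assuming an SLF4J logger named log is available:

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.info("Partitions assigned: {}", partitions);
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.info("Partitions revoked: {}", partitions);
    }
});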