Not all Kafka consumers are getting assigned to partitions
I have 10 consumers and 10 partitions.
I get the partition count:
int partitionCount = getPartitionCount(kafkaUrl);
I use the same group.id for every consumer.
I create as many consumers as there are partitions:
public void listen() {
    try {
        String kafkaUrl = getKafkaUrl();
        int partitionCount = getPartitionCount(kafkaUrl);
        // Submit one consumer task per partition.
        Stream.iterate(0, i -> i + 1)
                .limit(partitionCount)
                .forEach(index -> executorService.execute(() ->
                        consumerTask.invokeKafkaConsumerTask(prepareConsumerConfig(index, kafkaUrl), INPUT_TOPIC)));
    } catch (Exception exception) {
        logger.error("Cannot receive event from kafka ", exception);
    }
}
public void invokeKafkaConsumerTask(Properties properties, String topicName) {
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
        consumer.subscribe(Collections.singletonList(topicName));
        logger.info("[KAFKA] consumer created");
        invokeKafkaConsumer(consumer);
    } catch (IllegalArgumentException exception) {
        logger.error("Cannot create kafka consumer ", exception);
    }
}

private void invokeKafkaConsumer(KafkaConsumer<String, String> consumer) {
    try {
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(4));
            if (consumerRecords.count() > 0) {
                consumeRecords(consumerRecords);
                consumer.commitSync();
            }
        }
    } catch (Exception e) {
        logger.error("Error while receiving records ", e);
    }
}
The getPartitionCount method returns 10 partitions, so it works correctly.
The configuration looks like this:
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_CLIENT_ID);
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, CONSUMER_CLIENT_ID + index);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000");
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(Integer.MAX_VALUE));
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
This is what I see after the consumers are assigned to partitions:
TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CLIENT-ID
topicName  1          89391           89391           0      consumer0
topicName  3          88777           88777           0      consumer1
topicName  5          89280           89280           0      consumer2
topicName  4          88776           88776           0      consumer2
topicName  0          4670991         4670991         0      consumer0
topicName  9          23307           89343           66036  consumer4
topicName  7          89610           89610           0      consumer3
topicName  8          88167           88167           0      consumer4
topicName  2          89138           89138           0      consumer1
topicName  6          88967           88967           0      consumer3
Only half of the consumers are assigned to partitions.
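That count can be checked directly from the listing above. A minimal sketch that tallies partitions per CLIENT-ID; the class name AssignmentTally is illustrative, and the rows are hard-coded copies of the output shown:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class AssignmentTally {
    // (partition, client id) pairs copied from the consumer group listing.
    static final String[][] ROWS = {
        {"1", "consumer0"}, {"3", "consumer1"}, {"5", "consumer2"}, {"4", "consumer2"},
        {"0", "consumer0"}, {"9", "consumer4"}, {"7", "consumer3"}, {"8", "consumer4"},
        {"2", "consumer1"}, {"6", "consumer3"}
    };

    // Groups partition numbers by the client id that owns them, sorted per client.
    static Map<String, List<Integer>> partitionsByClient() {
        Map<String, List<Integer>> owned = new TreeMap<>();
        for (String[] row : ROWS) {
            owned.computeIfAbsent(row[1], k -> new ArrayList<>()).add(Integer.parseInt(row[0]));
        }
        owned.values().forEach(Collections::sort);
        return owned;
    }

    public static void main(String[] args) {
        partitionsByClient().forEach((client, parts) -> System.out.println(client + " -> " + parts));
    }
}
```

Five client ids (consumer0 to consumer4) own two partitions each, and consumer5 to consumer9 do not appear in the listing at all.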
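Why does this happen?
According to the documentation, there should be one consumer per partition.
Am I doing something wrong?
Kafka version: 2.1.1.
I also found some log lines like this: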
Setting newly assigned partitions:[empty]
Are you subscribing to a collection of topic names or to a Java Pattern?
If you are subscribing to a Pattern, change partition.assignment.strategy to RoundRobinAssignor or StickyAssignor.
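For reference, here is a simplified model of how a range-style and a round-robin-style assignor would split one topic's partitions among the sorted members of a group. It is a sketch under assumptions, not Kafka's implementation: the class AssignorSketch and its methods are hypothetical, the names consumer0, consumer1, ... stand in for real member ids, and only a single topic is considered.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignorSketch {
    // Hypothetical member names "consumer0".."consumer{count-1}", in sorted order.
    static List<String> members(int count) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            names.add("consumer" + i);
        }
        return names;
    }

    // Range-style: each member gets a contiguous block of partitions;
    // the first (partitions % memberCount) members get one extra partition.
    static Map<String, List<Integer>> rangeSplit(int partitions, int memberCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        List<String> names = members(memberCount);
        int base = partitions / memberCount;
        int extra = partitions % memberCount;
        int next = 0;
        for (int i = 0; i < memberCount; i++) {
            int size = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < size; j++) {
                owned.add(next++);
            }
            result.put(names.get(i), owned);
        }
        return result;
    }

    // Round-robin-style: partitions are dealt out one at a time, cycling through the members.
    static Map<String, List<Integer>> roundRobinSplit(int partitions, int memberCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        List<String> names = members(memberCount);
        for (String name : names) {
            result.put(name, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(names.get(p % memberCount)).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("range, 5 members:        " + rangeSplit(10, 5));
        System.out.println("round-robin, 5 members:  " + roundRobinSplit(10, 5));
        System.out.println("round-robin, 10 members: " + roundRobinSplit(10, 10));
    }
}
```

With 10 partitions and 10 members, either split gives every member exactly one partition. With only 5 members, the range-style split yields contiguous pairs (0 and 1, 2 and 3, ...), while the round-robin-style split yields 0 and 5, 1 and 6, and so on. Comparing these shapes with the listing in the question can hint at how many members were in the group, and which strategy was in effect, when the rebalance happened.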
[Solution] Interesting case: I changed group.id and partition.assignment.strategy, added auto.offset.reset=earliest, and it seems to work...
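A minimal sketch of what those three changes might look like as plain property keys. The post does not say which values were used, so the group id and the choice of StickyAssignor are assumptions for illustration; only auto.offset.reset=earliest is stated. The class name UpdatedConsumerConfig is hypothetical.

```java
import java.util.Properties;

public class UpdatedConsumerConfig {
    // Returns only the settings that were changed; they would be merged into
    // the consumer Properties shown in the question.
    static Properties updatedSettings() {
        Properties properties = new Properties();
        // Assumption: any group id that has not been used before.
        properties.put("group.id", "input-topic-consumers-v2");
        // Assumption: one of the assignors named in the comment on the question.
        properties.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.StickyAssignor");
        // Stated in the post: read from the earliest offset when none is committed.
        properties.put("auto.offset.reset", "earliest");
        return properties;
    }

    public static void main(String[] args) {
        updatedSettings().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Note that auto.offset.reset only applies when the group has no valid committed offset for a partition, which is the situation a newly named group starts in.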