Kafka CommitFailedException when running 2 consumers separately with different topics
I am trying to run 2 consumers subscribed to 2 different topics. Both consumer programs work correctly when run one at a time, but when I run them simultaneously, one consumer always shows this exception:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
As the message suggests, I set max.poll.records to 2, session.timeout.ms to 30000, and heartbeat.interval.ms to 1000.
Below is my consumer function. It is identical in both files except that the topic name is changed to Test2, and I run the two functions from the two different files simultaneously.
public void consume()
{
    // Kafka consumer configuration settings
    List<String> topicNames = new ArrayList<String>();
    topicNames.add("Test1");
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("session.timeout.ms", "30000");
    props.put("heartbeat.interval.ms", "1000");
    props.put("max.poll.records", "2");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(topicNames);
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Record: " + record.value());
                String responseString = "successfull";
                if (responseString.equals("successfull")) {
                    consumer.commitSync();
                }
            }
        }
    }
    catch (Exception e) {
        LOG.error("Exception: ", e);
    }
    finally {
        consumer.close();
    }
}
Because of this error, the records in the Kafka topic are never committed.
How can I overcome this error?
In your case, you need to assign a different group ID to each consumer. You are creating two consumers with the same group ID (which is fine by itself), but calling subscribe twice within that group is not: both consumers join the group "test", so starting the second one triggers a rebalance, and the first one's commit then fails.
You can run one consumer at a time without problems because then subscribe is only called once in the group.
Let me know if you need any further help. Happy to help.
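As a minimal sketch of the fix: factor the configuration into a helper that takes the group ID, and give each consumer its own group. The group names test-group-1 and test-group-2 below are placeholders, not names from your setup:

```java
import java.util.Properties;

public class GroupIdExample {
    // Build the shared consumer config; the group ID is the only
    // per-consumer difference, so it is passed in as a parameter.
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("heartbeat.interval.ms", "1000");
        props.put("max.poll.records", "2");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // One group per topic: the two consumers are now in separate
        // groups, so starting one never rebalances the other.
        Properties test1Props = consumerProps("test-group-1"); // for topic Test1
        Properties test2Props = consumerProps("test-group-2"); // for topic Test2
        System.out.println(test1Props.getProperty("group.id"));
        System.out.println(test2Props.getProperty("group.id"));
    }
}
```

Each file would then build its `KafkaConsumer` from the corresponding `Properties` object; everything else in your `consume()` method can stay as it is.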