独立消费者抛出 InvalidGroupIdException
Standalone Consumer throwing InvalidGroupIdException
这是一个由两部分组成的问题。
我。代码是:
import java.time.Duration
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._
import io.StdIn._
object StandaloneConsumer {
private final var topics = ""
private final val BOOTSTRAPSERVERS = "hostname:9092"
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
val consumer = new KafkaConsumer[String, String](props)
topics = readLine("Enter the topic name: ")
try {
val partitionInfos = consumer.partitionsFor(topics)
val topicPartitions = new util.ArrayList[TopicPartition]
if(partitionInfos != null) {
for (partitionInfo <- partitionInfos.asScala) {
topicPartitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
}
consumer.assign(topicPartitions)
while (true) {
val records = consumer.poll(Duration.ofMillis(500))
for(record <- records.asScala) {
println(s"key = ${record.key}, value = ${record.value()}")
}
consumer.commitAsync()
}
}
} catch {
case e:Exception => e.printStackTrace()
} finally {
consumer.commitSync()
consumer.close()
}
}
}
当我运行这个它正在正确地消耗所有数据但最后它给出:
org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
这个错误。 为什么?
二.我不明白这部分代码。这是怎么回事?:
val topicPartitions = new util.ArrayList[TopicPartition]
if(partitionInfos != null) {
for (partitionInfo <- partitionInfos.asScala) {
topicPartitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
}
谁能给我解释一下。
谢谢。
在 Kafka 中,提交只有在您是消费者组的一部分时才有意义(然后让 Kafka 集群协调器管理分区分配和偏移 fetching/committing 操作)。
关于您的代码,您似乎在进行手动分区分配,但您仍在尝试提交:
consumer.commitAsync()
关于消费者组的一些文档:
针对你的问题二
此代码正在获取属于您的主题的不同分区,然后您将这些分区分配给您的消费者。
您应该阅读更多有关 Kafka 消费者群体的信息,并确定是否要使用它,或者自行处理分区分配。
我强烈建议使用它(出于可扩展性目的)除非你真的想跳过它。
这是一个由两部分组成的问题。
我。代码是:
import java.time.Duration
import java.util
import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import scala.collection.JavaConverters._
import io.StdIn._
object StandaloneConsumer {
private final var topics = ""
private final val BOOTSTRAPSERVERS = "hostname:9092"
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAPSERVERS)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
val consumer = new KafkaConsumer[String, String](props)
topics = readLine("Enter the topic name: ")
try {
val partitionInfos = consumer.partitionsFor(topics)
val topicPartitions = new util.ArrayList[TopicPartition]
if(partitionInfos != null) {
for (partitionInfo <- partitionInfos.asScala) {
topicPartitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
}
consumer.assign(topicPartitions)
while (true) {
val records = consumer.poll(Duration.ofMillis(500))
for(record <- records.asScala) {
println(s"key = ${record.key}, value = ${record.value()}")
}
consumer.commitAsync()
}
}
} catch {
case e:Exception => e.printStackTrace()
} finally {
consumer.commitSync()
consumer.close()
}
}
}
当我运行这个它正在正确地消耗所有数据但最后它给出:
org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
这个错误。 为什么?
二.我不明白这部分代码。这是怎么回事?:
val topicPartitions = new util.ArrayList[TopicPartition]
if(partitionInfos != null) {
for (partitionInfo <- partitionInfos.asScala) {
topicPartitions.add(new TopicPartition(partitionInfo.topic, partitionInfo.partition))
}
谁能给我解释一下。 谢谢。
在 Kafka 中,提交只有在您是消费者组的一部分时才有意义(然后让 Kafka 集群协调器管理分区分配和偏移 fetching/committing 操作)。 关于您的代码,您似乎在进行手动分区分配,但您仍在尝试提交:
consumer.commitAsync()
关于消费者组的一些文档:
针对你的问题二
此代码正在获取属于您的主题的不同分区,然后您将这些分区分配给您的消费者。
您应该阅读更多有关 Kafka 消费者群体的信息,并确定是否要使用它,或者自行处理分区分配。 我强烈建议使用它(出于可扩展性目的)除非你真的想跳过它。