Kafka Direct Stream是否自己创建一个Consumer组(因为它不关心应用程序中给出的group.id 属性)
Does Kafka Direct Stream create a Consumer group by itself (as it does not care about group.id property given in application)
假设我刚刚启动了一个 Kafka direct stream + spark streaming 应用程序。对于第一批,驱动程序中的 Streaming Context 连接到 Kafka 并获取 startOffset 和 endOffset。然后,它会启动一个具有这些开始和结束偏移范围的 spark 作业,供执行者从 Kafka 获取记录。我的问题从这里开始。当是第二批时,Streaming context 连接到 Kafka 以获取开始和结束偏移量范围。当没有允许存储最后提交偏移值的消费者组(因为直接流不考虑 group.id)时,Kafka 如何能够提供这些范围?
使用 Kafka 消费者 API 时,总是 一个消费者组。无论您处理的是哪种流(Spark Direct Streaming、Spark Structured Streaming、Kafka Consumer 的 Java/Scala API...)。
as Direct stream does not take into account group.id
查看 Spark + Kafka integration Guide for direct streaming(spark-streaming-kafka010 的代码示例)如何声明消费者组:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
即使您没有在配置中声明消费者组,系统仍会为您创建一个(随机的)消费者组。
检查您的日志以查看您的应用程序中使用了哪些 group.id。
假设我刚刚启动了一个 Kafka direct stream + spark streaming 应用程序。对于第一批,驱动程序中的 Streaming Context 连接到 Kafka 并获取 startOffset 和 endOffset。然后,它会启动一个具有这些开始和结束偏移范围的 spark 作业,供执行者从 Kafka 获取记录。我的问题从这里开始。当是第二批时,Streaming context 连接到 Kafka 以获取开始和结束偏移量范围。当没有允许存储最后提交偏移值的消费者组(因为直接流不考虑 group.id)时,Kafka 如何能够提供这些范围?
使用 Kafka 消费者 API 时,总是 一个消费者组。无论您处理的是哪种流(Spark Direct Streaming、Spark Structured Streaming、Kafka Consumer 的 Java/Scala API...)。
as Direct stream does not take into account group.id
查看 Spark + Kafka integration Guide for direct streaming(spark-streaming-kafka010 的代码示例)如何声明消费者组:
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
即使您没有在配置中声明消费者组,系统仍会为您创建一个(随机的)消费者组。
检查您的日志以查看您的应用程序中使用了哪些 group.id。