Spark DStream from Kafka always starts at beginning

See my last comment on the accepted answer for the solution.

I configure the DStream like this:

  import io.confluent.kafka.serializers.KafkaAvroDeserializer
  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.streaming.kafka010.KafkaUtils
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "kafka1.example.com:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[KafkaAvroDeserializer],
    "group.id" -> "mygroup",
    "specific.avro.reader" -> true,
    "schema.registry.url" -> "http://schema.example.com:8081"
  )

  val stream = KafkaUtils.createDirectStream(
    ssc,
    PreferConsistent,
    Subscribe[String, DataFile](topics, kafkaParams)
  )

While this works and I receive the DataFile records as expected, when I stop and re-run the job it always starts at the beginning of the topic. How can I make it continue where it left off?

Follow-up 1

As suggested in Bhima Rao Gogineni's answer, I changed my configuration:

// JavaBool is presumably an alias for java.lang.Boolean
val consumerParams =
  Map("bootstrap.servers" -> bootstrapServerString,
      "schema.registry.url" -> schemaRegistryUri.toString,
      "specific.avro.reader" -> "true",
      "group.id" -> "measuring-data-files",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[KafkaAvroDeserializer],
      "enable.auto.commit" -> (false: JavaBool), // commit offsets manually
      "auto.offset.reset" -> "earliest")         // only applies when no committed offset exists

Then I set up the stream:

val stream = KafkaUtils.
  createDirectStream(ssc,
                     LocationStrategies.PreferConsistent,
                     ConsumerStrategies.Subscribe[String, DataFile](List(inTopic), consumerParams))

Then I process it:

stream.
  foreachRDD { rdd =>
    ... // Do stuff with the RDD - transform, produce to other topic etc.

    // Commit the offsets: commitAsync queues the commit, which is
    // actually sent to Kafka on the next batch
    log.info("Committing the offsets")
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }

But when I re-run it, it still starts from the beginning.

Here is an excerpt from my Kafka log:

One run:

[2018-07-04 07:47:31,593] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 22 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:47:31,594] INFO [GroupCoordinator 0]: Stabilized group measuring-data-files generation 23 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:47:31,599] INFO [GroupCoordinator 0]: Assignment received from leader for group measuring-data-files for generation 23 (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:06,690] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 131488999 (kafka.log.ProducerStateManager)
[2018-07-04 07:48:06,690] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 131488999 in 1 ms. (kafka.log.Log)
[2018-07-04 07:48:10,788] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Member consumer-1-262ece09-93c4-483e-b488-87057578dabc in group measuring-data-files has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 23 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:30,074] INFO [GroupCoordinator 0]: Group measuring-data-files with generation 24 is now empty (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:48:45,761] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 153680971 (kafka.log.ProducerStateManager)
[2018-07-04 07:48:45,763] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 153680971 in 3 ms. (kafka.log.Log)
[2018-07-04 07:49:24,819] INFO [ProducerStateManager partition=data-0] Writing producer snapshot at offset 175872864 (kafka.log.ProducerStateManager)
[2018-07-04 07:49:24,820] INFO [Log partition=data-0, dir=E:\confluent-4.1.1\data\kafka] Rolled new log segment at offset 175872864 in 1 ms. (kafka.log.Log)

The next run:

[2018-07-04 07:50:13,748] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 24 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:13,749] INFO [GroupCoordinator 0]: Stabilized group measuring-data-files generation 25 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:13,754] INFO [GroupCoordinator 0]: Assignment received from leader for group measuring-data-files for generation 25 (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Member consumer-1-906c2eaa-f012-4283-96fc-c34582de33fb in group measuring-data-files has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Preparing to rebalance group measuring-data-files with old generation 25 (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)
[2018-07-04 07:50:43,758] INFO [GroupCoordinator 0]: Group measuring-data-files with generation 26 is now empty (__consumer_offsets-8) (kafka.coordinator.group.GroupCoordinator)

Follow-up 2

I now save the offsets with more detailed logging, like this:

    import java.util
    import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
    import org.apache.kafka.common.TopicPartition
    import scala.collection.JavaConverters._

    // Commit the offsets
    log.info("Committing the offsets")
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    if(offsetRanges.isEmpty) {
      log.info("Offset ranges is empty...")
    } else {
      log.info("# offset ranges: %d" format offsetRanges.length)
    }
    object cb extends OffsetCommitCallback {

      def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata],
                     exception: Exception): Unit =
        if(exception != null) {
          log.info("Commit FAILED")
          log.error(exception.getMessage, exception)
        } else {
          log.info("Commit SUCCEEDED - count: %d" format offsets.size())
          offsets.
            asScala.
            foreach {
              case (p, omd) =>
                log.info("partition = %d; topic = %s; offset = %d; metadata = %s".
                  format(p.partition(), p.topic(), omd.offset(), omd.metadata()))
            }
        }
    }
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, cb)

And I get this exception:

2018-07-04 10:14:00 ERROR DumpTask$:136 - Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitPendingRequests(ConsumerNetworkClient.java:260)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:366)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:978)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.paranoidPoll(DirectKafkaInputDStream.scala:163)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:182)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:209)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:342)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun$$anonfun$apply.apply(DStream.scala:342)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$$anonfun.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:336)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute.apply(DStream.scala:334)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:331)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun.apply(DStreamGraph.scala:122)
        at org.apache.spark.streaming.DStreamGraph$$anonfun.apply(DStreamGraph.scala:121)
        at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap.apply(TraversableLike.scala:241)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:121)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun.apply(JobGenerator.scala:249)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun.apply(JobGenerator.scala:247)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:247)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:183)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:89)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon.onReceive(JobGenerator.scala:88)
        at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)

How can I fix this?

Spark provides two APIs to read messages from Kafka.

From the Spark documentation:

Approach 1: Receiver-based Approach

This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and then jobs launched by Spark Streaming processes the data.
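
For reference, a receiver-based stream would be created with KafkaUtils.createStream from the older spark-streaming-kafka-0-8 module; a minimal sketch (the Zookeeper quorum and topic name below are placeholders):

    import org.apache.spark.streaming.kafka.KafkaUtils // spark-streaming-kafka-0-8

    // Offsets are tracked in Zookeeper under the given group id, so a
    // restarted job resumes from the last offsets committed for that group.
    val receiverStream = KafkaUtils.createStream(
      ssc,                           // StreamingContext
      "zookeeper1.example.com:2181", // Zookeeper quorum (placeholder)
      "mygroup",                     // consumer group id
      Map("mytopic" -> 1)            // topic -> number of receiver threads
    )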

Approach 2: Direct Approach (No Receivers)

This new receiver-less “direct” approach has been introduced in Spark 1.3 to ensure stronger end-to-end guarantees. Instead of using receivers to receive data, this approach periodically queries Kafka for the latest offsets in each topic+partition, and accordingly defines the offset ranges to process in each batch. When the jobs to process the data are launched, Kafka’s simple consumer API is used to read the defined ranges of offsets from Kafka (similar to read files from a file system).
Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself.

In your case you are using the Direct Approach, so you need to handle the message offsets yourself and specify the offset range to start reading from. Or, if you want Zookeeper to handle your message offsets, you can use the KafkaUtils.createStream() API, which uses the receiver-based approach.

You can find more about how to handle Kafka offsets in the Spark documentation.

With the new Spark Kafka integration API (spark-streaming-kafka-0-10), we can try asynchronous offset commits.

Read the offsets and commit them once the processing is complete.

The Kafka configuration for this:

enable.auto.commit=false

auto.offset.reset=earliest or auto.offset.reset=latest --> this setting takes effect only when there is no previously committed offset for the group in Kafka; in that case the consumer starts reading from the beginning or the end of the topic accordingly.

 stream.foreachRDD { rdd =>
   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

   // some time later, after outputs have completed
   stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
 }

Source: https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
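
As for the CommitFailedException in Follow-up 2: the exception message itself names the two knobs, the session timeout and the number of records returned per poll. A hedged sketch of those consumer settings (the values are illustrative, not recommendations):

    val tunedParams = consumerParams ++ Map(
      // Give the poll loop more headroom before the broker evicts the member
      "session.timeout.ms" -> "60000",
      // Cap how many records a single poll() returns so batches finish sooner
      "max.poll.records" -> "500"
    )

Note that session.timeout.ms has to stay within the broker's group.min.session.timeout.ms and group.max.session.timeout.ms bounds.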