如何获取用于结构化查询的 Kafka 偏移量以进行手动和可靠的偏移量管理?

How to get Kafka offsets for structured query for manual and reliable offset management?

Spark 2.2引入了Kafka的结构化流源。据我了解,它依赖于 HDFS 检查点目录来存储偏移量并保证 "exactly-once" 消息传递。

但是老码头(如 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/)表示 Spark Streaming 检查点无法跨应用程序或 Spark 升级恢复,因此不是很可靠。作为一种解决方案,有一种做法是支持在支持 MySQL 或 RedshiftDB 等事务的外部存储中存储偏移量。

如果我想将 Kafka 源的偏移量存储到事务数据库,我如何从结构化流批处理中获取偏移量?

以前,可以通过将 RDD 转换为 HasOffsetRanges:

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges    

但是使用新的 Streaming API,我有一个 DatasetInternalRow,我找不到一个简单的方法来获取偏移量。 Sink API 只有 addBatch(batchId: Long, data: DataFrame) 方法,我如何才能获得给定批次 ID 的偏移量?

带有 Kafka 源的流式数据集有 offset 作为 field 之一。您可以简单地查询查询中的所有偏移量并将它们保存到 JDBC Sink

相关的 Spark DEV 邮件列表讨论线程是 here

总结:

Spark Streaming 将在未来的版本(> 2.2.0)中支持获取偏移量。关注的 JIRA 票 - https://issues-test.apache.org/jira/browse/SPARK-18258

对于 Spark <= 2.2.0,您可以通过从检查点目录读取 json 来获取给定批次的偏移量(API 不稳定,所以要小心):

val checkpointRoot = // read 'checkpointLocation' from custom sink params
val checkpointDir = new Path(new Path(checkpointRoot), "offsets").toUri.toString
val offsetSeqLog = new OffsetSeqLog(sparkSession, checkpointDir)

val endOffset: Map[TopicPartition, Long] = offsetSeqLog.get(batchId).map { endOffset =>
  endOffset.offsets.filter(_.isDefined).map { str =>
    JsonUtilsWrapper.jsonToOffsets(str.get.json)
  }
}


/**
  * Hack to access private API
  * Put this class into org.apache.spark.sql.kafka010 package
  */
object JsonUtilsWrapper {
  def offsetsToJson(partitionOffsets: Map[TopicPartition, Long]): String = {
    JsonUtils.partitionOffsets(partitionOffsets)
  }

  def jsonToOffsets(str: String): Map[TopicPartition, Long] = {
    JsonUtils.partitionOffsets(str)
  }
}

endOffset 将包含每个 topic/partition 的直到偏移量。 获取起始偏移量是有问题的,因为您必须阅读 'commit' 检查点目录。但通常情况下,您不关心起始偏移量,因为存储结束偏移量足以可靠地重新启动 Spark 作业。

请注意,您还必须将已处理的批次 ID 存储在您的存储中。在某些情况下,Spark 可以使用相同的批次 ID 重新 运行 失败的批次,因此请确保使用最新处理的批次 ID(您应该从外部存储中读取)初始化自定义接收器,并忽略 ID < latestProcessedBatchId 的任何批次.顺便说一句,批次 ID 在查询中不是唯一的,因此您必须分别为每个查询存储批次 ID。

Spark 2.2 introduced a Kafka's structured streaming source. As I understand, it's relying on HDFS checkpoint dir to store offsets and guarantee an "exactly-once" message delivery.

正确。

每个触发器 Spark Structured Streaming 都会将偏移量保存到检查点位置的 offset 目录(使用 checkpointLocation 选项定义或 spark.sql.streaming.checkpointLocation Spark 属性 或随机分配)应该保证偏移量被处理至多一次。该功能称为 Write Ahead Logs.

检查点位置中的另一个目录是 commits 完整的流式批处理目录,每个批处理一个文件(文件名是批处理 ID)。

引用Fault Tolerance Semantics中的官方文档:

To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers) to track the read position in the stream. The engine uses checkpointing and write ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure end-to-end exactly-once semantics under any failure.

每次执行触发器时,StreamExecution 检查目录,"computes" 已经处理了哪些偏移量。这给你至少一次语义和恰好一次总共。

But old docs (...) says that Spark Streaming checkpoints are not recoverable across applications or Spark upgrades and hence not very reliable.

你称他们为 "old" 是有原因的, 不是吗?

他们指的是旧的和(在我看来)死的 Spark Streaming,它不仅保留了偏移量,还保留了导致检查点几乎无法使用的情况的整个查询代码,例如当您更改代码时。

现在时代已经结束,结构化流式处理对检查点的内容和时间更加谨慎。

If I want to store offsets from Kafka source to a transactional DB, how can I obtain offset from a structured stream batch?

一个解决方案可能是实现或以某种方式使用 MetadataLog 用于处理偏移检查点的接口。 可以工作。

how can I suppose to get an offset for given batch id?

目前不可能。

我的理解是,您将无法做到这一点,因为流媒体的语义对您是隐藏的。您只是应该 而不是 处理这种称为偏移的低级 "thing",Spark Structured Streaming 使用它来提供恰好一次保证。

引用 Michael Armbrust 在 Spark 峰会上的演讲 Easy, Scalable, Fault Tolerant Stream Processing with Structured Streaming in Apache Spark

you should not have to reason about streaming

further in the talk (on the next slide)

you should write simple queries & Spark should continuously update the answer


一种使用StreamingQueryProgress获取偏移量(来自任何来源,包括Kafka)的方法,您可以使用StreamingQueryListener和[=16进行拦截=]回调。

onQueryProgress(event: QueryProgressEvent): Unit Called when there is some status update (ingestion rate updated, etc.)

使用 StreamingQueryProgress,您可以使用 SourceProgress 访问 sources 属性,这会给您想要的。