Structured Streaming fetched wrong current offset from Kafka
When running Spark Structured Streaming with the library "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0", we keep getting an error while fetching the current offset:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, qa2-hdp-4.acuityads.org, executor 2): java.lang.AssertionError: assertion failed: latest offset -9223372036854775808 does not equal -1
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.resolveRange(KafkaMicroBatchReader.scala:371)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartitionReader.<init>(KafkaMicroBatchReader.scala:329)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchInputPartition.createPartitionReader(KafkaMicroBatchReader.scala:314)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD.compute(DataSourceRDD.scala:42)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
For some reason, fetchLatestOffset appears to return Long.MIN_VALUE for one of the partitions. I checked the Structured Streaming checkpoint and it is correct; it is the currentAvailableOffset that is set to Long.MIN_VALUE.
Kafka broker version: 1.1.0.
The library we use:

libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"
How to reproduce:
Basically we started a Structured Streaming job subscribed to a topic with 4 partitions, then produced some messages into the topic. The job crashed and logged the stack trace above. A minimal sketch of the setup is shown below.
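A minimal sketch of the job, assuming a plain console sink; the broker address and checkpoint path are placeholders, and only the topic name REVENUEEVENT comes from the logs below:

import org.apache.spark.sql.SparkSession

object ReproSketch extends App {
  val spark = SparkSession.builder()
    .appName("kafka-offset-repro")
    .getOrCreate()

  // Read from the 4-partition topic through the spark-sql-kafka-0-10 source.
  val events = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092") // placeholder address
    .option("subscribe", "REVENUEEVENT")
    .load()

  // Any sink reproduces it: the assertion fails while the micro-batch
  // reader resolves the offset range, before any rows reach the sink.
  events.writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/checkpoint-repro") // placeholder path
    .start()
    .awaitTermination()
}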
As we can see in the logs, the committed offsets also look fine:
=== Streaming Query ===
Identifier: [id = c46c67ee-3514-4788-8370-a696837b21b1, runId = 31878627-d473-4ee8-955d-d4d3f3f45eb9]
Current Committed Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: {"REVENUEEVENT":{"0":1}}}
Current Available Offsets: {KafkaV2[Subscribe[REVENUEEVENT]]: {"REVENUEEVENT":{"0":-9223372036854775808}}}
So Spark streaming recorded the correct committed value for partition 0, but the current available offset returned from Kafka shows Long.MIN_VALUE.
Found the problem: it was caused by an integer overflow inside the Spark Structured Streaming library. Details are posted here: https://issues.apache.org/jira/browse/SPARK-26718
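For context (this is not the actual Spark code, which is in the JIRA fix), a silent Long overflow on the JVM wraps straight to Long.MinValue, which is exactly the -9223372036854775808 seen in the logs:

// Not Spark's code; just the JVM arithmetic that produces the logged value.
object OverflowDemo extends App {
  println(Long.MinValue)      // -9223372036854775808, the "latest offset" in the log
  println(Long.MaxValue + 1L) // Longs wrap silently: also -9223372036854775808
}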