有时,Spark Structured Streaming 写入流会出现 IllegalStateException:在写入批处理 4 时进行竞赛
Once in a while Spark Structured Streaming write stream is getting IllegalStateException: Race while writing batch 4
我在同一个 spark 结构化流会话中有多个查询 运行。
查询正在将 parquet 记录写入 Google Bucket 并将检查点写入 Google Bucket.
val query1 = df1
.select(col("key").cast("string"),from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
.select("key","data.*")
.writeStream.format("parquet").option("path", path).outputMode("append")
.option("checkpointLocation", checkpoint_dir1)
.partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
.queryName("query1").start()
val query2 = df2.select(col("key").cast("string"),from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
.select("key","data.*")
.writeStream.format("parquet").option("path", path).outputMode("append")
.option("checkpointLocation", checkpoint_dir2)
.partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
.queryName("query2").start()
问题: 有时作业失败 ava.lang.IllegalStateException:在写入批次 4 时进行竞赛
日志:
Caused by: java.lang.IllegalStateException: Race while writing batch 4
at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:67)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:187)
... 20 more
20/07/24 19:40:15 INFO SparkContext: Invoking stop() from shutdown hook
这个错误是因为有两个写入器写入输出路径。文件流接收器不支持多个编写器。它假设只有一个写入路径的作者。每个查询都需要使用自己的输出目录。
因此,为了解决这个问题,您可以让每个查询都使用自己的输出目录。回读数据时,可以加载每个输出目录并合并它们。
您还可以使用支持多个并发写入器的流接收器,例如 Delta Lake library. It's also supported by Google Cloud: https://cloud.google.com/blog/products/data-analytics/getting-started-with-new-table-formats-on-dataproc 。此 link 包含有关如何在 Google Cloud 上使用 Delta Lake 的说明。它没有提到流式传输的情况,但是您需要做的是将代码中的 format("parquet")
更改为 format("delta")
。
我在同一个 spark 结构化流会话中有多个查询 运行。 查询正在将 parquet 记录写入 Google Bucket 并将检查点写入 Google Bucket.
val query1 = df1
.select(col("key").cast("string"),from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
.select("key","data.*")
.writeStream.format("parquet").option("path", path).outputMode("append")
.option("checkpointLocation", checkpoint_dir1)
.partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
.queryName("query1").start()
val query2 = df2.select(col("key").cast("string"),from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
.select("key","data.*")
.writeStream.format("parquet").option("path", path).outputMode("append")
.option("checkpointLocation", checkpoint_dir2)
.partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
.queryName("query2").start()
问题: 有时作业失败 ava.lang.IllegalStateException:在写入批次 4 时进行竞赛
日志:
Caused by: java.lang.IllegalStateException: Race while writing batch 4
at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:67)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:187)
... 20 more
20/07/24 19:40:15 INFO SparkContext: Invoking stop() from shutdown hook
这个错误是因为有两个写入器写入输出路径。文件流接收器不支持多个编写器。它假设只有一个写入路径的作者。每个查询都需要使用自己的输出目录。
因此,为了解决这个问题,您可以让每个查询都使用自己的输出目录。回读数据时,可以加载每个输出目录并合并它们。
您还可以使用支持多个并发写入器的流接收器,例如 Delta Lake library. It's also supported by Google Cloud: https://cloud.google.com/blog/products/data-analytics/getting-started-with-new-table-formats-on-dataproc 。此 link 包含有关如何在 Google Cloud 上使用 Delta Lake 的说明。它没有提到流式传输的情况,但是您需要做的是将代码中的 format("parquet")
更改为 format("delta")
。