有时,Spark Structured Streaming 写入流会出现 IllegalStateException:在写入批处理 4 时进行竞赛

Once in a while Spark Structured Streaming write stream is getting IllegalStateException: Race while writing batch 4

我在同一个 spark 结构化流会话中有多个查询 运行。 查询正在将 parquet 记录写入 Google Bucket 并将检查点写入 Google Bucket.

val query1 = df1
        .select(col("key").cast("string"),from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
        .select("key","data.*")
        .writeStream.format("parquet").option("path", path).outputMode("append")
        .option("checkpointLocation", checkpoint_dir1)
        .partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
        .queryName("query1").start()

val query2 = df2.select(col("key").cast("string"),from_json(col("value").cast("string"), schema, Map.empty[String, String]).as("data"))
            .select("key","data.*")
            .writeStream.format("parquet").option("path", path).outputMode("append")
            .option("checkpointLocation", checkpoint_dir2)
            .partitionBy("key")/*.trigger(Trigger.ProcessingTime("5 seconds"))*/
            .queryName("query2").start()

问题: 有时作业失败 ava.lang.IllegalStateException:在写入批次 4 时进行竞赛

日志:

Caused by: java.lang.IllegalStateException: Race while writing batch 4
    at org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol.commitJob(ManifestFileCommitProtocol.scala:67)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:187)
    ... 20 more
20/07/24 19:40:15 INFO SparkContext: Invoking stop() from shutdown hook

这个错误是因为有两个写入器写入输出路径。文件流接收器不支持多个编写器。它假设只有一个写入路径的作者。每个查询都需要使用自己的输出目录。

因此,为了解决这个问题,您可以让每个查询都使用自己的输出目录。回读数据时,可以加载每个输出目录并合并它们。

您还可以使用支持多个并发写入器的流接收器,例如 Delta Lake library. It's also supported by Google Cloud: https://cloud.google.com/blog/products/data-analytics/getting-started-with-new-table-formats-on-dataproc 。此 link 包含有关如何在 Google Cloud 上使用 Delta Lake 的说明。它没有提到流式传输的情况,但是您需要做的是将代码中的 format("parquet") 更改为 format("delta")