Spark Structured Streaming 水印错误
Spark Structured Streaming watermark error
跟进
我有json流数据,格式与下面相同
| A | B |
|-------|------------------------------------------|
| ABC | [{C:1, D:1}, {C:2, D:4}] |
| XYZ | [{C:3, D :6}, {C:9, D:11}, {C:5, D:12}] |
我需要把它转换成下面的格式
| A | C | D |
|-------|-----|------|
| ABC | 1 | 1 |
| ABC | 2 | 4 |
| XYZ | 3 | 6 |
| XYZ | 9 | 11 |
| XYZ | 5 | 12 |
为了实现这一点,按照上一个问题的建议执行转换。
val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")
val df2 = df1.withColumn("SeqNum", monotonically_increasing_id()).toDF("A", "Bn", "SeqNum")
val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")
val df4 = df3.withColumn("dummy", concat( $"SeqNum", lit("||"), $"A"))
val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C"))
val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")
现在我尝试将结果保存到 HDFS 中的 csv 文件
df6.withWatermark("event_time", "0 seconds")
.writeStream
.trigger(Trigger.ProcessingTime("0 seconds"))
.queryName("query_db")
.format("parquet")
.option("checkpointLocation", "/path/to/checkpoint")
.option("path", "/path/to/output")
// .outputMode("complete")
.start()
现在我得到以下错误。
Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
EventTimeWatermark event_time#223: timestamp, interval
我怀疑我没有执行任何需要它存储超出该行处理时间的聚合值的聚合。为什么会出现此错误?我可以将水印保留为 0 秒吗?
如有任何帮助,我们将不胜感激。
据我了解,只有在事件时间执行window操作时才需要加水印。 Spark 使用水印来处理延迟数据,出于同样的目的,Spark 需要保存较旧的聚合。
下面的 link 用例子很好地解释了这一点:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
我在您的转换中没有看到任何 window 操作,如果是这种情况,那么我认为您可以尝试 运行 不加水印的流查询。
当对火花流结构进行分组时,您必须在数据框中已经有了水印,并在分组时将其考虑在内,方法是在聚合中包含 window 水印.
df.groupBy(col("dummy"), window(col("event_time"), "1 day")).
跟进
我有json流数据,格式与下面相同
| A | B |
|-------|------------------------------------------|
| ABC | [{C:1, D:1}, {C:2, D:4}] |
| XYZ | [{C:3, D :6}, {C:9, D:11}, {C:5, D:12}] |
我需要把它转换成下面的格式
| A | C | D |
|-------|-----|------|
| ABC | 1 | 1 |
| ABC | 2 | 4 |
| XYZ | 3 | 6 |
| XYZ | 9 | 11 |
| XYZ | 5 | 12 |
为了实现这一点,按照上一个问题的建议执行转换。
val df1 = df0.select($"A", explode($"B")).toDF("A", "Bn")
val df2 = df1.withColumn("SeqNum", monotonically_increasing_id()).toDF("A", "Bn", "SeqNum")
val df3 = df2.select($"A", explode($"Bn"), $"SeqNum").toDF("A", "B", "C", "SeqNum")
val df4 = df3.withColumn("dummy", concat( $"SeqNum", lit("||"), $"A"))
val df5 = df4.select($"dummy", $"B", $"C").groupBy("dummy").pivot("B").agg(first($"C"))
val df6 = df5.withColumn("A", substring_index(col("dummy"), "||", -1)).drop("dummy")
现在我尝试将结果保存到 HDFS 中的 csv 文件
df6.withWatermark("event_time", "0 seconds")
.writeStream
.trigger(Trigger.ProcessingTime("0 seconds"))
.queryName("query_db")
.format("parquet")
.option("checkpointLocation", "/path/to/checkpoint")
.option("path", "/path/to/output")
// .outputMode("complete")
.start()
现在我得到以下错误。
Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;; EventTimeWatermark event_time#223: timestamp, interval
我怀疑我没有执行任何需要它存储超出该行处理时间的聚合值的聚合。为什么会出现此错误?我可以将水印保留为 0 秒吗?
如有任何帮助,我们将不胜感激。
据我了解,只有在事件时间执行window操作时才需要加水印。 Spark 使用水印来处理延迟数据,出于同样的目的,Spark 需要保存较旧的聚合。
下面的 link 用例子很好地解释了这一点: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#handling-late-data-and-watermarking
我在您的转换中没有看到任何 window 操作,如果是这种情况,那么我认为您可以尝试 运行 不加水印的流查询。
当对火花流结构进行分组时,您必须在数据框中已经有了水印,并在分组时将其考虑在内,方法是在聚合中包含 window 水印.
df.groupBy(col("dummy"), window(col("event_time"), "1 day")).