Structured Streaming exception: Append output mode not supported for streaming aggregations
I get the following error when running my Spark job:
org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets;;
I'm not sure whether the problem is caused by a missing watermark, and I don't know how to apply one in this situation. Here is the aggregation that is applied:
def aggregateByValue(): DataFrame = {
  df.withColumn("Value", expr("(BookingClass, Value)"))
    .groupBy("AirlineCode", "Origin", "Destination", "PoS", "TravelDate", "StartSaleDate", "EndSaleDate", "avsFlag")
    .agg(collect_list("Value").as("ValueSeq"))
    .drop("Value")
}
Usage:
val theGroupedDF = theDF
  .multiplyYieldByHundred
  .explodeDates
  .aggregateByValue

val query = theGroupedDF.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
Changing the outputMode to complete solved the problem:
val query = theGroupedDF.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
Alternatively, adding a watermark also solves the problem while keeping append mode:
val theGroupedDF = theDF
  .multiplyYieldByHundred
  .explodeDates
  .aggregateByValue
  // code below adds the watermark
  .withColumn("timestamp", current_timestamp())
  .withWatermark("timestamp", "10 minutes")
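To see why the watermark makes append mode legal, here is a minimal stdlib-only sketch (no Spark involved, and all names are hypothetical) of the rule Spark applies: a group may be appended to the sink only once the watermark, i.e. the maximum event time seen minus the allowed delay, has passed the group's event time, so the group can no longer receive late updates.

```scala
// Stdlib-only sketch (no Spark; names are hypothetical) of the watermark
// rule that makes append output mode legal for streaming aggregations:
// a group is emitted only after the watermark has passed its event time,
// i.e. once the group can no longer change.
object WatermarkSketch {
  case class Group(key: String, eventTimeSec: Long)

  // Watermark = max event time seen so far minus the allowed lateness.
  // Append mode may emit exactly the groups strictly below the watermark.
  def emittable(groups: Seq[Group], maxEventTimeSec: Long, delaySec: Long): Seq[Group] = {
    val watermark = maxEventTimeSec - delaySec
    groups.filter(_.eventTimeSec < watermark)
  }

  def main(args: Array[String]): Unit = {
    val groups = Seq(Group("a", 100L), Group("b", 700L))
    // Watermark = 1000 - 600 = 400: only the group at t=100 is final.
    println(emittable(groups, maxEventTimeSec = 1000L, delaySec = 600L).map(_.key).mkString(","))
    // prints: a
  }
}
```

In contrast, complete mode re-emits the entire result table on every trigger, so no watermark is needed, at the cost of Spark keeping all aggregation state indefinitely.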