结构化流异常:流聚合不支持附加输出模式

Structured Streaming exception: Append output mode not supported for streaming aggregations

我 运行 我的 spark 作业时出现以下错误:

org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets;;

我不确定问题是否是由于缺少 watermark 引起的,我不知道如何在这种情况下应用。 以下是应用的聚合操作:

def aggregateByValue(): DataFrame = {
  df.withColumn("Value", expr("(BookingClass, Value)"))
    .groupBy("AirlineCode", "Origin", "Destination", "PoS", "TravelDate", "StartSaleDate", "EndSaleDate", "avsFlag")
    .agg(collect_list("Value").as("ValueSeq"))
    .drop("Value")
}

用法:

val theGroupedDF = theDF
  .multiplyYieldByHundred
  .explodeDates
  .aggregateByValue

val query = theGroupedDF.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()

outputMode 更改为 complete 解决了问题。

val query = theGroupedDF.writeStream
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination()

添加这个可以解决问题:

  val theGroupedDF = theDF
  .multiplyYieldByHundred
  .explodeDates
  .aggregateByValue
  //code bellow
  .withColumn("timestamp", current_timestamp())
  .withWatermark("timestamp", "10 minutes")