spark structured streaming exception:没有水印不支持附加输出模式

spark structured streaming exception : Append output mode not supported without watermark

我对 year 进行了简单的分组操作,并进行了一些汇总,如下所示。我尝试将结果附加到 hdfs 路径,如下所示。我收到错误提示,

   org.apache.spark.sql.AnalysisException: Append output mode not supported 
   when there are streaming aggregations on streaming DataFrames/DataSets 
   without watermark;;
   Aggregate [year#88], [year#88, sum(rating#89) AS rating#173, 
   sum(cast(duration#90 as bigint)) AS duration#175L]
   +- EventTimeWatermark event_time#96: timestamp, interval 10 seconds

下面是我的代码。有人可以帮忙吗

    val spark =SparkSession.builder().appName("mddd").
    enableHiveSupport().config("hive.exec.dynamic.partition", "true").
    config("hive.exec.dynamic.partition.mode", "nonstrict").
    config("spark.sql.streaming.checkpointLocation", "/user/sa/sparkCheckpoint").
    config("spark.debug.maxToStringFields",100).
    getOrCreate()

    val mySchema = StructType(Array(
     StructField("id", IntegerType),
     StructField("name", StringType),
     StructField("year", IntegerType),
     StructField("rating", DoubleType),
     StructField("duration", IntegerType)
    ))

    val xmlData = spark.readStream.option("sep", ",").schema(mySchema).csv("file:///home/sa/kafdata/") 
    import java.util.Calendar
    val df_agg_without_time= xmlData.withColumn("event_time", to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))

    val df_agg_with_time = df_agg_without_time.withWatermark("event_time", "10 seconds").groupBy($"year").agg(sum($"rating").as("rating"),sum($"duration").as("duration"))
    val cop = df_agg_with_time.withColumn("column_name_with", to_json(struct($"window")))

    df_agg_with_time.writeStream.outputMode("append").partitionBy("year").format("csv").
    option("path", "hdfs://dio/apps/hive/warehouse/gt.db/sample_mov/").start()

我输入的是 csv 格式

    id,name,year,rating,duration
    1,The Nightmare Before Christmas,1993,3.9,4568
    2,The Mummy,1993,3.5,4388
    3,Orphans of the Storm,1921,3.2,9062
    4,The Object of Beauty,1921,2.8,6150
    5,Night Tide,1963,2.8,5126
    6,One Magic Christmas,1963,3.8,5333
    7,Muriel's Wedding,1963,3.5,6323
    8,Mother's Boys,1963,3.4,5733

我的预期输出应该在 hdfs 中,并在年份进行分区

    year,rating,duration
    1993,7.4,8956
    1921,6.0,15212
    1963,10.7,17389

我真的不确定我的方法有什么问题。请帮忙

这是一个多方面的问题:

  • Structured Streaming API 有限制恕我直言。
  • 一个人可以通过管道传输多个查询,技术上它 运行s,但不会产生任何输出,因此这样做没有任何价值 - 即使您可以指定它也无法执行此类其他功能。
  • 手册指出:必须在与时间戳相同的列上调用 withWatermark 聚合中使用的列。

    For example, df.withWatermark("time", "1 min").groupBy("time2").count() is invalid in Append output mode, as watermark is defined on a different column from the aggregation column. Simply stated, for Append you need WaterMark. I think you have an issue here.

  • 使用路径时是否有以下关系?

  .enableHiveSupport().config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  • 此外,您的最终用例未知。这里的问题是这是否是一个好方法,但我的见解太少无法评估,我们只是假设它是这样。
  • 我们假设同一部电影的评分将成为未来微批次的一部分。
  • 提要中缺少 event_time,但您自己创建了它。有点不切实际,但我们可以接受,虽然 TimeStamp 有点问题。
  • 我建议您查看此博客 http://blog.madhukaraphatak.com/introduction-to-spark-structured-streaming-part-12/,以获得对结构化流的出色评估。

所以,一般来说:

  • 在选项 Complete、Append 和 Update 中,我认为您选择了正确的 Append。可以使用更新,但我将其排除在范围之外。
  • 但是没有把event_time放在一个Window中。你应该做这个。我在这里的最后放了一个例子,运行 在 Spark Shell 中我无法让 case class 工作 - 这就是为什么它花了这么长时间,但在编译程序中它是不是问题,也不是 DataBricks。
  • 从功能上讲,您不能编写多个查询来执行您尝试过的聚合。它只是在我的例子中产生了一些错误。
  • 我建议你使用我使用的时间戳方法,它更容易,因为我无法测试你所有的东西。

然后:

  • 或者,将此模块的输出写入 KAFKA 主题并将该主题读入另一个模块并进行第二次聚合并写出,同时考虑到您可以在不同的微批中获得多个电影评级。
  • 或者,将数据写出,因为它包括一个计数字段,然后提供一个视图层用于查询,考虑到有多个写入的事实。

这是一个使用套接字输入和 Spark Shell 的示例 - 您可以推断出您自己的数据,以及微批处理的输出(注意在查看数据时存在延迟):

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val sparkSession = SparkSession.builder
  .master("local")
  .appName("example")
  .getOrCreate()
//create stream from socket

import sparkSession.implicits._
sparkSession.sparkContext.setLogLevel("ERROR")
val socketStreamDs = sparkSession.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .as[String]

val stockDs = socketStreamDs.map(value => (value.trim.split(","))).map(entries=>(new java.sql.Timestamp(entries(0).toLong),entries(1),entries(2).toDouble)).toDF("time","symbol","value")//.toDS() 

val windowedCount = stockDs
  .withWatermark("time", "20000 milliseconds")
  .groupBy( 
    window($"time", "10 seconds"),
           $"symbol" 
  )
  .agg(sum("value"), count($"symbol"))

val query =
  windowedCount.writeStream
    .format("console")
    .option("truncate", "false")
    .outputMode(OutputMode.Append())

query.start().awaitTermination()

结果:

Batch: 14
----------------------------------------------+------+----------+-------------+  
|window                                       |symbol|sum(value)|count(symbol)|
+---------------------------------------------+------+----------+-------------+
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap1"|4200.0    |6            |
|[2016-04-27 04:34:30.0,2016-04-27 04:34:40.0]|"app1"|800.0     |2            |
|[2016-04-27 04:34:20.0,2016-04-27 04:34:30.0]|"aap2"|2500.0    |1            |
|[2016-04-27 04:34:40.0,2016-04-27 04:34:50.0]|"app1"|2800.0    |4            |
+---------------------------------------------+------+----------+-------------+

这是一个很大的话题,你需要整体看待它。

对于输出,您可以看到在某些情况下计数可能很方便,尽管 avg 输出可用于计算总体平均值。成功。