Spark Structured Streaming 无法在 kafka 中写入流
Spark Structured Streaming cannot writeStream in kafka
我正在使用结构化流媒体,我正在尝试将我的结果发送到名为 "results" 的 kafka 主题中。
我收到以下错误:
'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
有人能帮忙吗?
query1 = prediction.writeStream.format("kafka")\
.option("topic", "results")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("checkpointLocation", "checkpoint")\
.start()
query1.awaitTermination()
预测架构是:
root
|-- prediction: double (nullable = false)
|-- count: long (nullable = false)
我是不是漏掉了什么?
错误消息提示缺少什么:水印。
水印用于在聚合流数据时处理延迟传入的数据。详细信息可以在 Spark documentation for Structured Streaming 中找到。
在与聚合中使用的时间戳列相同的列上使用 withWatermark 很重要。
Spark 文档中给出了如何使用 withWatermark
的示例:
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word) \
.count()
我正在使用结构化流媒体,我正在尝试将我的结果发送到名为 "results" 的 kafka 主题中。
我收到以下错误:
'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
有人能帮忙吗?
query1 = prediction.writeStream.format("kafka")\
.option("topic", "results")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("checkpointLocation", "checkpoint")\
.start()
query1.awaitTermination()
预测架构是:
root
|-- prediction: double (nullable = false)
|-- count: long (nullable = false)
我是不是漏掉了什么?
错误消息提示缺少什么:水印。
水印用于在聚合流数据时处理延迟传入的数据。详细信息可以在 Spark documentation for Structured Streaming 中找到。
在与聚合中使用的时间戳列相同的列上使用 withWatermark 很重要。
Spark 文档中给出了如何使用 withWatermark
的示例:
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word) \
.count()