为什么将聚合数据帧写入文件接收器时出现异常?
Why an exception while writing an aggregated dataframe to a file sink?
我正在对流数据帧执行聚合并尝试将结果写入输出目录。但是我得到一个例外说
pyspark.sql.utils.AnalysisException: 'Data source json does not support Update output mode;
我在“完整”输出模式下遇到了类似的错误。
这是我的代码:
grouped_df = logs_df.groupBy('host', 'timestamp').agg(count('host').alias('total_count'))
result_host = grouped_df.filter(col('total_count') > threshold)
writer_query = result_host.writeStream \
.format("json") \
.queryName("JSON Writer") \
.outputMode("update") \
.option("path", "output") \
.option("checkpointLocation", "chk-point-dir") \
.trigger(processingTime="1 minute") \
.start()
writer_query.awaitTermination()
根据 OutputSinks 上的文档,FileSinks 仅支持“附加”模式,请参阅下面 table.
中的“支持的输出模式”
我正在对流数据帧执行聚合并尝试将结果写入输出目录。但是我得到一个例外说
pyspark.sql.utils.AnalysisException: 'Data source json does not support Update output mode;
我在“完整”输出模式下遇到了类似的错误。
这是我的代码:
grouped_df = logs_df.groupBy('host', 'timestamp').agg(count('host').alias('total_count'))
result_host = grouped_df.filter(col('total_count') > threshold)
writer_query = result_host.writeStream \
.format("json") \
.queryName("JSON Writer") \
.outputMode("update") \
.option("path", "output") \
.option("checkpointLocation", "chk-point-dir") \
.trigger(processingTime="1 minute") \
.start()
writer_query.awaitTermination()
根据 OutputSinks 上的文档,FileSinks 仅支持“附加”模式,请参阅下面 table.
中的“支持的输出模式”