在 spark 结构化流中动态更改 hdfs 写入路径

Change hdfs write path dynamically in spark structured streaming

我有一个 spark 结构化流应用程序,它从 kafka 读取数据并将其写入 hdfs。我想根据当前日期动态更改 hdfs 写入路径,但结构化流式传输似乎无法那样工作。它只是创建一个应用程序启动日期的文件夹,并且即使日期发生变化也会继续写入同一文件夹。有什么办法可以根据当前日期动态更改路径吗?

下面是我的 writestream 的样子

 val inputFormat = new SimpleDateFormat("yyyy-MM-dd")
 val outPath = "maindir/sb_topic/data/loaddate="

val dswWriteStream =dfresult.writeStream
    .outputMode(outputMode) 
    .format(writeformat) 
    .option("path",outPath+inputFormat.format((new java.util.Date()).getTime())) //hdfs file write path
    .option("checkpointLocation", checkpointdir) 
    .option("maxRecordsPerFile", 999999999) 
    .trigger(Trigger.ProcessingTime("10 minutes")) 

解决方案: 我通过将当前日期列(例如 'loaddate')添加到父数据帧 'dfresult' 来解决这个问题,然后按该列对写入流进行分区。

dswWriteStream.partitionBy('loaddate')