在 spark 结构化流中动态更改 hdfs 写入路径
Change hdfs write path dynamically in spark structured streaming
我有一个 spark 结构化流应用程序,它从 kafka 读取数据并将其写入 hdfs。我想根据当前日期动态更改 hdfs 写入路径,但结构化流式传输似乎无法那样工作。它只是创建一个应用程序启动日期的文件夹,并且即使日期发生变化也会继续写入同一文件夹。有什么办法可以根据当前日期动态更改路径吗?
下面是我的 writestream 的样子
val inputFormat = new SimpleDateFormat("yyyy-MM-dd")
val outPath = "maindir/sb_topic/data/loaddate="
val dswWriteStream =dfresult.writeStream
.outputMode(outputMode)
.format(writeformat)
.option("path",outPath+inputFormat.format((new java.util.Date()).getTime())) //hdfs file write path
.option("checkpointLocation", checkpointdir)
.option("maxRecordsPerFile", 999999999)
.trigger(Trigger.ProcessingTime("10 minutes"))
解决方案: 我通过将当前日期列(例如 'loaddate')添加到父数据帧 'dfresult' 来解决这个问题,然后按该列对写入流进行分区。
dswWriteStream.partitionBy('loaddate')
我有一个 spark 结构化流应用程序,它从 kafka 读取数据并将其写入 hdfs。我想根据当前日期动态更改 hdfs 写入路径,但结构化流式传输似乎无法那样工作。它只是创建一个应用程序启动日期的文件夹,并且即使日期发生变化也会继续写入同一文件夹。有什么办法可以根据当前日期动态更改路径吗?
下面是我的 writestream 的样子
val inputFormat = new SimpleDateFormat("yyyy-MM-dd")
val outPath = "maindir/sb_topic/data/loaddate="
val dswWriteStream =dfresult.writeStream
.outputMode(outputMode)
.format(writeformat)
.option("path",outPath+inputFormat.format((new java.util.Date()).getTime())) //hdfs file write path
.option("checkpointLocation", checkpointdir)
.option("maxRecordsPerFile", 999999999)
.trigger(Trigger.ProcessingTime("10 minutes"))
解决方案: 我通过将当前日期列(例如 'loaddate')添加到父数据帧 'dfresult' 来解决这个问题,然后按该列对写入流进行分区。
dswWriteStream.partitionBy('loaddate')