如何确保写csv是完整的?
How to make sure that write csv is complete?
我正在将数据集写入 CSV,如下所示:
df.coalesce(1)
.write()
.format("csv")
.option("header", "true")
.mode(SaveMode.Overwrite)
.save(sink);
sparkSession.streams().awaitAnyTermination();
如何确保流式处理作业终止时输出正确完成?
我有一个问题,如果我也终止了,sink 文件夹会被覆盖并且是空的 early/late。
附加信息:特别是如果主题没有消息,我的 spark 作业仍然是 运行 并用空文件覆盖结果。
How do I make sure, that when the streaming job gets terminated, that the output is done properly?
Spark Structured Streaming 的工作方式是流式查询(作业)连续运行并且 "when the streaming job gets terminated, that the output is done properly".
我要问的问题是流式查询是如何终止的。这是 StreamingQuery.stop
还是 Ctrl-C
/ kill -9
?
如果流式查询以强制方式终止 (Ctrl-C
/ kill -9
),那么,您会得到您所要求的 - 部分执行无法确保输出是正确的正确,因为进程(流式查询)被强制关闭。
使用 StreamingQuery.stop
时,流式查询将优雅地终止并写出当时的所有内容。
I have the problem, that the sink folder gets overwritten and that the folder is empty if I terminate too early/late.
如果您也终止了 early/late,您还会期待什么,因为流式查询无法完成其工作。你应该 stop
它优雅地得到预期的输出。
Additional Info: Particularly if the topic has no messages, my spark job is still running and overwrites the result with an empty file.
这是一个有趣的观察结果,需要进一步探索。
如果没有要处理的消息,则不会触发任何批处理,因此不会触发作业,因此不会"overwrites the result with an empty file."(因为不会执行任何任务)。
首先,我看到你没有使用过writeStream
我不太确定你的工作是流媒体工作。
现在,回答您的问题 1,您可以使用 StreamingQueryListener
来监视流式查询的进度。让另一个 StreamingQuery 从输出位置读取。监控它也是如此。将文件放在输出位置后,使用 StreamingQueryListener
中的查询名称和输入记录数来优雅地 stop
任何查询。 awaitAnyTermination
应该停止您的 spark 应用程序。以下代码可以提供帮助。
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(event: QueryStartedEvent) {
//logger message to show that the query has started
}
override def onQueryProgress(event: QueryProgressEvent) {
synchronized {
if(event.progress.name.equalsIgnoreCase("QueryName"))
{
recordsReadCount = recordsReadCount + event.progress.numInputRows
//Logger messages to show continuous progress
}
}
}
override def onQueryTerminated(event: QueryTerminatedEvent) {
synchronized {
//logger message to show the reason of termination.
}
}
})
回答你的第二个问题,我也不认为这是可能的,正如 Jacek 在回答中提到的那样。
我正在将数据集写入 CSV,如下所示:
df.coalesce(1)
.write()
.format("csv")
.option("header", "true")
.mode(SaveMode.Overwrite)
.save(sink);
sparkSession.streams().awaitAnyTermination();
如何确保流式处理作业终止时输出正确完成?
我有一个问题,如果我也终止了,sink 文件夹会被覆盖并且是空的 early/late。
附加信息:特别是如果主题没有消息,我的 spark 作业仍然是 运行 并用空文件覆盖结果。
How do I make sure, that when the streaming job gets terminated, that the output is done properly?
Spark Structured Streaming 的工作方式是流式查询(作业)连续运行并且 "when the streaming job gets terminated, that the output is done properly".
我要问的问题是流式查询是如何终止的。这是 StreamingQuery.stop
还是 Ctrl-C
/ kill -9
?
如果流式查询以强制方式终止 (Ctrl-C
/ kill -9
),那么,您会得到您所要求的 - 部分执行无法确保输出是正确的正确,因为进程(流式查询)被强制关闭。
使用 StreamingQuery.stop
时,流式查询将优雅地终止并写出当时的所有内容。
I have the problem, that the sink folder gets overwritten and that the folder is empty if I terminate too early/late.
如果您也终止了 early/late,您还会期待什么,因为流式查询无法完成其工作。你应该 stop
它优雅地得到预期的输出。
Additional Info: Particularly if the topic has no messages, my spark job is still running and overwrites the result with an empty file.
这是一个有趣的观察结果,需要进一步探索。
如果没有要处理的消息,则不会触发任何批处理,因此不会触发作业,因此不会"overwrites the result with an empty file."(因为不会执行任何任务)。
首先,我看到你没有使用过writeStream
我不太确定你的工作是流媒体工作。
现在,回答您的问题 1,您可以使用 StreamingQueryListener
来监视流式查询的进度。让另一个 StreamingQuery 从输出位置读取。监控它也是如此。将文件放在输出位置后,使用 StreamingQueryListener
中的查询名称和输入记录数来优雅地 stop
任何查询。 awaitAnyTermination
应该停止您的 spark 应用程序。以下代码可以提供帮助。
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryStarted(event: QueryStartedEvent) {
//logger message to show that the query has started
}
override def onQueryProgress(event: QueryProgressEvent) {
synchronized {
if(event.progress.name.equalsIgnoreCase("QueryName"))
{
recordsReadCount = recordsReadCount + event.progress.numInputRows
//Logger messages to show continuous progress
}
}
}
override def onQueryTerminated(event: QueryTerminatedEvent) {
synchronized {
//logger message to show the reason of termination.
}
}
})
回答你的第二个问题,我也不认为这是可能的,正如 Jacek 在回答中提到的那样。