Spark Structured Streaming: Multiple WriteStreams to the Same Sink
In Spark Structured Streaming 2.2.1, two writeStream queries to the same database sink are not running in sequence. Please suggest how to make them execute in sequence.
val deleteSink = ds1.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()
val UpsertSink = ds2.writeStream
  .outputMode("update")
  .foreach(mydbsink)
  .start()
deleteSink.awaitTermination()
UpsertSink.awaitTermination()
With the code above, deleteSink is executed after UpsertSink.
If you want to run two streams in parallel and wait on both, use
sparkSession.streams.awaitAnyTermination()
instead of
deleteSink.awaitTermination()
UpsertSink.awaitTermination()
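In your case, the call to UpsertSink.awaitTermination() is not reached until deleteSink is stopped or throws an exception, because awaitTermination() blocks the calling thread. Both queries are already running by then, since start() returns immediately. As the scaladoc states: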
Waits for the termination of this query, either by query.stop() or by an exception. If the query has terminated with an exception, then the exception will be thrown.
If the query has terminated, then all subsequent calls to this method will either return immediately (if the query was terminated by stop()), or throw the exception immediately (if the query has terminated with exception).
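
Putting this together, here is a minimal sketch of starting both queries and then waiting on the session's query manager. It assumes ds1 and ds2 are streaming Dataset[Row] values and that mydbsink is a ForeachWriter[Row], as the question's code suggests. The names ParallelSinks, startBoth and runUntilAnyStops are hypothetical and exist only for this illustration.

```scala
import org.apache.spark.sql.{Dataset, ForeachWriter, Row, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper object, not part of the original question or answer.
object ParallelSinks {

  // Starts both queries. start() returns right away and each query keeps
  // running in the background, so both are active when this method returns.
  def startBoth(
      ds1: Dataset[Row],
      ds2: Dataset[Row],
      mydbsink: ForeachWriter[Row]): (StreamingQuery, StreamingQuery) = {
    val deleteSink = ds1.writeStream
      .outputMode("update")
      .foreach(mydbsink)
      .start()

    val upsertSink = ds2.writeStream
      .outputMode("update")
      .foreach(mydbsink)
      .start()

    (deleteSink, upsertSink)
  }

  // Starts both queries, then blocks until any query on this session
  // terminates. If that query failed, its exception is rethrown here.
  def runUntilAnyStops(
      spark: SparkSession,
      ds1: Dataset[Row],
      ds2: Dataset[Row],
      mydbsink: ForeachWriter[Row]): Unit = {
    startBoth(ds1, ds2, mydbsink)
    spark.streams.awaitAnyTermination()
  }
}
```

Note that this only changes how the driver waits. The two queries still process their micro-batches independently, so it does not by itself impose any ordering between the deletes and the upserts.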