为什么使用套接字源和多个接收器的流式查询不起作用?
Why does streaming query with socket source and multiple sinks not work?
我正在尝试使用多个查询写入 spark 中的不同接收器。第一个查询有效,输出被写入接收器,但第二个查询无效。
谁能指出我的错误。
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
val source = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
.as[String]
.map {e =>
println(e)
e
}
// With Multiple Queries
val q1 = source.writeStream.outputMode("append").format("console")
.trigger(Trigger.ProcessingTime(1000))
.start()
println(q1)
val q2 = source.writeStream.outputMode("append")
.format("csv")
.option("path", "output.csv")
.option("checkpointLocation", "/tmp/checkpoint/test")
.trigger(Trigger.ProcessingTime(1000))
.start()
println(q2)
spark.streams.awaitAnyTermination()
控制台接收器正在运行,但 CSV 接收器未写入输出。如果我更改顺序,则 csv 接收器可以工作,但不能使用控制台。
我假设您正在使用 netcat 或类似的实用程序来生成数据。此类实用程序并非设计为可重放且不提供持久层,因此数据一旦被使用就会不可逆转地销毁。
因此第二个流将监听变化,但永远不会有数据到达它。
我正在尝试使用多个查询写入 spark 中的不同接收器。第一个查询有效,输出被写入接收器,但第二个查询无效。
谁能指出我的错误。
val spark = SparkSession
.builder
.appName("StructuredNetworkWordCount")
.getOrCreate()
import spark.implicits._
val source = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
.as[String]
.map {e =>
println(e)
e
}
// With Multiple Queries
val q1 = source.writeStream.outputMode("append").format("console")
.trigger(Trigger.ProcessingTime(1000))
.start()
println(q1)
val q2 = source.writeStream.outputMode("append")
.format("csv")
.option("path", "output.csv")
.option("checkpointLocation", "/tmp/checkpoint/test")
.trigger(Trigger.ProcessingTime(1000))
.start()
println(q2)
spark.streams.awaitAnyTermination()
控制台接收器正在运行,但 CSV 接收器未写入输出。如果我更改顺序,则 csv 接收器可以工作,但不能使用控制台。
我假设您正在使用 netcat 或类似的实用程序来生成数据。此类实用程序并非设计为可重放且不提供持久层,因此数据一旦被使用就会不可逆转地销毁。
因此第二个流将监听变化,但永远不会有数据到达它。