结构化流式传输:必须使用 writeStream.start() 执行流式源查询
Structured streaming: Queries with streaming sources must be executed with writeStream.start()
我正在尝试使用结构化流从文件中读取一些数据,最后将其写入 Cassandra。但是我收到以下错误(在 cassandra 写作之前很久)
"org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;"
这是我使用的代码片段
val ip15M = spark.readStream.schema(NewsSchema).parquet(INPUT_DIRECTORY)
val dataframeToRowColFunction = new RowToColumn(table) // This seems to work fine
val pairs = ip15M.toJavaRDD.flatMapToPair(dataframeToRowColFunction.FlatMapData) // This fails
// ... Other code
下面是 RowToColumn class 的样子
class RowToColumn (var table: Table) extends java.io.Serializable{
val decomposer = new EventDecomposer(table)
val FlatMapData: PairFlatMapFunction[Row, AggregateKey, AggregateValue] = new PairFlatMapFunction[Row, AggregateKey, AggregateValue]() {
//val FlatMapData: PairFlatMapFunction[Row, String, AggregateValue] = new PairFlatMapFunction[Row, String, AggregateValue]() {
println(" in FlatMapData") // This is printed
override def call(x: Row) = {
println(" in Call method") // This is not printed
// Other code ...
}
}
这项工作在没有流媒体的情况下工作正常。另外,我看了其他 link1 and link2
,但没有解决问题
您可以通过以下方式处理写入部分,因为我不知道 Cassandra 是否有用于 spark 中结构化流的流到流连接器:
ip15M
.writeStream
.foreachBatch { (df, batchId) => {
// here apply all of your logic on dataframe
}
}
.start()
请记住,在 foreach
循环中,您处理的是 dataframe
,而不是流,您很可能可以将它们直接保存在 Cassandra 中。
我正在尝试使用结构化流从文件中读取一些数据,最后将其写入 Cassandra。但是我收到以下错误(在 cassandra 写作之前很久)
"org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;"
这是我使用的代码片段
val ip15M = spark.readStream.schema(NewsSchema).parquet(INPUT_DIRECTORY)
val dataframeToRowColFunction = new RowToColumn(table) // This seems to work fine
val pairs = ip15M.toJavaRDD.flatMapToPair(dataframeToRowColFunction.FlatMapData) // This fails
// ... Other code
下面是 RowToColumn class 的样子
class RowToColumn (var table: Table) extends java.io.Serializable{
val decomposer = new EventDecomposer(table)
val FlatMapData: PairFlatMapFunction[Row, AggregateKey, AggregateValue] = new PairFlatMapFunction[Row, AggregateKey, AggregateValue]() {
//val FlatMapData: PairFlatMapFunction[Row, String, AggregateValue] = new PairFlatMapFunction[Row, String, AggregateValue]() {
println(" in FlatMapData") // This is printed
override def call(x: Row) = {
println(" in Call method") // This is not printed
// Other code ...
}
}
这项工作在没有流媒体的情况下工作正常。另外,我看了其他 link1 and link2 ,但没有解决问题
您可以通过以下方式处理写入部分,因为我不知道 Cassandra 是否有用于 spark 中结构化流的流到流连接器:
ip15M
.writeStream
.foreachBatch { (df, batchId) => {
// here apply all of your logic on dataframe
}
}
.start()
请记住,在 foreach
循环中,您处理的是 dataframe
,而不是流,您很可能可以将它们直接保存在 Cassandra 中。