流式查询未显示 Spark 中的任何进度
Streaming query not showing any progress in Spark
我正在从 Spark 结构化流应用程序中获取表单的状态消息:
18/02/12 16:38:54 INFO StreamExecution: Streaming query made progress: {
"id" : "a6c37f0b-51f4-47c5-a487-8bd269b80142",
"runId" : "061e41b4-f488-4483-a290-403f1f7eff03",
"name" : null,
"timestamp" : "2018-02-12T11:08:54.323Z",
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 30,
"triggerExecution" : 46
},
"eventTime" : {
"watermark" : "1970-01-01T00:00:00.000Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "FileStreamSource[file:/home/chiralcarbon/IdeaProjects/spark_structured_streaming/args[0]]",
"startOffset" : null,
"endOffset" : null,
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@bcc171"
}
}
所有消息的 numInputRows 值为 0。
该程序从 parquet 文件流式传输数据并将相同的流输出到 console.Following 是代码:
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder.
master("local")
.appName("sparkSession")
.getOrCreate()
val schema = ..
val in = spark.readStream
.schema(schema)
.parquet("args[0]")
val query = in.writeStream
.format("console")
.outputMode("append")
.start()
query.awaitTermination()
}
}
这是什么原因,我该如何解决?
您在 readStream 中有一个错误:
val in = spark.readStream
.schema(schema)
.parquet("args[0]")
您可能想从第一个参数中提供的目录中读取。然后改用直接调用或字符串插值:
val in = spark.readStream
.schema(schema)
.parquet(args(0))
或最后一行,如果表达式较长或在其他情况下有一些连接:
.parquet(s"${args(0)}")
目前您的代码试图从不存在的目录中读取,因此不会读取任何文件。更改后,目录将以正确的方式提供,Spark 将开始读取文件
我正在从 Spark 结构化流应用程序中获取表单的状态消息:
18/02/12 16:38:54 INFO StreamExecution: Streaming query made progress: {
"id" : "a6c37f0b-51f4-47c5-a487-8bd269b80142",
"runId" : "061e41b4-f488-4483-a290-403f1f7eff03",
"name" : null,
"timestamp" : "2018-02-12T11:08:54.323Z",
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"getOffset" : 30,
"triggerExecution" : 46
},
"eventTime" : {
"watermark" : "1970-01-01T00:00:00.000Z"
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "FileStreamSource[file:/home/chiralcarbon/IdeaProjects/spark_structured_streaming/args[0]]",
"startOffset" : null,
"endOffset" : null,
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@bcc171"
}
}
所有消息的 numInputRows 值为 0。
该程序从 parquet 文件流式传输数据并将相同的流输出到 console.Following 是代码:
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder.
master("local")
.appName("sparkSession")
.getOrCreate()
val schema = ..
val in = spark.readStream
.schema(schema)
.parquet("args[0]")
val query = in.writeStream
.format("console")
.outputMode("append")
.start()
query.awaitTermination()
}
}
这是什么原因,我该如何解决?
您在 readStream 中有一个错误:
val in = spark.readStream
.schema(schema)
.parquet("args[0]")
您可能想从第一个参数中提供的目录中读取。然后改用直接调用或字符串插值:
val in = spark.readStream
.schema(schema)
.parquet(args(0))
或最后一行,如果表达式较长或在其他情况下有一些连接:
.parquet(s"${args(0)}")
目前您的代码试图从不存在的目录中读取,因此不会读取任何文件。更改后,目录将以正确的方式提供,Spark 将开始读取文件