How to display the results of intermediate transformations of a streaming query?
I am implementing a use case to try out the Spark Structured Streaming API.
The source data is read from a Kafka topic, and after applying some transformations, the results are written to the console.
I want to print the intermediate output in addition to the final result of the structured streaming query.
Here is the code snippet:
val trips = getTaxiTripDataframe() // this function consumes the Kafka topic and deserializes the byte arrays to create a dataframe with the required columns
val filteredTrips = trips.filter(col("taxiCompany").isNotNull && col("pickUpArea").isNotNull)
val output = filteredTrips
.groupBy("taxiCompany","pickUpArea")
.agg(Map("pickUpArea" -> "count"))
val query = output.writeStream.format("console")
.option("numRows","50")
.option("truncate","false")
.outputMode("update").start()
query.awaitTermination()
I want to print the 'filteredTrips' dataframe on the console. I tried using the dataframe's .show() method, but since it is a dataframe created on streaming data, it throws the following exception:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
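For context, any action like .show() or .collect() is disallowed on a streaming DataFrame; results can only be produced by a query started via writeStream.start(). Here is a minimal, self-contained sketch of the distinction (it uses the built-in rate source instead of my Kafka setup, purely for illustration):

import org.apache.spark.sql.SparkSession

// Minimal sketch: a streaming DataFrame reports isStreaming = true,
// and calling .show() on it raises the AnalysisException above.
val spark = SparkSession.builder().master("local[*]").appName("check").getOrCreate()
val streamingDf = spark.readStream.format("rate").load()

println(streamingDf.isStreaming) // prints true
// streamingDf.show()            // throws org.apache.spark.sql.AnalysisException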
Is there any other way to work around this?
Yes, you can create two streaming queries (I am using Spark 2.4.3):
val filteredTrips = trips.filter(col("taxiCompany").isNotNull && col("pickUpArea").isNotNull)
val query1 = filteredTrips
.writeStream
.format("console")
.option("numRows","50")
.option("truncate","false")
.outputMode("update").start()
val query2 = filteredTrips
.groupBy("taxiCompany","pickUpArea")
.agg(Map("pickUpArea" -> "count"))
.writeStream
.format("console")
.option("numRows","50")
.option("truncate","false")
.outputMode("update").start()
query1.awaitTermination()
query2.awaitTermination()
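Note that the two queries each read from the Kafka source independently, so the filter is evaluated twice. If you only want to inspect intermediate micro-batches without a second console sink, foreachBatch (available since Spark 2.4) is another option. A minimal sketch, assuming filteredTrips is defined as above:

import org.apache.spark.sql.DataFrame

val debugQuery = filteredTrips.writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    // Within foreachBatch the micro-batch is a static DataFrame,
    // so .show() is allowed here.
    println(s"--- batch $batchId ---")
    batchDf.show(50, truncate = false)
  }
  .start()

debugQuery.awaitTermination()

One caveat: on Spark 2.4 with Scala 2.12 the Scala foreachBatch overload has a known ambiguity issue, so this sketch assumes Scala 2.11 or Spark 3.x.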