How to generate summary statistics (using Summarizer.metrics) in a streaming query?

Currently, I am using Spark Structured Streaming to create a dataframe of random data in the form (id, timestamp_value, device_id, temperature_value, comment).

Spark dataframe per batch:

Based on the screenshot of the dataframe above, I would like to compute some descriptive statistics for the column "temperature_value", e.g. min, max, mean, count and variance.

My approach to achieve this in Python is the following:

import sys
import json
import psycopg2
import numpy as np
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import (
    from_json, col, to_json, explode, split, get_json_object,
    lit, unix_timestamp,
)
from pyspark.ml.stat import Summarizer
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.mllib.stat import Statistics

spark = SparkSession.builder.appName(<spark_application_name>).getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark.streams.active

data = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "kafka_broker:<port_number>").option("subscribe", <topic_name>).option("startingOffsets", "latest").load()

schema = StructType([
    StructField("id", DoubleType()),
    StructField("timestamp_value", DoubleType()), 
    StructField("device_id", DoubleType()), 
    StructField("temperature_value", DoubleType()),
    StructField("comment", StringType())])

telemetry_dataframe = data.selectExpr("CAST(value AS STRING)").select(from_json(col("value").cast("string"), schema).alias("tmp")).select("tmp.*")

telemetry_dataframe.printSchema()

temperature_value_selection = telemetry_dataframe.select("temperature_value")

temperature_value_selection_new = temperature_value_selection.withColumn("device_temperature", temperature_value_selection["temperature_value"].cast(DecimalType()))

temperature_value_selection_new.printSchema()

assembler = VectorAssembler(
  inputCols=["device_temperature"], outputCol="temperatures"
)

assembled = assembler.transform(temperature_value_selection_new)

assembled_new = assembled.withColumn("timestamp", F.current_timestamp())

assembled_new.printSchema()

# scaler = StandardScaler(inputCol="temperatures", outputCol="scaledTemperatures", withStd=True, withMean=False).fit(assembled)

# scaled = scaler.transform(assembled)

summarizer = Summarizer.metrics("max", "min", "variance", "mean", "count")

descriptive_table_one = assembled_new.withWatermark("timestamp", "4 minutes").select(summarizer.summary(assembled_new.temperatures))
#descriptive_table_one = assembled_new.withWatermark("timestamp", "4 minutes").groupBy(F.col("timestamp")).agg(max(F.col('timestamp')).alias("timestamp")).orderBy('timestamp', ascending=False).select(summarizer.summary(assembled.temperatures))

#descriptive_table_one = assembled_new.select(summarizer.summary(assembled.temperatures))

# descriptive_table_two = temperature_value_selection_new.select(summarizer.summary(temperature_value_selection_new.device_temperature))


# -------------------------------------------------------------------------------------

#########################################
#               QUERIES                 #
#########################################

query_1 = telemetry_dataframe.writeStream.outputMode("append").format("console").trigger(processingTime = "5 seconds").start()#.awaitTermination()

query_2 = temperature_value_selection_new.writeStream.outputMode("append").format("console").trigger(processingTime = "8 seconds").start()#.awaitTermination()

query_3 = assembled_new.writeStream.outputMode("append").format("console").trigger(processingTime = "11 seconds").start()#.awaitTermination()

#query_4_1 = descriptive_table_one.writeStream.outputMode("complete").format("console").trigger(processingTime = "14 seconds").start()#.awaitTermination()
query_4_2 = descriptive_table_one.writeStream.outputMode("append").format("console").trigger(processingTime = "17 seconds").start()#.awaitTermination()

See the Summarizer documentation.

Based on the posted code, I isolate the column "temperature_value" and then vectorize it (with VectorAssembler) to create the vector-typed column "temperatures".
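For reference, the Summarizer aggregation itself behaves as expected on a static (batch) dataframe; here is a minimal standalone sketch (with made-up temperature values and the hypothetical names batch_df and batch_summarizer, not part of the streaming job):

from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Summarizer

batch_df = spark.createDataFrame(
    [(Vectors.dense(20.0),), (Vectors.dense(25.5),), (Vectors.dense(19.2),)],
    ["temperatures"])

batch_summarizer = Summarizer.metrics("max", "min", "variance", "mean", "count")

# Prints one row holding the max, min, variance, mean and count of the vectors.
batch_df.select(batch_summarizer.summary(batch_df.temperatures).alias("summary")).show(truncate=False)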

What I would like is to output the result of the "Summarizer" aggregation to my console. That is why I use "append" for the outputMode and "console" for the format. But I was getting this error: pyspark.sql.utils.AnalysisException: 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark'. Hence, I used the "withWatermark" function, but I still get the same error with the outputMode "append".
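My understanding is that, for a streaming aggregation, "append" mode also requires grouping on an event-time window over the watermarked column, so that a row can be finalized once the watermark passes the window end. A minimal sketch of what that might look like here, reusing summarizer and F from the code above (the 1-minute window and the names windowed_summary / windowed_query are arbitrary choices, and I have not verified that Summarizer plays well inside a streaming groupBy):

windowed_summary = (
    assembled_new
    .withWatermark("timestamp", "4 minutes")
    .groupBy(F.window(F.col("timestamp"), "1 minute"))
    .agg(summarizer.summary(F.col("temperatures")).alias("temperature_summary")))

# Each window is emitted only after the watermark has passed its end.
windowed_query = (
    windowed_summary.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", False)
    .trigger(processingTime = "14 seconds")
    .start())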

When I tried to change the outputMode to "complete", my terminal instantly terminated the Spark streaming job.

Instant termination of the streaming job:

My questions:

  1. How should I use the "withWatermark" function to output the summary statistics of the vector column "temperatures" to my console (e.g. along the lines of the windowed sketch above)?

  2. Is there any other way to compute descriptive statistics for a custom column of my dataframe that I may be missing? (See the sketch after this list.)
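For question 2, a sketch of an alternative I am considering: skip the vectorization entirely and apply the built-in SQL aggregate functions to the plain "temperature_value" column, again grouped per event-time window (the window length and the name plain_stats are arbitrary):

plain_stats = (
    telemetry_dataframe
    .withColumn("timestamp", F.current_timestamp())
    .withWatermark("timestamp", "4 minutes")
    .groupBy(F.window("timestamp", "1 minute"))
    .agg(
        F.min("temperature_value").alias("min_temperature"),
        F.max("temperature_value").alias("max_temperature"),
        F.avg("temperature_value").alias("mean_temperature"),
        F.count("temperature_value").alias("count_temperature"),
        F.variance("temperature_value").alias("variance_temperature")))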

Thanks in advance for any help.

EDIT (20.12.2019)

A solution has been given and accepted. However, I am now getting the following error:

When I tried to change the outputMode to "complete", my terminal was instantly terminating the spark streaming.

All of your streaming queries are up and running, but (the main thread of) the pyspark application does not even give them a chance to run for long, since it never waits for their termination (because of #.awaitTermination()).

You should block the main thread of the pyspark application using StreamingQuery.awaitTermination(), e.g. query_1.awaitTermination().
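Applied to the script above, a minimal way to follow that advice (blocking on one query is enough; which one to pick is a matter of choice):

# Keep the driver alive so the started streaming queries can actually run.
query_4_2.awaitTermination()

# Or, to wait on whichever query registered on this session terminates first:
# spark.streams.awaitAnyTermination()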