pyspark 结构化流中的数据帧长度
DataFrame length in pyspark structred streeming
我想知道有没有办法知道结构化流媒体中 pyspark 数据帧的长度?
实际上,我正在从 kafka 读取数据帧并寻找一种方法来了解结果数据帧的大小,因为我在不同的步骤中对其进行了不同的过滤。
我在时间戳列上使用 groupby 计算每个 window 中的 df 大小或长度,如下所示:
sqlFunctions.window(col("etimestamp"), "30 seconds", "30 seconds")
并且可以得到我想要的。但是我认为我们应该这样做(主要需求)似乎很奇怪。此外,例如,如果我的主要需求是对另一列进行分组,这意味着我应该首先对该列进行分组,然后在结果数据框中应用另一个分组!有没有办法直接知道查询结果的数量?
使用 df.count()
方法我收到此错误:
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
代码:
spark = SparkSession \
.builder \
.appName("myapp") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_version) \
.option("subscribe", "mytopic") \
.option("mode", "DROPMALFORMED") \
.load()
df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
jsonSchema = (
StructType()
.add("fwd", IntegerType()),
.add("st", LongType()),
)
print(df.count(), df2.count())
query = df2\
.writeStream \
.outputMode("append") \
.format("console") \
.start()
来自 Spark 文档,建议的方式:
count() - Cannot return a single count from a streaming Dataset.
Instead, use ds.groupBy().count() which returns a streaming Dataset
containing a running count.
请注意,您应该使用
complete
模式。
我想知道有没有办法知道结构化流媒体中 pyspark 数据帧的长度? 实际上,我正在从 kafka 读取数据帧并寻找一种方法来了解结果数据帧的大小,因为我在不同的步骤中对其进行了不同的过滤。 我在时间戳列上使用 groupby 计算每个 window 中的 df 大小或长度,如下所示:
sqlFunctions.window(col("etimestamp"), "30 seconds", "30 seconds")
并且可以得到我想要的。但是我认为我们应该这样做(主要需求)似乎很奇怪。此外,例如,如果我的主要需求是对另一列进行分组,这意味着我应该首先对该列进行分组,然后在结果数据框中应用另一个分组!有没有办法直接知道查询结果的数量?
使用 df.count()
方法我收到此错误:
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
代码:
spark = SparkSession \
.builder \
.appName("myapp") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1") \
.getOrCreate()
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_version) \
.option("subscribe", "mytopic") \
.option("mode", "DROPMALFORMED") \
.load()
df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
jsonSchema = (
StructType()
.add("fwd", IntegerType()),
.add("st", LongType()),
)
print(df.count(), df2.count())
query = df2\
.writeStream \
.outputMode("append") \
.format("console") \
.start()
来自 Spark 文档,建议的方式:
count() - Cannot return a single count from a streaming Dataset. Instead, use ds.groupBy().count() which returns a streaming Dataset containing a running count.
请注意,您应该使用
complete
模式。