在完整输出模式下,Spark 结构化流中的中间状态可以是 dropped/controlled 吗? (火花 2.4.0)

Can intermediate state be dropped/controlled in Spark structured streaming in Complete Output mode? (Spark 2.4.0)

我有一个场景,我想处理来自 kafka 主题的数据。我有这个特定的 java 代码可以从 kafka 主题中以流的形式读取数据。

Dataset<Row> streamObjs = sparkSession.readStream().format("kafka")
                .option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", streamTopic)
                .option("failOnDataLoss", false).load();

我将其转换为字符串,定义模式,然后尝试使用水印(用于后期数据)和 window(用于分组和聚合),最后输出到 kafka sink。

Dataset<Row> selectExprImporter = streamObjs.selectExpr("CAST(value AS STRING)");

StructType streamSchema = new StructType().add("id", DataTypes.StringType)
                .add("timestamp", DataTypes.LongType)
                .add("values", new MapType(DataTypes.StringType, DataTypes.DoubleType, false));

Dataset<Row> selectValueImporter = selectExprImporter
                .select(functions.from_json(new Column("value"), streamSchema ).alias("data"));
.
.
(More transformations/operations)
.
.

Dataset<Row> aggCount_15min = streamData.withWatermark("timestamp", "2 minute")
                .withColumn("frequency", functions.lit(15))
                .groupBy(new Column("id"), new Column("frequency"),
                        functions.window(new Column("timestamp"), "15 minute").as("time_range"))
                .agg(functions.mean("value").as("mean_value"), functions.sum("value").as("sum"),
                        functions.count(functions.lit(1)).as("number_of_values"))
                .filter("mean_value > 35").orderBy("id", "frequency", "time_range");

aggCount_15min.selectExpr("to_json(struct(*)) AS value").writeStream()
                .outputMode(OutputMode.Complete()).format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
                .option("topic", outputTopic).option("checkpointLocation", checkpointLocation).start().awaitTermination();

问题

  1. 我是否正确理解当在 kafka 接收器中使用 完整输出模式 时,中间状态将永远增加直到我得到 OutOfMemory 异常?

  2. 此外,完整输出模式的理想用例是什么?只在中间data/state不增加的情况下使用?

  3. 我的情况需要完整输出模式,因为我想使用 orderBy 子句。有什么方法可以强制 spark 在每说 30 分钟后放弃它的状态并再次使用新数据吗?

  4. 有没有更好的方法可以不使用完整输出模式但仍能得到想要的结果?我应该使用 spark 结构化流媒体以外的其他东西吗?

期望的结果是按照上面的查询聚合和分组数据,然后在创建第一个批次后,删除所有状态并重新开始下一个批次。这里批次可以是最后处理的时间戳的函数。就像说当当前时间戳从第一个收到的时间戳开始超过 20 分钟时丢弃所有状态并重新开始,或者如果它是 window 时间的函数(在本例中为 15 分钟)则更好,比如说当 4 批 15 分钟时 windows 已经处理,第 5 批的时间戳到达前 4 批的删除状态,并为这批重新开始。

这个问题问了很多事情,较少关注 Spark Structured Streaming (SSS) 的实际作用。然后回答您的编号问题、标题问题和 non-numbered 问题:

一个。标题问题:

Not as such, but Complete mode only stores aggregates, so not all data is stored but a state allowing re-computation based on incremental adding of data. I find the manual misleading in terms of its description, but it may be jus me. But you will get this error otherwise:

org.apache.spark.sql.AnalysisException: Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets

  1. 当我在 kafka sink 中使用完整输出模式 时,我的理解是否正确,中间状态将永远增加直到我得到 OutOfMemory 异常?

The kafka sink does not figure here. The intermediate state is what Spark Structured Streaming needs to store. It stores aggregates and discards the newer data. But in the end you would get an OOM due to this or some other error I suspect.

  1. 此外,完整输出模式的理想用例是什么?只在中间data/state不增加的时候才用?

For aggregations over all data received. 2nd part of your question is not logical and I cannot answer therefore. The state will generally increase over time.

  1. 在我的案例中需要完整输出模式,因为我想使用 orderBy 子句。有什么方法可以强制 spark 在每说 30 分钟后放弃它的状态并再次使用新数据吗?

No, there is not. Even trying to stop gracefully is not an idea and then re-starting as the period is not really 15 mins then. And, it's against the SSS approach anyway. From the manuals: Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode. You cannot drop the state as you would like, again aggregates discussion.

  1. 有没有更好的方法可以不使用完整输出模式但仍能得到想要的结果?我应该使用 spark 结构化流媒体以外的其他东西吗?

No, as you have many requirements that cannot be satisfied by the current implementation. Unless you drop order by and do non-overlapping window operation (15,15) in Append mode with a minuscule watermark, if memory serves correctly. You would then rely on sorting later on by down-stream processing as order by not supported.

最终的总体问题:期望的结果是按照上面的查询聚合和分组数据,然后在创建第一个批次后,删除所有状态并重新开始下一个批次。这里批次可以是最后处理的时间戳的函数。就像说当当前时间戳从第一个收到的时间戳开始超过 20 分钟时丢弃所有状态并重新开始,或者如果它是 window 时间的函数(在本例中为 15 分钟)则更好,比如说当 4 批 15 分钟时 windows 已经处理,第 5 批的时间戳到达前 4 批的删除状态并重新开始这批。

Whilst your ideas may be considered understandable, the SSS-framework does not support it all and specifically what you want(, just yet).