无水印的流 DataFrames/DataSets 上有流聚合时不支持追加输出模式
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
我有一个正在加载到 Spark 的 kafka 流。来自 Kafka 主题的消息具有以下属性:bl_iban
、blacklisted
、timestamp
。所以有 IBANS,关于该 IBAN 是否被列入黑名单 (Y/N) 的标志,还有该记录的时间戳。
问题是一个 IBAN 可以有多个记录,因为超时 IBAN 可能会被列入黑名单或“删除”。我想要实现的是,我想知道每个 IBANS 的当前状态。然而,我从更简单的目标开始,那就是为每个 IBAN 列出最新的 timestamp
(之后我想添加 blacklisted
状态)所以我生成了以下代码(其中黑名单代表我从卡夫卡加载的数据集):
blackList = blackList.groupBy("bl_iban")
.agg(col("bl_iban"), max("timestamp"));
之后,我尝试使用以下代码将其打印到控制台:
StreamingQuery query = blackList.writeStream()
.format("console")
.outputMode(OutputMode.Append())
.start();
我有 运行 我的代码,但出现以下错误:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
所以我像这样在我的数据集中添加了水印:
blackList = blackList.withWatermark("timestamp", "2 seconds")
.groupBy("bl_iban")
.agg(col("bl_iban"), max("timestamp"));
之后又遇到了同样的错误。
有什么想法可以解决这个问题吗?
更新:
在 mike 的帮助下,我设法摆脱了那个错误。但问题是我仍然无法让我的黑名单工作。我可以看到数据是如何从 Kafka 加载的,但是在我的组操作之后,我得到了两个空批次,仅此而已。
来自 Kafka 的打印数据:
+-----------------------+-----------+-----------------------+
|bl_iban |blacklisted|timestamp |
+-----------------------+-----------+-----------------------+
|SK047047595122709025789|N |2020-04-10 17:26:58.208|
|SK341492788657560898224|N |2020-04-10 17:26:58.214|
|SK118866580129485701645|N |2020-04-10 17:26:58.215|
+-----------------------+-----------+-----------------------+
这就是我如何得到输出的黑名单:
blackList = blackList.selectExpr("split(cast(value as string),',') as value", "cast(timestamp as timestamp) timestamp")
.selectExpr("value[0] as bl_iban", "value[1] as blacklisted", "timestamp");
这是我的群操作:
Dataset<Row> blackListCurrent = blackList.withWatermark("timestamp", "20 minutes")
.groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
.agg(col("bl_iban"), max("timestamp"));
Link 到源文件:Spark Blacklist
当您在 Spark 中使用水印时,您需要确保您的聚合了解 window。 Spark documentation 提供了更多背景信息。
在您的情况下,代码应如下所示
blackList = blackList.withWatermark("timestamp", "2 seconds")
.groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
.agg(col("bl_iban"), max("timestamp"));
重要的是,属性 timestamp
具有时间戳数据类型!
我有一个正在加载到 Spark 的 kafka 流。来自 Kafka 主题的消息具有以下属性:bl_iban
、blacklisted
、timestamp
。所以有 IBANS,关于该 IBAN 是否被列入黑名单 (Y/N) 的标志,还有该记录的时间戳。
问题是一个 IBAN 可以有多个记录,因为超时 IBAN 可能会被列入黑名单或“删除”。我想要实现的是,我想知道每个 IBANS 的当前状态。然而,我从更简单的目标开始,那就是为每个 IBAN 列出最新的 timestamp
(之后我想添加 blacklisted
状态)所以我生成了以下代码(其中黑名单代表我从卡夫卡加载的数据集):
blackList = blackList.groupBy("bl_iban")
.agg(col("bl_iban"), max("timestamp"));
之后,我尝试使用以下代码将其打印到控制台:
StreamingQuery query = blackList.writeStream()
.format("console")
.outputMode(OutputMode.Append())
.start();
我有 运行 我的代码,但出现以下错误:
Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark
所以我像这样在我的数据集中添加了水印:
blackList = blackList.withWatermark("timestamp", "2 seconds")
.groupBy("bl_iban")
.agg(col("bl_iban"), max("timestamp"));
之后又遇到了同样的错误。 有什么想法可以解决这个问题吗?
更新: 在 mike 的帮助下,我设法摆脱了那个错误。但问题是我仍然无法让我的黑名单工作。我可以看到数据是如何从 Kafka 加载的,但是在我的组操作之后,我得到了两个空批次,仅此而已。 来自 Kafka 的打印数据:
+-----------------------+-----------+-----------------------+
|bl_iban |blacklisted|timestamp |
+-----------------------+-----------+-----------------------+
|SK047047595122709025789|N |2020-04-10 17:26:58.208|
|SK341492788657560898224|N |2020-04-10 17:26:58.214|
|SK118866580129485701645|N |2020-04-10 17:26:58.215|
+-----------------------+-----------+-----------------------+
这就是我如何得到输出的黑名单:
blackList = blackList.selectExpr("split(cast(value as string),',') as value", "cast(timestamp as timestamp) timestamp")
.selectExpr("value[0] as bl_iban", "value[1] as blacklisted", "timestamp");
这是我的群操作:
Dataset<Row> blackListCurrent = blackList.withWatermark("timestamp", "20 minutes")
.groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
.agg(col("bl_iban"), max("timestamp"));
Link 到源文件:Spark Blacklist
当您在 Spark 中使用水印时,您需要确保您的聚合了解 window。 Spark documentation 提供了更多背景信息。
在您的情况下,代码应如下所示
blackList = blackList.withWatermark("timestamp", "2 seconds")
.groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("bl_iban"))
.agg(col("bl_iban"), max("timestamp"));
重要的是,属性 timestamp
具有时间戳数据类型!