如何使用 kafka readStream 在 pyspark 中每 5 秒读取一次?
How can I read every 5 seconds in pyspark with kafka readStream?
我想每 5 秒阅读一个主题;对于旧版本的 pyspark
我可以使用 kafka-utils 和 window 方法,但目前我不能使用它。
现在我使用 spark
从 kafka
加载数据,代码如下
spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", 'localhost:9092') \
.option("subscribe", 'data') \
.load()
但是,我正在读取所有数据。
所以我想知道如何在可能的情况下每 5 秒读取一次批量大小为 1 秒的数据。
谢谢
假设您希望每 5 秒按某项进行聚合和分组,请参阅 documentation on windowing
这应该定义一个翻滚 window
kafka_df \
.withWatermark("timestamp", "5 seconds") \
.groupBy(
window(kafka_df.timestamp, "5 seconds", "1 second"),
kafka_df.value)
我想每 5 秒阅读一个主题;对于旧版本的 pyspark
我可以使用 kafka-utils 和 window 方法,但目前我不能使用它。
现在我使用 spark
从 kafka
加载数据,代码如下
spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", 'localhost:9092') \
.option("subscribe", 'data') \
.load()
但是,我正在读取所有数据。
所以我想知道如何在可能的情况下每 5 秒读取一次批量大小为 1 秒的数据。
谢谢
假设您希望每 5 秒按某项进行聚合和分组,请参阅 documentation on windowing
这应该定义一个翻滚 window
kafka_df \
.withWatermark("timestamp", "5 seconds") \
.groupBy(
window(kafka_df.timestamp, "5 seconds", "1 second"),
kafka_df.value)