如何使用 kafka readStream 在 pyspark 中每 5 秒读取一次?

How can I read every 5 seconds in pyspark with kafka readStream?

我想每 5 秒阅读一个主题;对于旧版本的 pyspark 我可以使用 kafka-utils 和 window 方法,但目前我不能使用它。

现在我使用 sparkkafka 加载数据,代码如下

spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", 'localhost:9092') \
    .option("subscribe", 'data') \
    .load()

但是,我正在读取所有数据。

所以我想知道如何在可能的情况下每 5 秒读取一次批量大小为 1 秒的数据。

谢谢

假设您希望每 5 秒按某项进行聚合和分组,请参阅 documentation on windowing

这应该定义一个翻滚 window

kafka_df \
    .withWatermark("timestamp", "5 seconds") \
    .groupBy(
        window(kafka_df.timestamp, "5 seconds", "1 second"),
        kafka_df.value)