如何在一个微批中设置最大行数?

How to set maximum number of rows in one micro-batch?

我正在使用 spark-structured-streaming foreachBatch 通过以下代码从 redis 读取批记录(尝试通过 stream.read.batch.size 设置 batchSize)

val data = spark.readStream.format("redis")
  .option("stream.read.batch.size").load()

val query = data.writeStream.foreachBatch { 
  (batchDF: DataFrame, batchId: Long) => ...
  // we count size of batchDF here, we want to limit its size
  // some operation
}

目前我们将 stream.read.batch.size 设置为 128,但这似乎不起作用。 batchSize似乎是随机的,有时超过1000甚至10000。

但是我不想等那么久(10000条记录)因为我有一些操作(在代码注释// some operation)需要尽快完成,所以我想控制最大批处理大小,因此当记录达到此限制时可以立即处理,如何做?

我是 spark-redis 的维护者。目前不支持此功能。 stream.read.batch.size 参数控制单个 Redis API 调用读取的项目数(XREADGROUP 调用的 count 参数)。它不影响每个触发器的项目数(batchDF 大小)。我已经在 github 上为此功能请求开了一张票。

we want to limit its size

您可以使用 Dataset.limit 作为流限制(至少在 Spark 2.4.3 中)。

这样,代码可能如下所示:

val data = spark
  .readStream
  .format("redis")
  .load
  .limit(...your limit here...)