如何在一个微批中设置最大行数?
How to set maximum number of rows in one micro-batch?
我正在使用 spark-structured-streaming foreachBatch
通过以下代码从 redis 读取批记录(尝试通过 stream.read.batch.size
设置 batchSize)
val data = spark.readStream.format("redis")
.option("stream.read.batch.size").load()
val query = data.writeStream.foreachBatch {
(batchDF: DataFrame, batchId: Long) => ...
// we count size of batchDF here, we want to limit its size
// some operation
}
目前我们将 stream.read.batch.size
设置为 128,但这似乎不起作用。 batchSize似乎是随机的,有时超过1000甚至10000。
但是我不想等那么久(10000条记录)因为我有一些操作(在代码注释// some operation
)需要尽快完成,所以我想控制最大批处理大小,因此当记录达到此限制时可以立即处理,如何做?
我是 spark-redis 的维护者。目前不支持此功能。 stream.read.batch.size
参数控制单个 Redis API 调用读取的项目数(XREADGROUP 调用的 count
参数)。它不影响每个触发器的项目数(batchDF 大小)。我已经在 github 上为此功能请求开了一张票。
we want to limit its size
您可以使用 Dataset.limit
作为流限制(至少在 Spark 2.4.3 中)。
这样,代码可能如下所示:
val data = spark
.readStream
.format("redis")
.load
.limit(...your limit here...)
我正在使用 spark-structured-streaming foreachBatch
通过以下代码从 redis 读取批记录(尝试通过 stream.read.batch.size
设置 batchSize)
val data = spark.readStream.format("redis")
.option("stream.read.batch.size").load()
val query = data.writeStream.foreachBatch {
(batchDF: DataFrame, batchId: Long) => ...
// we count size of batchDF here, we want to limit its size
// some operation
}
目前我们将 stream.read.batch.size
设置为 128,但这似乎不起作用。 batchSize似乎是随机的,有时超过1000甚至10000。
但是我不想等那么久(10000条记录)因为我有一些操作(在代码注释// some operation
)需要尽快完成,所以我想控制最大批处理大小,因此当记录达到此限制时可以立即处理,如何做?
我是 spark-redis 的维护者。目前不支持此功能。 stream.read.batch.size
参数控制单个 Redis API 调用读取的项目数(XREADGROUP 调用的 count
参数)。它不影响每个触发器的项目数(batchDF 大小)。我已经在 github 上为此功能请求开了一张票。
we want to limit its size
您可以使用 Dataset.limit
作为流限制(至少在 Spark 2.4.3 中)。
这样,代码可能如下所示:
val data = spark
.readStream
.format("redis")
.load
.limit(...your limit here...)