如何限制活动 Spring WebClient 调用的数量
How to limit the number of active Spring WebClient calls
我有一个要求,我使用 Spring 批处理从 SQL 数据库读取一堆行(数千行)并调用 REST 服务来丰富内容,然后再将它们写入 Kafka 主题。
使用Spring Reactive webClient 时,如何限制活动非阻塞服务调用的数量?在使用 Spring Batch 读取数据后,我是否应该以某种方式在循环中引入 Flux?
(我了解 delayElements 的用法并且它有不同的用途,因为当单个获取服务调用带来大量数据并且您希望服务器减慢速度时 - 不过在这里,我的用例是有点不同,因为我有很多 WebClient 调用要进行,我想限制调用次数以避免内存不足问题,但仍然可以获得非阻塞调用的优势。
非常有趣的问题。我仔细考虑了一下,并想到了一些关于如何做到这一点的想法。我将分享我的想法,希望这里有一些想法可以帮助您进行调查。
不幸的是,我不熟悉 Spring 批处理。然而,这听起来像是 rate limiting, or the classical producer-consumer problem.
的问题
所以,我们有一个生产者,生产了很多消息,我们的消费者跟不上,中间的缓冲变得难以忍受。
我看到的问题是,正如您所描述的,您的 Spring 批处理过程没有作为流或管道工作,但您的反应式 Web 客户端可以。
因此,如果我们能够以流的形式读取数据,那么当记录开始进入管道时,这些记录将由反应式 Web 客户端进行处理,并且使用背压,我们可以控制数据流从 producer/database 侧流。
制作方
因此,我要更改的第一件事是如何从数据库中提取记录。我们需要控制当时从数据库中读取多少记录,通过分页我们的数据检索或通过控制 fetch size 然后,通过背压,控制其中有多少通过反应管道发送到下游.
因此,考虑以下(基本的)数据库数据检索,包装在 Flux
.
Flux<String> getData(DataSource ds) {
return Flux.create(sink -> {
try {
Connection con = ds.getConnection();
con.setAutoCommit(false);
PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'", ResultSet.TYPE_FORWARD_ONLY);
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();
sink.onRequest(batchSize -> {
try {
for (int i = 0; i < batchSize; i++) {
if (!rs.next()) {
//no more data, close resources!
rs.close();
stm.close();
con.close();
sink.complete();
break;
}
sink.next(rs.getString(1));
}
} catch (SQLException e) {
//TODO: close resources here
sink.error(e);
}
});
}
catch (SQLException e) {
//TODO: close resources here
sink.error(e);
}
});
}
在上面的例子中:
- 我通过设置获取大小将每批次读取的记录量控制为 1000。
- 接收器将发送订阅者请求的记录量(即
batchSize
),然后使用背压等待它请求更多。
- 当结果集中没有更多记录时,我们完成接收并关闭资源。
- 如果任何时候发生错误,我们都会发回错误并关闭资源。
- 或者,我可以使用分页来读取数据,这可能通过在每个请求周期重新发出查询来简化资源处理。
- 如果订阅被取消或处置 (
sink.onCancel
, sink.onDispose
),您也可以考虑做一些事情,因为关闭连接和其他资源是这里的基础。
消费者端
在消费者端,您注册了一个订阅者,该订阅者当时仅以 1000 条的速度请求消息,并且只有在处理完该批次后才会请求更多消息。
getData(source).subscribe(new BaseSubscriber<String>() {
private int messages = 0;
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(1000);
}
@Override
protected void hookOnNext(String value) {
//make http request
System.out.println(value);
messages++;
if(messages % 1000 == 0) {
//when we're done with a batch
//then we're ready to request for more
upstream().request(1000);
}
}
});
在上面的示例中,当订阅开始时,它会请求第一批 1000 条消息。在 onNext
中,我们处理第一批,使用 Web 客户端发出 http 请求。
一批完成后,我们会向发布者请求另一批 1000 件,依此类推。
给你!使用背压,您可以控制当时有多少打开的 HTTP 请求。
我的示例非常初级,需要一些额外的工作才能使其投入生产,但我相信这有望提供一些可以适应您的 Spring 批处理场景的想法。
我有一个要求,我使用 Spring 批处理从 SQL 数据库读取一堆行(数千行)并调用 REST 服务来丰富内容,然后再将它们写入 Kafka 主题。
使用Spring Reactive webClient 时,如何限制活动非阻塞服务调用的数量?在使用 Spring Batch 读取数据后,我是否应该以某种方式在循环中引入 Flux?
(我了解 delayElements 的用法并且它有不同的用途,因为当单个获取服务调用带来大量数据并且您希望服务器减慢速度时 - 不过在这里,我的用例是有点不同,因为我有很多 WebClient 调用要进行,我想限制调用次数以避免内存不足问题,但仍然可以获得非阻塞调用的优势。
非常有趣的问题。我仔细考虑了一下,并想到了一些关于如何做到这一点的想法。我将分享我的想法,希望这里有一些想法可以帮助您进行调查。
不幸的是,我不熟悉 Spring 批处理。然而,这听起来像是 rate limiting, or the classical producer-consumer problem.
的问题所以,我们有一个生产者,生产了很多消息,我们的消费者跟不上,中间的缓冲变得难以忍受。
我看到的问题是,正如您所描述的,您的 Spring 批处理过程没有作为流或管道工作,但您的反应式 Web 客户端可以。
因此,如果我们能够以流的形式读取数据,那么当记录开始进入管道时,这些记录将由反应式 Web 客户端进行处理,并且使用背压,我们可以控制数据流从 producer/database 侧流。
制作方
因此,我要更改的第一件事是如何从数据库中提取记录。我们需要控制当时从数据库中读取多少记录,通过分页我们的数据检索或通过控制 fetch size 然后,通过背压,控制其中有多少通过反应管道发送到下游.
因此,考虑以下(基本的)数据库数据检索,包装在 Flux
.
Flux<String> getData(DataSource ds) {
return Flux.create(sink -> {
try {
Connection con = ds.getConnection();
con.setAutoCommit(false);
PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'", ResultSet.TYPE_FORWARD_ONLY);
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();
sink.onRequest(batchSize -> {
try {
for (int i = 0; i < batchSize; i++) {
if (!rs.next()) {
//no more data, close resources!
rs.close();
stm.close();
con.close();
sink.complete();
break;
}
sink.next(rs.getString(1));
}
} catch (SQLException e) {
//TODO: close resources here
sink.error(e);
}
});
}
catch (SQLException e) {
//TODO: close resources here
sink.error(e);
}
});
}
在上面的例子中:
- 我通过设置获取大小将每批次读取的记录量控制为 1000。
- 接收器将发送订阅者请求的记录量(即
batchSize
),然后使用背压等待它请求更多。 - 当结果集中没有更多记录时,我们完成接收并关闭资源。
- 如果任何时候发生错误,我们都会发回错误并关闭资源。
- 或者,我可以使用分页来读取数据,这可能通过在每个请求周期重新发出查询来简化资源处理。
- 如果订阅被取消或处置 (
sink.onCancel
,sink.onDispose
),您也可以考虑做一些事情,因为关闭连接和其他资源是这里的基础。
消费者端
在消费者端,您注册了一个订阅者,该订阅者当时仅以 1000 条的速度请求消息,并且只有在处理完该批次后才会请求更多消息。
getData(source).subscribe(new BaseSubscriber<String>() {
private int messages = 0;
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(1000);
}
@Override
protected void hookOnNext(String value) {
//make http request
System.out.println(value);
messages++;
if(messages % 1000 == 0) {
//when we're done with a batch
//then we're ready to request for more
upstream().request(1000);
}
}
});
在上面的示例中,当订阅开始时,它会请求第一批 1000 条消息。在 onNext
中,我们处理第一批,使用 Web 客户端发出 http 请求。
一批完成后,我们会向发布者请求另一批 1000 件,依此类推。
给你!使用背压,您可以控制当时有多少打开的 HTTP 请求。
我的示例非常初级,需要一些额外的工作才能使其投入生产,但我相信这有望提供一些可以适应您的 Spring 批处理场景的想法。