在 Spring Project Reactor 中重新启动上游发布者时清除流中的运行中元素?
Clear in-flight elements in a stream when an upstream publisher is restarted in Spring Project Reactor?
我有一个发布者在 MongoDB 和 returns 上执行长 运行ning 和大型查询 Flux. Entities that are marked in the database as "processed" will be filtered out and the entities are then buffered and passed to a concatMap 运算符中的数据(以便所有buffered ≤elements 在处理下一个缓冲区中的元素之前处理)。它看起来像这样:
Flux<Entity> entitiesFromMongoDb = myMongoRepository.executeLargeQuery();
entitiesFromMongoDb.filter(entity -> !entity.isProcessed())
.buffer(10)
.concatMap(bufferedEntityList ->
Flux.fromIterable(bufferedEntityList)
.flatMap(makeExternalCall)
.then()));
其中 makeExternalCall
调用第三方远程服务器 并且 在调用完成后将实体设置为 processed
。在大多数情况下,这工作正常,但是当远程服务器真的很慢或有错误时,makeExternalCall
将重试(使用指数退避)对远程服务器的操作。在某些情况下,处理完所有 10 个外部调用可能需要相当长的时间。事实上,myMongoRepository.executeLargeQuery()
发布者重新启动并再次执行查询可能需要很长时间。现在我们 运行 遇到一个问题,我将在此处尝试描述:
- 实体 A 从数据库中读取(即它在
myMongoRepository.executeLargeQuery()
产生的流量中被 return 编辑)。它尚未标记为 "processed",这意味着 entity.isProcessed()
将 return false
并将保留在流中。
- 外部服务器真的很慢或宕机,因此
makeExternalCall
被迫在 实体 A 被标记为 "processed" 之前重试操作 数据库
myMongoRepository.executeLargeQuery()
重新启动并再次执行查询。
- 再次从数据库中读取实体 A。但问题是已经有另一个实体 A 的实例在飞行中,因为它尚未被先前调用
myMongoRepository.executeLargeQuery()
. 标记为 "processed"
- 这意味着
makeExternalCall
将被实体A调用两次,这不是最优的!
我可以向数据库发出额外的请求并检查 makeExternalCall
方法中每个实体的 processed
的状态,但这会导致额外的负载(因为需要额外的请求每个实体)到不是最佳的数据库。
所以我的问题是:
有没有办法以某种方式 "restart" 整个流,从而在 MongoDB 查询触发时清除中间缓冲区(即从正在进行的流中删除正在运行的实体 A) myMongoRepository.executeLargeQuery()
是 restarted/re-executed?或者有更好的方法来处理这个问题吗?
我正在使用 Spring 启动 2.2.4.RELEASE
、项目反应堆 3.3.2.RELEASE
和 spring-boot-starter-data-mongodb-reactive
2.2.4.RELEASE
。
不确定我是否完全理解了问题。但试着回答,因为它听起来很有趣。
由于您需要了解 makeExternalCall
已经在处理的请求,您能否维护一个包含正在处理的实体的集合/本地缓存?
Set<Entity> inProgress = new HashSet<>();
Flux<Entity> entitiesFromMongoDb = myMongoRepository.executeLargeQuery();
entitiesFromMongoDb.filter(entity -> !entity.isProcessed())
.buffer(10)
.map(bufferedEntityList -> { // remove the inprogress requests to avoid redundant processing
bufferedEntityList.removeIf(inProgress::contains);
return bufferedEntityList;
})
.concatMap(bufferedEntityList ->
inProgress.addAll(bufferedEntityList);
Flux.fromIterable(bufferedEntityList)
.flatMap(makeExternalCall) //assuming once processed, it emits the entity object
.map(entity -> { //clear growing set
inProgress.remove(entity);
return entity;
})
.then()));
当您需要水平扩展应用程序时,这种方法不是一个好的解决方案。在这种情况下,您可以使用 redis
.
这样的外部缓存服务器,而不是维护本地缓存
我有一个发布者在 MongoDB 和 returns 上执行长 运行ning 和大型查询 Flux. Entities that are marked in the database as "processed" will be filtered out and the entities are then buffered and passed to a concatMap 运算符中的数据(以便所有buffered ≤elements 在处理下一个缓冲区中的元素之前处理)。它看起来像这样:
Flux<Entity> entitiesFromMongoDb = myMongoRepository.executeLargeQuery();
entitiesFromMongoDb.filter(entity -> !entity.isProcessed())
.buffer(10)
.concatMap(bufferedEntityList ->
Flux.fromIterable(bufferedEntityList)
.flatMap(makeExternalCall)
.then()));
其中 makeExternalCall
调用第三方远程服务器 并且 在调用完成后将实体设置为 processed
。在大多数情况下,这工作正常,但是当远程服务器真的很慢或有错误时,makeExternalCall
将重试(使用指数退避)对远程服务器的操作。在某些情况下,处理完所有 10 个外部调用可能需要相当长的时间。事实上,myMongoRepository.executeLargeQuery()
发布者重新启动并再次执行查询可能需要很长时间。现在我们 运行 遇到一个问题,我将在此处尝试描述:
- 实体 A 从数据库中读取(即它在
myMongoRepository.executeLargeQuery()
产生的流量中被 return 编辑)。它尚未标记为 "processed",这意味着entity.isProcessed()
将 returnfalse
并将保留在流中。 - 外部服务器真的很慢或宕机,因此
makeExternalCall
被迫在 实体 A 被标记为 "processed" 之前重试操作 数据库 myMongoRepository.executeLargeQuery()
重新启动并再次执行查询。- 再次从数据库中读取实体 A。但问题是已经有另一个实体 A 的实例在飞行中,因为它尚未被先前调用
myMongoRepository.executeLargeQuery()
. 标记为 "processed"
- 这意味着
makeExternalCall
将被实体A调用两次,这不是最优的!
我可以向数据库发出额外的请求并检查 makeExternalCall
方法中每个实体的 processed
的状态,但这会导致额外的负载(因为需要额外的请求每个实体)到不是最佳的数据库。
所以我的问题是:
有没有办法以某种方式 "restart" 整个流,从而在 MongoDB 查询触发时清除中间缓冲区(即从正在进行的流中删除正在运行的实体 A) myMongoRepository.executeLargeQuery()
是 restarted/re-executed?或者有更好的方法来处理这个问题吗?
我正在使用 Spring 启动 2.2.4.RELEASE
、项目反应堆 3.3.2.RELEASE
和 spring-boot-starter-data-mongodb-reactive
2.2.4.RELEASE
。
不确定我是否完全理解了问题。但试着回答,因为它听起来很有趣。
由于您需要了解 makeExternalCall
已经在处理的请求,您能否维护一个包含正在处理的实体的集合/本地缓存?
Set<Entity> inProgress = new HashSet<>();
Flux<Entity> entitiesFromMongoDb = myMongoRepository.executeLargeQuery();
entitiesFromMongoDb.filter(entity -> !entity.isProcessed())
.buffer(10)
.map(bufferedEntityList -> { // remove the inprogress requests to avoid redundant processing
bufferedEntityList.removeIf(inProgress::contains);
return bufferedEntityList;
})
.concatMap(bufferedEntityList ->
inProgress.addAll(bufferedEntityList);
Flux.fromIterable(bufferedEntityList)
.flatMap(makeExternalCall) //assuming once processed, it emits the entity object
.map(entity -> { //clear growing set
inProgress.remove(entity);
return entity;
})
.then()));
当您需要水平扩展应用程序时,这种方法不是一个好的解决方案。在这种情况下,您可以使用 redis
.