在 Spring Project Reactor 中重新启动上游发布者时清除流中的运行中元素?

Clear in-flight elements in a stream when an upstream publisher is restarted in Spring Project Reactor?

我有一个发布者在 MongoDB 和 returns 上执行长 运行ning 和大型查询 Flux. Entities that are marked in the database as "processed" will be filtered out and the entities are then buffered and passed to a concatMap 运算符中的数据(以便所有buffered ≤elements 在处理下一个缓冲区中的元素之前处理)。它看起来像这样:

Flux<Entity> entitiesFromMongoDb = myMongoRepository.executeLargeQuery();
entitiesFromMongoDb.filter(entity -> !entity.isProcessed())
                   .buffer(10)
                   .concatMap(bufferedEntityList ->  
                                    Flux.fromIterable(bufferedEntityList)
                                        .flatMap(makeExternalCall)
                                        .then()));

其中 makeExternalCall 调用第三方远程服务器 并且 在调用完成后将实体设置为 processed。在大多数情况下,这工作正常,但是当远程服务器真的很慢或有错误时,makeExternalCall 将重试(使用指数退避)对远程服务器的操作。在某些情况下,处理完所有 10 个外部调用可能需要相当长的时间。事实上,myMongoRepository.executeLargeQuery() 发布者重新启动并再次执行查询可能需要很长时间。现在我们 运行 遇到一个问题,我将在此处尝试描述:

  1. 实体 A 从数据库中读取(即它在 myMongoRepository.executeLargeQuery() 产生的流量中被 return 编辑)。它尚未标记为 "processed",这意味着 entity.isProcessed() 将 return false 并将保留在流中。
  2. 外部服务器真的很慢或宕机,因此 makeExternalCall 被迫在 实体 A 被标记为 "processed" 之前重试操作 数据库
  3. myMongoRepository.executeLargeQuery() 重新启动并再次执行查询。
  4. 再次从数据库中读取实体 A。但问题是已经有另一个实体 A 的实例在飞行中,因为它尚未被先前调用 myMongoRepository.executeLargeQuery().
  5. 标记为 "processed"
  6. 这意味着makeExternalCall将被实体A调用两次,这不是最优的!

我可以向数据库发出额外的请求并检查 makeExternalCall 方法中每个实体的 processed 的状态,但这会导致额外的负载(因为需要额外的请求每个实体)到不是最佳的数据库。

所以我的问题是:

有没有办法以某种方式 "restart" 整个流,从而在 MongoDB 查询触发时清除中间缓冲区(即从正在进行的流中删除正在运行的实体 A) myMongoRepository.executeLargeQuery() 是 restarted/re-executed?或者有更好的方法来处理这个问题吗?

我正在使用 Spring 启动 2.2.4.RELEASE、项目反应堆 3.3.2.RELEASEspring-boot-starter-data-mongodb-reactive 2.2.4.RELEASE

不确定我是否完全理解了问题。但试着回答,因为它听起来很有趣。

由于您需要了解 makeExternalCall 已经在处理的请求,您能否维护一个包含正在处理的实体的集合/本地缓存?

Set<Entity> inProgress = new HashSet<>(); 

Flux<Entity> entitiesFromMongoDb = myMongoRepository.executeLargeQuery();

entitiesFromMongoDb.filter(entity -> !entity.isProcessed())
                   .buffer(10)
                   .map(bufferedEntityList -> {  // remove the inprogress requests to avoid redundant processing
                        bufferedEntityList.removeIf(inProgress::contains);
                        return bufferedEntityList;
                   })
                   .concatMap(bufferedEntityList ->  
                                    inProgress.addAll(bufferedEntityList);
                                    Flux.fromIterable(bufferedEntityList)
                                        .flatMap(makeExternalCall) //assuming once processed, it emits the entity object
                                        .map(entity -> {   //clear growing set
                                            inProgress.remove(entity);
                                            return entity;
                                        })
                                        .then()));

当您需要水平扩展应用程序时,这种方法不是一个好的解决方案。在这种情况下,您可以使用 redis.

这样的外部缓存服务器,而不是维护本地缓存