使用 .filterWhen 定义两个工作流

Defining Two Workflows Using Using .filterWhen

我有一个异步谓词来检查我的数据库中是否存在文档。根据它是否存在,我有两个工作流程。我想在单个反应管道中实现这一点:

Flux.fromIterable(docIds)
    .filterWhen(docId -> dbConnector.isPresent(docId))
    .flatMap(a -> workflow1(a))  //workflow1 is guaranteed to return 1 element
    .switchIfEmpty(Mono.defer(() -> workflow2(rejectedDocIdHere)));

我面临的问题是我没有办法找到rejectedDocIdHere。这可以通过在管道外设置 AtomicReference<T> 来解决,但我真的不想这样做。我想到的其他一些事情:

  1. 使用嵌套 flatMap 作为:

     Flux.fromIterable(docIds)
         .flatMap(docId -> {
              return Mono.just(docId)
                         .filterWhen(dbConnector::isPresent)
                         .flatMap(this::workflow1)
                         .switchIfEmtpy(Mono.defer(() -> workflow2(docId)))
     })
    

有没有办法避免这种嵌套?我觉得我正在尝试将 filter 用于未制作的东西 - 它基本上过滤掉了项目,我将无法在管道的稍后位置找到这些被拒绝的项目。无论如何想看看响应式专家对此有何看法。

I feel like I am trying to use filter for something it wasn't made

我同意这一点。

我只会在您绝对不再需要此元素时才使用 filterfilterWhen()。如果你这样做,那么我的偏好是 flatMap() 到一个单独的类型,然后根据该类型执行后续的反应操作:

Flux.fromIterable(docIds)
    .flatMap(docId -> dbConnector.isPresent(docId).zipWith(Mono.just(docId))
       //From this point onwards you have a type containing both the ID, and its validation status
    .flatMap(t -> t.getT1() ?
                  workflow1(t.getT2()) :
                  Mono.defer(() -> workflow2(t.getT2())))

请注意,为了简洁起见,上面的示例仅使用了原始元组,但我始终倾向于在实际使用中使用专用类型;我发现它使代码更清晰。您当然可以将该示例中的那两个 flatMap() 调用压缩为一个,但随后您会进入您试图避免的嵌套情况,并且它可能会使反应链的进一步扩展变得更加混乱。