为什么使用 switchIfEmpty 时项目反应堆会无限期挂起?

Why does project reactor hang indefinitely when switchIfEmpty is used?

背景

我正在使用 Spring Boot 2.2.1、project-reactor 3.3.0 和 spring-data-mongodb 2.2.1,我正在尝试加载数据来自多个查询。我的代码大致如下所示:

Flux.just("type1", "type2", "type3", "type4")
    .concatMap { type ->
        reactiveMongoOperations.find<Map<String, Any>>(BasicQuery("{'type': '$type'}"), "collectionName")
                                .doOnError { e ->
                                    log.error("Caught exception when reading from mongodb: ${e::class.simpleName} - ${e.message}", e)
                                }.switchIfEmpty {
                                    log.warn("Failed to find any documents of type $type")
                                    Mono.empty<Map<String, Any>>()
                                }
    } 
    .. // More operations here
    .subscribe()

问题是,如果 reactiveMongoOperations.find(..) 没有找到给定类型的任何文档(因此 "Failed to find any documents of type $type" 被记录),整个操作将无限期挂起。如果我删除 switchIfEmpty 子句,操作将完成并且一切正常。

问题

  1. 如果我添加 switchIfEmpty 操作,为什么整个操作会挂起?如果我使用 flatMap 而不是 concatMap 没关系,它最终还是会挂起。
  2. 我应该如何记录没有找到特定查询的文档? IE。我想记录当 reactiveMongoOperations.find(..) returns 为空 Flux.
  3. 时未找到任何文件

当从 Kotlin 将代码重写为 Java 时(正如 Thomas 在评论中所建议的那样),我找到了答案!我曾假设我使用了 reactor-kotlin-extensions 库提供的 Kotlin reactor.kotlin.core.publisher.switchIfEmpty 扩展函数:

fun <T> Flux<T>.switchIfEmpty(s: () -> Publisher<T>): Flux<T> = this.switchIfEmpty(Flux.defer { s() })

这里不是这种情况,因此我最终使用了 Flux 中定义的 switchIfEmpty 方法,定义如下:

public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)

为了让它在没有扩展功能的情况下工作,我可能应该做这样的事情:

.. 
.switchIfEmpty { subscriber ->
    log.warn("Failed to find any documents of type $type")
    subscriber.onComplete()
}

我最初的解决方案没有用,因为 Java 版本假定我 创建 一个 Publisher (我这样做了)并且 调用 这个发布者的一个函数(我没有)。在 Kotlin 中,lambda 参数是可选的,如果您不需要它,这就是类型系统没有捕捉到它的原因。

这是 Kotlin 与 Java 互操作的一种方式可能很棘手。