为什么使用 switchIfEmpty 时项目反应堆会无限期挂起?
Why does project reactor hang indefinitely when switchIfEmpty is used?
背景
我正在使用 Spring Boot 2.2.1、project-reactor 3.3.0 和 spring-data-mongodb 2.2.1,我正在尝试加载数据来自多个查询。我的代码大致如下所示:
Flux.just("type1", "type2", "type3", "type4")
.concatMap { type ->
reactiveMongoOperations.find<Map<String, Any>>(BasicQuery("{'type': '$type'}"), "collectionName")
.doOnError { e ->
log.error("Caught exception when reading from mongodb: ${e::class.simpleName} - ${e.message}", e)
}.switchIfEmpty {
log.warn("Failed to find any documents of type $type")
Mono.empty<Map<String, Any>>()
}
}
.. // More operations here
.subscribe()
问题是,如果 reactiveMongoOperations.find(..)
没有找到给定类型的任何文档(因此 "Failed to find any documents of type $type"
被记录),整个操作将无限期挂起。如果我删除 switchIfEmpty
子句,操作将完成并且一切正常。
问题
- 如果我添加
switchIfEmpty
操作,为什么整个操作会挂起?如果我使用 flatMap
而不是 concatMap
没关系,它最终还是会挂起。
- 我应该如何记录没有找到特定查询的文档? IE。我想记录当
reactiveMongoOperations.find(..)
returns 为空 Flux
. 时未找到任何文件
当从 Kotlin 将代码重写为 Java 时(正如 Thomas 在评论中所建议的那样),我找到了答案!我曾假设我使用了 reactor-kotlin-extensions
库提供的 Kotlin reactor.kotlin.core.publisher.switchIfEmpty
扩展函数:
fun <T> Flux<T>.switchIfEmpty(s: () -> Publisher<T>): Flux<T> = this.switchIfEmpty(Flux.defer { s() })
这里不是这种情况,因此我最终使用了 Flux
中定义的 switchIfEmpty
方法,定义如下:
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)
为了让它在没有扩展功能的情况下工作,我可能应该做这样的事情:
..
.switchIfEmpty { subscriber ->
log.warn("Failed to find any documents of type $type")
subscriber.onComplete()
}
我最初的解决方案没有用,因为 Java 版本假定我 创建 一个 Publisher
(我这样做了)并且 调用 这个发布者的一个函数(我没有)。在 Kotlin 中,lambda 参数是可选的,如果您不需要它,这就是类型系统没有捕捉到它的原因。
这是 Kotlin 与 Java 互操作的一种方式可能很棘手。
背景
我正在使用 Spring Boot 2.2.1、project-reactor 3.3.0 和 spring-data-mongodb 2.2.1,我正在尝试加载数据来自多个查询。我的代码大致如下所示:
Flux.just("type1", "type2", "type3", "type4")
.concatMap { type ->
reactiveMongoOperations.find<Map<String, Any>>(BasicQuery("{'type': '$type'}"), "collectionName")
.doOnError { e ->
log.error("Caught exception when reading from mongodb: ${e::class.simpleName} - ${e.message}", e)
}.switchIfEmpty {
log.warn("Failed to find any documents of type $type")
Mono.empty<Map<String, Any>>()
}
}
.. // More operations here
.subscribe()
问题是,如果 reactiveMongoOperations.find(..)
没有找到给定类型的任何文档(因此 "Failed to find any documents of type $type"
被记录),整个操作将无限期挂起。如果我删除 switchIfEmpty
子句,操作将完成并且一切正常。
问题
- 如果我添加
switchIfEmpty
操作,为什么整个操作会挂起?如果我使用flatMap
而不是concatMap
没关系,它最终还是会挂起。 - 我应该如何记录没有找到特定查询的文档? IE。我想记录当
reactiveMongoOperations.find(..)
returns 为空Flux
. 时未找到任何文件
当从 Kotlin 将代码重写为 Java 时(正如 Thomas 在评论中所建议的那样),我找到了答案!我曾假设我使用了 reactor-kotlin-extensions
库提供的 Kotlin reactor.kotlin.core.publisher.switchIfEmpty
扩展函数:
fun <T> Flux<T>.switchIfEmpty(s: () -> Publisher<T>): Flux<T> = this.switchIfEmpty(Flux.defer { s() })
这里不是这种情况,因此我最终使用了 Flux
中定义的 switchIfEmpty
方法,定义如下:
public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)
为了让它在没有扩展功能的情况下工作,我可能应该做这样的事情:
..
.switchIfEmpty { subscriber ->
log.warn("Failed to find any documents of type $type")
subscriber.onComplete()
}
我最初的解决方案没有用,因为 Java 版本假定我 创建 一个 Publisher
(我这样做了)并且 调用 这个发布者的一个函数(我没有)。在 Kotlin 中,lambda 参数是可选的,如果您不需要它,这就是类型系统没有捕捉到它的原因。
这是 Kotlin 与 Java 互操作的一种方式可能很棘手。